In a previous post we discussed the design of zlog, our implementation of the CORFU distributed shared-log protocol on top of Ceph. A key component of the system is the sequencer server that reports the current log tail to clients. In this post we'll discuss the implementation and performance of the sequencer in zlog.

The fast path of the sequencer server is simple. It maintains an in-memory counter that is incremented each time a client requests the next position in the log. Our current implementation receives requests from clients that are about 60 bytes in size (log identifier, epoch value), and responds with the current counter and a status code, totaling about 16 bytes. The current design achieves approximately 180K requests per second with 4 server threads on the loopback interface, and approximately 100K requests per second over the 10 Gb ethernet network on Google Compute Engine.

In contrast, the authors of the Tango paper report sequencer performance of approximately 500K requests per second over a 1 Gb ethernet link. While 100K requests per second for our implementation is more than enough to get started with some interesting applications, it would be nice to reach parity with the results in the Tango paper.

In the remainder of this post we'll discuss the current implementation. If anyone has suggestions for improving performance, I'd love to hear them :)

Protocol

Our implementation uses Google Protocol Buffers to encode messages between clients and the sequencer. Below is the specification for the messages. The MSeqRequest is sent by clients, and the MSeqReply is the message type of each server reply.

package zlog_proto;

option optimize_for = SPEED;

message MSeqRequest {
    required uint64 epoch = 1;
    required string pool = 2;
    required string name = 3;
    required bool next = 4;
}

message MSeqReply {
    enum Status {
        OK = 0;
        INIT_LOG = 1;
        STALE_EPOCH = 2;
    }
    required uint64 position = 1;
    optional Status status = 2 [default = OK];
}

Next we'll discuss the server and client designs.

Server

When the server starts up it binds to the port given by --port and starts a set of worker threads given by --nthreads. The server can also be daemonized, but that code has been removed here for brevity. The LogManager object tracks log metadata and is used by the server.

int main(int argc, char* argv[])
{
  int port;
  int nthreads;

  po::options_description desc("Allowed options");
  desc.add_options()
    ("port", po::value<int>(&port)->required(), "Server port")
    ("nthreads", po::value<int>(&nthreads)->default_value(1), "Num threads")
    ("report-sec", po::value<int>(&report_sec)->default_value(0), "Time between rate reports")
    ("daemon,d", "Run in background")
  ;

  po::variables_map vm;
  po::store(po::parse_command_line(argc, argv, desc), vm);
  po::notify(vm);

  if (nthreads <= 0 || nthreads > 64)
    nthreads = 1;

  Server s(port, nthreads);
  log_mgr = new LogManager();  // global shared with the session handlers
  s.run();

  return 0;
}

The server design uses Boost ASIO and is based on the multi-threaded HTTP server example available on the Boost ASIO website. The server contains a single I/O service object, and we set the TCP_NODELAY option for all new connections.

class Server {
 public:
  Server(short port, std::size_t nthreads)
    : acceptor_(io_service_,
        boost::asio::ip::tcp::endpoint(
          boost::asio::ip::tcp::v4(), port)),
      nthreads_(nthreads)
  {
    acceptor_.set_option(boost::asio::ip::tcp::no_delay(true));
    start_accept();
  }

When the server is run it starts a set of threads that all call the run method on the I/O service object.

  void run() {
    std::vector<boost::shared_ptr<boost::thread> > threads;
    for (std::size_t i = 0; i < nthreads_; i++) {
      boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)));
      threads.push_back(thread);
    }

    for (std::size_t i = 0; i < threads.size(); ++i)
      threads[i]->join();
  }

New connections are handled by the state machine implemented in the Session object that we'll see next.

 private:
  void start_accept() {
    Session* new_session = new Session(io_service_);
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&Server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(Session* new_session,
      const boost::system::error_code& error) {
    if (!error)
      new_session->start();
    else
      delete new_session;
    start_accept();
  }

  boost::asio::io_service io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  std::size_t nthreads_;
};
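
Note that the no_delay option is set on the listening socket; whether accepted sockets inherit the option can vary by platform. An alternative, shown here only as a sketch and not as zlog code, is to set the option on each accepted socket in handle_accept:

  void handle_accept(Session* new_session,
      const boost::system::error_code& error) {
    if (!error) {
      // disable Nagle's algorithm on the accepted socket itself
      new_session->socket().set_option(
          boost::asio::ip::tcp::no_delay(true));
      new_session->start();
    } else {
      delete new_session;
    }
    start_accept();
  }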

Session

The Session object implements the sequencer protocol state machine. Each connection starts with the client sending us a 32-bit message header.

  void start() {
    boost::asio::async_read(socket_,
        boost::asio::buffer(buffer_, sizeof(uint32_t)),
        boost::bind(&Session::handle_hdr, this,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  }

Next we sanity check the header and read the rest of the message.

  void handle_hdr(const boost::system::error_code& err, size_t size) {
    if (err) {
      delete this;
      return;
    }

    uint32_t msg_size = ntohl(*((uint32_t*)buffer_));

    if (msg_size > sizeof(buffer_)) {
      std::cerr << "message is too large" << std::endl;
      delete this;
      return;
    }

    boost::asio::async_read(socket_,
        boost::asio::buffer(buffer_, msg_size),
        boost::bind(&Session::handle_msg, this,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  }

Next we handle the body of the message, which contains the interesting parts. We start by resetting a MSeqRequest and initializing it with the received data.

  void handle_msg(const boost::system::error_code& err, size_t size) {
    if (err) {
      delete this;
      return;
    }

    req_.Clear();

    if (!req_.ParseFromArray(buffer_, size)) {
      std::cerr << "failed to parse message" << std::endl;
      delete this;
      return;
    }

    if (!req_.IsInitialized()) {
      std::cerr << "received incomplete message" << std::endl;
      delete this;
      return;
    }

Next we prepare a MSeqReply structure and contact the LogManager with the information in the request. We'll discuss the LogManager in the next section.

    reply_.Clear();

    uint64_t seq = 0;  // default so the reply has a defined position even on error

    int ret = log_mgr->ReadSequence(req_.pool(), req_.name(),
        req_.epoch(), req_.next(), &seq);
    if (ret == -EAGAIN)
      reply_.set_status(zlog_proto::MSeqReply::INIT_LOG);
    else if (ret == -ERANGE)
      reply_.set_status(zlog_proto::MSeqReply::STALE_EPOCH);
    else
      assert(!ret);

Once the LogManager has finished and any error has been recorded in the reply status, we set the position it reported and send the reply back to the client.

    reply_.set_position(seq);

    assert(reply_.IsInitialized());

    uint32_t msg_size = reply_.ByteSize();
    assert(msg_size < sizeof(buffer_));
    assert(reply_.SerializeToArray(buffer_, msg_size));

    // scatter/gather buffers
    std::vector<boost::asio::const_buffer> out;
    be_msg_size_ = htonl(msg_size);
    out.push_back(boost::asio::buffer(&be_msg_size_, sizeof(be_msg_size_)));
    out.push_back(boost::asio::buffer(buffer_, msg_size));

    boost::asio::async_write(socket_, out,
        boost::bind(&Session::handle_reply, this,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  }
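
The handle_reply completion handler isn't shown in the post. A minimal sketch, assuming the session simply goes back to waiting for the next request header on the same connection, might look like:

  void handle_reply(const boost::system::error_code& err, size_t size) {
    if (err) {
      delete this;
      return;
    }
    // wait for the next request header from the same client
    start();
  }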

Next we'll present the LogManager component of the sequencer server.

Log Manager

The LogManager manages the mapping between log identifiers and the actual tail counter. A tail is represented by a Sequence object:

class Sequence {
 public:
  Sequence() : seq_(0) {}
  Sequence(uint64_t seq) : seq_(seq) {}

  uint64_t read() {
    return seq_;
  }

  void inc() {
    seq_++;
  }

 private:
  uint64_t seq_;
};

A log is identified by the tuple (name, pool) and is stored in the LogManager in a Log struct that also includes the current epoch value for the log instance:

  struct Log {
    Log() {}
    Log(uint64_t pos, uint64_t epoch) : seq(pos), epoch(epoch) {}
    Sequence seq;
    uint64_t epoch;
  };

The interesting part is how log sequences are read and incremented. This is handled by the ReadSequence method, called from the server's Session object in response to a client request. The routine looks up the log in a std::map; if it exists, the sequence value is optionally incremented and then returned. If the log isn't found in the index the client receives an -EAGAIN reply and a request to register the log is placed on a queue. A separate thread handles the registration of a new log in the LogManager because it is an expensive process that would otherwise block other client requests.

  int ReadSequence(const std::string& pool, const std::string& name,
      uint64_t epoch, bool increment, uint64_t *seq) {

    boost::unique_lock<boost::mutex> g(lock_);

    std::map<std::pair<std::string, std::string>, Log>::iterator it =
      logs_.find(std::make_pair(pool, name));

    if (it == logs_.end()) {
      QueueLogInit(pool, name);
      return -EAGAIN;
    }

    if (epoch < it->second.epoch)
      return -ERANGE;

    if (increment)
      it->second.seq.inc();

    *seq = it->second.seq.read();

    return 0;
  }
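
The QueueLogInit call above only hands the registration work off to a background thread; the method itself isn't shown in the post. To make the hand-off concrete, here is a rough sketch of how it might look, assuming a pending_logs_ queue, a cond_ condition variable, and a LogInitThread worker (all names invented for this illustration):

  void QueueLogInit(const std::string& pool, const std::string& name) {
    // lock_ is already held by ReadSequence when this is called
    // (a full implementation would also avoid queueing duplicates)
    pending_logs_.push_back(std::make_pair(pool, name));
    cond_.notify_one();
  }

  // run by a dedicated background thread so the expensive tail-finding
  // work never blocks the fast path
  void LogInitThread() {
    for (;;) {
      boost::unique_lock<boost::mutex> g(lock_);
      while (pending_logs_.empty())
        cond_.wait(g);
      std::pair<std::string, std::string> entry = pending_logs_.front();
      pending_logs_.pop_front();
      g.unlock();

      // ... contact the storage devices to find the log tail (slow) ...
      uint64_t position = 0, epoch = 0;

      g.lock();
      logs_[entry] = Log(position, epoch);
    }
  }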

The process of registering a new log includes the slow tail-finding mechanism, but it is omitted here because it isn't part of the fast path. Next we'll show the client side, which is very straightforward.

Client

A client starts by connecting to the sequencer server.

void SeqrClient::Connect() {
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::resolver::query query(
      boost::asio::ip::tcp::v4(), host_.c_str(), port_);
  boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
  boost::asio::connect(socket_, iterator);
}

Client requests are in the form of a MSeqRequest object and include the metadata for identifying the log.

int SeqrClient::CheckTail(uint64_t epoch, const std::string& pool,
    const std::string& name, uint64_t *position, bool next) {
  // fill in msg
  zlog_proto::MSeqRequest req;
  req.set_epoch(epoch);
  req.set_name(name);
  req.set_next(next);
  req.set_pool(pool);

  // serialize header and protobuf message
  uint32_t msg_size = req.ByteSize();
  uint32_t be_msg_size = htonl(msg_size);
  uint32_t total_msg_size = msg_size + sizeof(be_msg_size);
  assert(total_msg_size <= sizeof(buffer));

  // add header
  memcpy(buffer, &be_msg_size, sizeof(be_msg_size));

  // add protobuf msg
  assert(req.IsInitialized());
  assert(req.SerializeToArray(buffer + sizeof(be_msg_size), msg_size));

  // send
  boost::asio::write(socket_, boost::asio::buffer(buffer, total_msg_size));

The write call is blocking, so after it returns we read the response from the same socket.

  // get reply
  boost::asio::read(socket_, boost::asio::buffer(&be_msg_size, sizeof(be_msg_size)));
  msg_size = ntohl(be_msg_size);
  assert(msg_size < sizeof(buffer));
  boost::asio::read(socket_, boost::asio::buffer(buffer, msg_size));

  // deserialize
  zlog_proto::MSeqReply reply;
  assert(reply.ParseFromArray(buffer, msg_size));
  assert(reply.IsInitialized());

  if (reply.status() == zlog_proto::MSeqReply::INIT_LOG)
    return -EAGAIN;
  else if (reply.status() == zlog_proto::MSeqReply::STALE_EPOCH)
    return -ERANGE;
  else {
    assert(reply.status() == zlog_proto::MSeqReply::OK);
    *position = reply.position();
  }

  return 0;
}
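
Putting the pieces together, using the client looks roughly like the following. The SeqrClient constructor isn't shown in this post, so the (host, port) signature, as well as the pool and log names, are assumptions made for the example:

int main(int argc, char* argv[])
{
  // hypothetical host/port; the real constructor signature isn't shown above
  SeqrClient client("localhost", "5678");
  client.Connect();

  // the epoch would normally come from the log's current configuration
  uint64_t epoch = 0;

  uint64_t position;
  int ret = client.CheckTail(epoch, "mypool", "mylog", &position, true);
  if (ret == -EAGAIN) {
    // the sequencer is still registering this log; try again shortly
  } else if (ret == -ERANGE) {
    // our epoch is stale and needs to be refreshed
  } else {
    assert(ret == 0);
    std::cout << "next position: " << position << std::endl;
  }

  return 0;
}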

And that's it. Performance is OK at 100K requests per second, but it would be nice to reach the 500K requests per second reported in the Tango paper.

Thoughts

I haven't had much time to focus on optimizing the server, but here are a couple of observations:

  1. Some simple hacks suggest that the std::map lookup and the locking around it aren't currently a bottleneck.

  2. Apparently there is locking inside the I/O service, and we could use multiple I/O service objects (for example, one per thread), but I'm not familiar enough with Boost ASIO internals to know if this is worth the effort. A rough sketch of that approach is shown below.
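
For reference, here is a rough sketch of the io_service-per-thread approach, modeled on the io_service pool used in the Boost ASIO HTTP server examples. This is not zlog code, just an illustration of the pattern; the acceptor would hand each new Session one of the services round-robin via get_io_service():

class IoServicePool {
 public:
  explicit IoServicePool(std::size_t pool_size) : next_(0) {
    for (std::size_t i = 0; i < pool_size; i++) {
      io_service_ptr io(new boost::asio::io_service);
      work_ptr work(new boost::asio::io_service::work(*io));
      io_services_.push_back(io);
      work_.push_back(work);
    }
  }

  // one thread per io_service, so handlers running on different
  // services never contend on the same internal service lock
  void run() {
    std::vector<boost::shared_ptr<boost::thread> > threads;
    for (std::size_t i = 0; i < io_services_.size(); i++) {
      boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, io_services_[i])));
      threads.push_back(thread);
    }
    for (std::size_t i = 0; i < threads.size(); ++i)
      threads[i]->join();
  }

  // round-robin assignment of connections to services
  boost::asio::io_service& get_io_service() {
    boost::asio::io_service& io = *io_services_[next_];
    next_ = (next_ + 1) % io_services_.size();
    return io;
  }

 private:
  typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
  typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
  std::vector<io_service_ptr> io_services_;
  std::vector<work_ptr> work_;
  std::size_t next_;
};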

I'll be revisiting sequencer performance in the future, but for now I'll be focusing on building some applications on top of zlog for demonstration.