As previously discussed, Zlog is an implementation of the CORFU distributed log protocol on top of Ceph. This post will discuss the basics of using the API, and provide details on the new asynchronous API design.

Zlog API Example

First create a connection to the sequencer service:

zlog::SeqrClient seqr("localhost", "5678");

Next create a brand new log. A given log is striped across objects in a RADOS pool. When creating a new log, provide a handle to the pool, a striping width, and a name for the log.

const int stripe_width = 100;
zlog::Log log;
zlog::Log::Create(ioctx, "mylog", stripe_width, &seqr, log);

Now the log is available to use. In the following code snippet a string is appended to the log, and the append returns the position at which the string was stored. Finally, the string at the reported position is read back and verified.

ceph::bufferlist bl_in;
bl_in.append("My first log entry");

uint64_t pos;
log.Append(bl_in, &pos);

ceph::bufferlist bl_out;
log.Read(pos, bl_out);

assert(bl_in == bl_out);

Next we'll discuss the asynchronous versions of Read and Append.

Asynchronous API

The two components of the asynchronous interface that we'll discuss today are AioAppend and AioRead, which are the asynchronous versions of Log::Append and Log::Read, respectively:

/*
 * Append data asynchronously to the log and return its position.
 */
int AioAppend(AioCompletion *c, ceph::bufferlist& data,
    uint64_t *pposition = NULL);

/*
 * Read data asynchronously from the log.
 */
int AioRead(uint64_t position, AioCompletion *c,
    ceph::bufferlist *pbl);

The append interface AioAppend takes a completion context, a blob of data that will be appended, and an optional output parameter that upon success will contain the final location of the data that was appended to the log. Similarly, AioRead will populate the output data parameter with the contents of the log at the provided location (assuming it exists).

The AioCompletion object holds context for the invocation, and is used to discover if the operation was successful. To use the API one must first create an AioCompletion object. The following creates a basic completion object with no callbacks:

AioCompletion *c = zlog::Log::aio_create_completion();

The API also allows a callback to be defined that will be invoked automatically when the operation completes (either successfully or not). The following creates a completion object that will use aio_cb as the callback, and will pass a pointer to an AioState object that, upon a successful append, will contain the final position. Note that the completion object is used to interrogate the return value of the operation using AioCompletion::get_return_value():

struct AioState {
  Log::AioCompletion *c;
  uint64_t position;
};

static void aio_cb(completion_t cb, void *arg)
{
  AioState *s = (AioState*)arg;

  if (s->c->get_return_value() == 0)
    std::cout << "pos: " << s->position << std::endl;
  else
    std::cout << "pos: error" << std::endl;
}

AioState *s = new AioState;
ceph::bufferlist bl; // data to append
s->c = Log::aio_create_completion(s, aio_cb);

Once the callback and completion have been set up, use AioAppend to dispatch the operation:

log.AioAppend(s->c, bl, &s->position);

A thread can block until an asynchronous operation completes by using AioCompletion::wait_for_complete():

s->c->wait_for_complete();

When a completion is no longer needed, the application should free the object using the AioCompletion::release() interface:

s->c->release();
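
Putting those pieces together, here is a minimal end-to-end sketch of an asynchronous append, reusing the illustrative AioState helper and aio_cb callback from above (error handling kept to asserts):

// Minimal async append sketch using the AioState helper defined above.
AioState *s = new AioState;

ceph::bufferlist bl;
bl.append("An asynchronous log entry");

// Completion with aio_cb as the callback and s as its argument.
s->c = zlog::Log::aio_create_completion(s, aio_cb);

int ret = log.AioAppend(s->c, bl, &s->position);
assert(ret == 0);

// Block until the append (and any internal retries) completes.
s->c->wait_for_complete();
assert(s->c->get_return_value() == 0);

s->c->release();
delete s;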

Asynchronous API Implementation

Building an asynchronous API for Zlog is challenging because some interfaces (e.g. Append) may need to retry internally using a new position. Rather than completely reinvent the guts of an asynchronous API, I have built it as a layer on top of the RADOS asynchronous API.

When a new AIO completion context is created we build a data structure that holds information about the context:

Log::AioCompletion *Log::aio_create_completion(void *arg,
    Log::AioCompletion::callback_t cb)
{
  AioCompletionImpl *impl = new AioCompletionImpl;
  impl->safe_cb = cb;
  impl->safe_cb_arg = arg;
  return new AioCompletion(impl);
}

Log::AioCompletion *Log::aio_create_completion()
{
  return aio_create_completion(NULL, NULL);
}
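
The public AioCompletion class itself is a thin wrapper around the opaque impl pointer; the Log methods cast it back to AioCompletionImpl*, as we'll see below in AioAppend and AioRead. A plausible sketch of the wrapper (my reconstruction for illustration, not the verbatim class):

// Reconstruction for illustration; not the verbatim zlog class.
typedef void *completion_t;  // opaque handle passed to callbacks

class AioCompletion {
 public:
  typedef void (*callback_t)(completion_t cb, void *arg);

  explicit AioCompletion(void *pc) : pc(pc) {}

  int get_return_value();   // forwards to the impl's retval
  int wait_for_complete();  // blocks until the impl is marked complete
  void release();           // drops the client's reference to the impl

  void *pc;  // opaque pointer to the AioCompletionImpl
};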

The internal completion object contains several pieces of state:

struct zlog::Log::AioCompletionImpl {
  std::condition_variable cond;
  std::mutex lock;
  int ref;
  bool complete;
  bool released;
...

The above state is used to control concurrency and track users of the completion object. Next we have some more interesting components:

...
  /*
   * Common
   *
   * position:
   *   - current attempt (append)
   *   - target (read)
   * bl:
   *  - data being appended (append)
   *  - temp storage for read (read)
   */
  int retval;
  Log::AioCompletion::callback_t safe_cb;
  void *safe_cb_arg;
  uint64_t position;
  ceph::bufferlist bl;
  AioType type;
...

The above fields are common to all types of asynchronous operations (currently reads and appends). The retval field holds the final return value of the operation, as if the synchronous version of Log::Append or Log::Read had been called. Next are the callback function pointer and callback context stored when the completion was created. The position element holds the current attempt position for an append (the final location once it succeeds), or the target location for a read. Data for reads and appends is stored in bl, and type marks the type of operation (currently read or append). Next we have type-specific fields:

...
  /*
   * AioAppend
   *
   * pposition:
   *  - final append position
   */
  uint64_t *pposition;
...

This client-provided pointer specifies the location where the final position of an append should be stored.

...
  /*
   * AioRead
   *
   * pbl:
   *  - where to put result
   */
  ceph::bufferlist *pbl;
...

Likewise for a read the pbl pointer specifies where the retrieved data should be stored.

...
  Log *log;
  librados::IoCtx *ioctx;
  librados::AioCompletion *c;
};

Finally, references to the underlying log and RADOS I/O context are stored, along with the RADOS completion object.
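
The reference count, mutex, and condition variable implement the lifetime and wait logic behind impl->get(), impl->put_unlock(), and the wrapper's wait_for_complete()/release(). A sketch of how these helpers could look over the state above (again a reconstruction, assuming the public wrapper forwards to them; requires <mutex>, <condition_variable>, and <cassert>):

// Reconstruction for illustration; not the verbatim implementation.
void zlog::Log::AioCompletionImpl::get() {
  std::lock_guard<std::mutex> l(lock);
  assert(ref > 0);
  ref++;
}

// Called with lock held: drop a reference, unlock, and free the
// object once both the client and the callback are done with it.
void zlog::Log::AioCompletionImpl::put_unlock() {
  assert(ref > 0);
  bool destroy = (--ref == 0);
  lock.unlock();
  if (destroy)
    delete this;
}

// Block the calling thread until the callback marks the operation
// complete; backs AioCompletion::wait_for_complete().
void zlog::Log::AioCompletionImpl::wait_for_complete() {
  std::unique_lock<std::mutex> l(lock);
  cond.wait(l, [this] { return complete; });
}

// Drop the client's reference; backs AioCompletion::release().
void zlog::Log::AioCompletionImpl::release() {
  lock.lock();
  assert(!released);
  released = true;
  put_unlock();
}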

Setup of Append Operation

The initial setup involves first looking up the current tail position. This is our first guess about where the data will be appended.

int Log::AioAppend(AioCompletion *c, ceph::bufferlist& data,
    uint64_t *pposition)
{
  // initial position guess
  uint64_t position;
  int ret = CheckTail(&position, true);
  if (ret)
    return ret;

Next the AIO completion context is filled in (see above for a description of all these fields).

  AioCompletionImpl *impl = (AioCompletionImpl*)c->pc;

  impl->log = this;
  impl->bl = data;
  impl->position = position;
  impl->pposition = pposition;
  impl->ioctx = ioctx_;
  impl->type = ZLOG_AIO_APPEND;

A reference to the completion is acquired on behalf of the in-flight RADOS operation, and the underlying RADOS completion object is created. Note that the RADOS completion is set up to use aio_safe_cb as its callback, which implements the append retry logic that we'll discuss later.

  impl->get(); // rados aio now has a reference
  impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
  assert(impl->c);

Finally we build and dispatch the asynchronous operation and immediately return to the caller.

  librados::ObjectWriteOperation op;
  zlog::cls_zlog_write(op, epoch_, position, data);

  std::string oid = position_to_oid(position);
  ret = ioctx_->aio_operate(oid, impl->c, &op);

  return ret;
}
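
Both AioAppend and AioRead map a position to a backing object with position_to_oid. The exact naming scheme isn't shown in this post, but a round-robin mapping over the stripe (using assumed members name_ and stripe_width_) conveys the idea:

// Illustrative only (requires <sstream>): map a position onto one of
// the stripe_width_ backing objects round-robin. The member names and
// the actual object naming scheme here are assumptions.
std::string Log::position_to_oid(uint64_t position)
{
  uint64_t slot = position % stripe_width_;
  std::stringstream oid;
  oid << name_ << "." << slot;
  return oid.str();
}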

Setup of Read Operation

Setup for AioRead is easy. First the context is filled in:

int Log::AioRead(uint64_t position, AioCompletion *c,
    ceph::bufferlist *pbl)
{
  AioCompletionImpl *impl = (AioCompletionImpl*)c->pc;

  impl->log = this;
  impl->pbl = pbl;
  impl->position = position;
  impl->ioctx = ioctx_;
  impl->type = ZLOG_AIO_READ;

Grab a reference on behalf of the in-flight RADOS operation, and set up the underlying RADOS completion, which uses the same aio_safe_cb callback as AioAppend:

  impl->get(); // rados aio now has a reference
  impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
  assert(impl->c);

Finally the operation is prepared and asynchronously dispatched, immediately returning to the caller.

  librados::ObjectReadOperation op;
  zlog::cls_zlog_read(op, epoch_, position);

  std::string oid = position_to_oid(position);
  int ret = ioctx_->aio_operate(oid, impl->c, &op, &impl->bl);

  return ret;
}
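
For completeness, reading back asynchronously follows the same pattern as the append sketch earlier (again reusing the hypothetical AioState helper; pos is a previously appended position):

// Asynchronously read back the entry stored at 'pos'.
AioState *s = new AioState;
ceph::bufferlist bl_out;

s->c = zlog::Log::aio_create_completion(s, aio_cb);

int ret = log.AioRead(pos, s->c, &bl_out);
assert(ret == 0);

s->c->wait_for_complete();
assert(s->c->get_return_value() == 0);
// bl_out now holds the entry stored at pos.

s->c->release();
delete s;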

Asynchronous Callback

The guts of the asynchronous API are in the callback, which implements the Zlog-specific semantics on top of the underlying RADOS callback mechanism. Note that this callback is invoked when the underlying RADOS operation completes successfully or experiences a failure, so both cases must be handled. Additionally, the same callback handles both reads and appends.

void aio_safe_cb(librados::completion_t cb, void *arg)
{
  zlog::Log::AioCompletionImpl *impl = (zlog::Log::AioCompletionImpl*)arg;
  librados::AioCompletion *rc = impl->c;
  bool finish = false;

  impl->lock.lock();

  int ret = rc->get_return_value();

  // done with the rados completion
  rc->release();

  assert(impl->type == ZLOG_AIO_APPEND ||
         impl->type == ZLOG_AIO_READ);

When the callback fires we first initialize the finish variable to false; it indicates whether we will invoke the Zlog client callback, or instead dispatch a new RADOS operation asynchronously (e.g. to retry an append). The value stored in ret contains the return value of the RADOS operation, and we immediately release the resources associated with the RADOS completion object, which won't be needed further.

Next we perform tasks based on the return value. First we consider the case that everything succeeded on the RADOS side:

  if (ret == zlog::CLS_ZLOG_OK) {
    /*
     * The operation was successful. We're done.
     */
    if (impl->type == ZLOG_AIO_APPEND && impl->pposition) {
      *impl->pposition = impl->position;
    } else if (impl->type == ZLOG_AIO_READ && impl->pbl &&
        impl->bl.length() > 0) {
      *impl->pbl = impl->bl;
    }
    ret = 0;
    finish = true;

If an append was performed, we write the final position back into the location specified by the client. If a read was performed, we copy the data back to the client. In either case we are done, so finish is set to true.

Next we consider the case that the operation was tagged with a stale epoch value. In this case we want to resubmit the operation with a new epoch value. First we refresh the log's projection. If an error occurred during the refresh we return it to the client, hence we set finish to true. Otherwise, if the refresh was successful, we fall through.

  } else if (ret == zlog::CLS_ZLOG_STALE_EPOCH) {
    /*
     * We'll need to try again with a new epoch.
     */
    ret = impl->log->RefreshProjection();
    if (ret)
      finish = true;

Next we consider a generic RADOS error which will also result in returning the error to the client.

  } else if (ret < 0) {
    /*
     * Encountered a RADOS error.
     */
    finish = true;

The operation could also encounter a NOT_WRITTEN state, which is returned to the client; we assert the protocol invariant that this return value only occurs for reads. The same applies to the INVALIDATED return value.

  } else if (ret == zlog::CLS_ZLOG_NOT_WRITTEN) {
    assert(impl->type == ZLOG_AIO_READ);
    ret = -ENODEV;
    finish = true;
  } else if (ret == zlog::CLS_ZLOG_INVALIDATED) {
    assert(impl->type == ZLOG_AIO_READ);
    ret = -EFAULT;
    finish = true;
  } else {

The final case is that an append is being performed and the position was marked as READ_ONLY. All other cases are major failures.

    if (impl->type == ZLOG_AIO_APPEND)
      assert(ret == zlog::CLS_ZLOG_READ_ONLY);
    else
      assert(0);
  }

If finish is false we do not yet return to the client because the operation must be retried. First, if an append is in progress, we must get a new tail position to try.

  if (!finish) {
    if (impl->type == ZLOG_AIO_APPEND) {
      // if we are appending, get a new position
      uint64_t position;
      ret = impl->log->CheckTail(&position, true);
      if (ret)
        finish = true;
      else
        impl->position = position;
    }

If that fails we return the error to the client; otherwise we set up and prepare a new RADOS completion object.

    // we are still good. build a new aio
    if (!finish) {
      impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
      assert(impl->c);
      // don't need impl->get(): reuse reference

Finally we do any operation-specific setup and resubmit the operation, which will result in this callback being invoked again asynchronously.

      // build and submit new op
      std::string oid = impl->log->position_to_oid(impl->position);
      switch (impl->type) {
        case ZLOG_AIO_APPEND:
          {
            librados::ObjectWriteOperation op;
            zlog::cls_zlog_write(op, impl->log->epoch_, impl->position, impl->bl);
            ret = impl->ioctx->aio_operate(oid, impl->c, &op);
            if (ret)
              finish = true;
          }
          break;

        case ZLOG_AIO_READ:
          {
            librados::ObjectReadOperation op;
            zlog::cls_zlog_read(op, impl->log->epoch_, impl->position);
            ret = impl->ioctx->aio_operate(oid, impl->c, &op, &impl->bl);
            if (ret)
              finish = true;
          }
          break;

        default:
          assert(0);
      }
    }
  }

This retry process continues until a success or an error sets finish to true. At that point the Zlog completion object is finalized and the client callback is invoked.

  // complete aio if append success, or any error
  if (finish) {
    impl->retval = ret;
    impl->complete = true;
    impl->lock.unlock();
    if (impl->safe_cb)
      impl->safe_cb(impl, impl->safe_cb_arg);
    impl->cond.notify_all();
    impl->lock.lock();
    impl->put_unlock();
    return;
  }

  impl->lock.unlock();
}

And that is it. Without building on the RADOS asynchronous interface, a lot more infrastructure would have had to be created.