Zlog Asynchronous I/O Support
As previously discussed, Zlog is an implementation of the CORFU distributed log protocol on top of Ceph. This post will discuss the basics of using the API, and provide details on the new asynchronous API design.
Zlog API Example
First create a connection to the sequencer service:
zlog::SeqrClient seqr("localhost", "5678");
Next create a brand new log. A given log is striped across objects in a RADOS pool. When you create a new log provide a handle to the pool, as well as a striping width and a name for the log.
const int stripe_width = 100;
zlog::Log log;
zlog::Log::Create(ioctx, "mylog", stripe_width, &seqr, log);
Now the log is available to use. In the following code snippet a string is appended to the log, which returns the position at which the string was stored. Finally, the string at the reported position is read back and verified.
ceph::bufferlist bl_in;
bl_in.append("My first log entry");
uint64_t pos;
log.Append(bl_in, &pos);
ceph::bufferlist bl_out;
log.Read(pos, bl_out);
assert(bl_in == bl_out);
Next we'll discuss the asynchronous versions of Read and Append.
Asynchronous API
The two components of the asynchronous interface that we'll discuss today are AioAppend and AioRead, which are the asynchronous versions of Log::Append and Log::Read, respectively:
/*
* Append data asynchronously to the log and return its position.
*/
int AioAppend(AioCompletion *c, ceph::bufferlist& data,
uint64_t *pposition = NULL);
/*
* Read data asynchronously from the log.
*/
int AioRead(uint64_t position, AioCompletion *c,
ceph::bufferlist *bpl);
The append interface AioAppend
takes a completion context, a blob of data
that will be appended, and an optional output parameter that upon success will
contain the final location of the data that was appended to the log.
Similarly, AioRead
will populate the output data parameter with the
contents of the log at the provided location (assuming it exists).
The AioCompletion
object holds context for the invocation, and is used to
discover if the operation was successful. To use the API one must first create
an AioCompletion
object. The following creates a basic completion object
with no callbacks:
AioCompletion *c = zlog::Log::aio_create_completion();
The API also allows a callback to be defined that will automatically be called when the operation has completed (either successfully or not). The following creates a completion object that will use aio_cb as the callback, and will pass a pointer to an AioState object that upon successfully appending data will contain the final position. Note that the completion object is used to interrogate the return value of the operation using AioCompletion::get_return_value():
struct AioState {
  Log::AioCompletion *c;
  uint64_t position;
};
static void aio_cb(completion_t cb, void *arg)
{
  AioState *s = (AioState*)arg;
  // the completion object reports the operation's return value
  if (s->c->get_return_value() == 0)
    std::cout << "pos: " << s->position << std::endl;
  else
    std::cout << "pos: error" << std::endl;
}
AioState *s = new AioState;
ceph::bufferlist bl; // data to append
s->c = Log::aio_create_completion(s, aio_cb);
Once the callback and completion have been set up, use AioAppend to dispatch the operation:
log.AioAppend(s->c, bl, &s->position);
A thread can block until an asynchronous operation completes by using AioCompletion::wait_for_complete():
s->c->wait_for_complete();
When a completion is no longer used the application should free the object using the AioCompletion::release() interface:
s->c->release();
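Putting these pieces together, a blocking read of the entry appended in the synchronous example might look like the following sketch (it reuses the log and pos variables from above, and simply composes the calls already described):
AioCompletion *c = zlog::Log::aio_create_completion();
ceph::bufferlist bl;
int ret = log.AioRead(pos, c, &bl);
assert(ret == 0); // dispatch succeeded; the operation result comes later
c->wait_for_complete();
if (c->get_return_value() == 0)
  std::cout << "read " << bl.length() << " bytes at pos " << pos << std::endl;
c->release();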
Asynchronous API Implementation
Building an asynchronous API for Zlog can be challenging because some interfaces (e.g. Append) may need to retry using a new position. Rather than completely reinvent the internal guts of an asynchronous API, I have instead built the API as a layer on top of the RADOS asynchronous API.
When a new AIO completion context is created we build a data structure that holds information about the context:
Log::AioCompletion *Log::aio_create_completion(void *arg,
Log::AioCompletion::callback_t cb)
{
AioCompletionImpl *impl = new AioCompletionImpl;
impl->safe_cb = cb;
impl->safe_cb_arg = arg;
return new AioCompletion(impl);
}
Log::AioCompletion *Log::aio_create_completion()
{
return aio_create_completion(NULL, NULL);
}
The internal completion object contains several pieces of state:
struct zlog::Log::AioCompletionImpl {
std::condition_variable cond;
std::mutex lock;
int ref;
bool complete;
bool released;
...
The above state is used to control concurrency and to track users of the completion object. Next we have some more interesting components:
...
/*
* Common
*
* position:
* - current attempt (append)
* - target (read)
* bl:
* - data being appended (append)
* - temp storage for read (read)
*/
int retval;
Log::AioCompletion::callback_t safe_cb;
void *safe_cb_arg;
uint64_t position;
ceph::bufferlist bl;
AioType type;
...
The above are common to all types of asynchronous operations (currently reads and appends). The retval component holds the final return value of the operation, as if the synchronous version of Log::Append or Log::Read had been called. Next are the callback function pointer and callback context stored when the completion was created. The position element holds the final location for appends, or the target location for reads. Data for reads and appends is stored in bl, and type marks the type of operation (currently read or append). Next we have type-specific fields:
...
/*
* AioAppend
*
* pposition:
* - final append position
*/
uint64_t *pposition;
...
This pointer, provided by the API client, specifies where the final position of an append should be stored.
...
/*
* AioRead
*
* pbl:
* - where to put result
*/
ceph::bufferlist *pbl;
...
Likewise for a read the pbl
pointer specifies where the retrieved data
should be stored.
...
Log *log;
librados::IoCtx *ioctx;
librados::AioCompletion *c;
};
Finally, references to the underlying log and RADOS I/O context are stored, along with the RADOS completion object.
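The reference counting and signalling helpers built on ref, complete, released, lock, and cond are not shown in this post, but based on how they are used below (get(), put_unlock(), and the public wait_for_complete() and release() calls, which presumably forward to the impl), they might look roughly like the following sketch. This is a hypothetical illustration, not the actual zlog source; it assumes the constructor sets ref to 1 for the client's reference and initializes complete and released to false:
void AioCompletionImpl::get() {
  std::lock_guard<std::mutex> l(lock);
  assert(ref > 0);
  ref++;
}
void AioCompletionImpl::put_unlock() {
  // called with lock held: drop a reference, and destroy the
  // completion once the last reference is gone
  assert(ref > 0);
  bool last = (--ref == 0);
  lock.unlock();
  if (last)
    delete this;
}
void AioCompletionImpl::release() {
  lock.lock();
  assert(!released);
  released = true;
  put_unlock();
}
void AioCompletionImpl::wait_for_complete() {
  std::unique_lock<std::mutex> l(lock);
  cond.wait(l, [&]{ return complete; });
}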
Setup of Append Operation
The initial setup involves first looking up the current tail position. This is our first guess about where the data will be appended.
int Log::AioAppend(AioCompletion *c, ceph::bufferlist& data,
uint64_t *pposition)
{
// initial position guess
uint64_t position;
int ret = CheckTail(&position, true);
if (ret)
return ret;
Next the AIO completion context is filled in (see above for a description of all these fields).
AioCompletionImpl *impl = (AioCompletionImpl*)c->pc;
impl->log = this;
impl->bl = data;
impl->position = position;
impl->pposition = pposition;
impl->ioctx = ioctx_;
impl->type = ZLOG_AIO_APPEND;
A reference is acquired on behalf of the in-flight RADOS operation, and the underlying RADOS completion object is created. Note that the RADOS completion is set up to use aio_safe_cb as the callback, which implements the retry logic for appends that we'll discuss later.
impl->get(); // rados aio now has a reference
impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
assert(impl->c);
Finally, we create and dispatch the asynchronous operation and immediately return to the caller.
librados::ObjectWriteOperation op;
zlog::cls_zlog_write(op, epoch_, position, data);
std::string oid = position_to_oid(position);
ret = ioctx_->aio_operate(oid, impl->c, &op);
return ret;
}
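The position_to_oid helper is not shown here. Since a log is striped across stripe_width objects, it presumably maps a position round-robin onto one of the backing objects, along the lines of the following sketch (hypothetical; name_ and stripe_width_ are assumed member names, and <sstream> is required):
std::string Log::position_to_oid(uint64_t position)
{
  // stripe log entries round-robin across the backing objects
  uint64_t slot = position % stripe_width_;
  std::stringstream ss;
  ss << name_ << "." << slot;
  return ss.str();
}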
Setup of Read Operation
Setup for AioRead
is easy. First the context is filled in:
int Log::AioRead(uint64_t position, AioCompletion *c,
ceph::bufferlist *pbl)
{
AioCompletionImpl *impl = (AioCompletionImpl*)c->pc;
impl->log = this;
impl->pbl = pbl;
impl->position = position;
impl->ioctx = ioctx_;
impl->type = ZLOG_AIO_READ;
Grab a reference on behalf of the RADOS operation, and set up the underlying RADOS completion, which uses the same aio_safe_cb callback as AioAppend:
impl->get(); // rados aio now has a reference
impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
assert(impl->c);
Finally the operation is prepared and asynchronously dispatched, immediately returning to the caller.
librados::ObjectReadOperation op;
zlog::cls_zlog_read(op, epoch_, position);
std::string oid = position_to_oid(position);
int ret = ioctx_->aio_operate(oid, impl->c, &op, &impl->bl);
return ret;
}
Asynchronous Callback
The guts of the asynchronous API are in the callback, which implements the Zlog-specific semantics on top of the underlying RADOS callback mechanism. Note that this callback is invoked when the underlying RADOS operation completes successfully or experiences a failure, so both cases must be handled. Additionally, the same callback handles both reads and appends.
void aio_safe_cb(librados::completion_t cb, void *arg)
{
zlog::Log::AioCompletionImpl *impl = (zlog::Log::AioCompletionImpl*)arg;
librados::AioCompletion *rc = impl->c;
bool finish = false;
impl->lock.lock();
int ret = rc->get_return_value();
// done with the rados completion
rc->release();
assert(impl->type == ZLOG_AIO_APPEND ||
impl->type == ZLOG_AIO_READ);
When the callback is made we first initialize the finish variable to false; it indicates whether we are done and should invoke the Zlog client callback, or whether a new RADOS operation will be dispatched asynchronously (e.g. retrying an append). The value stored in ret contains the return value of the RADOS operation, and we immediately release the resources associated with the RADOS completion object, which won't be needed further.
Next we perform tasks based on the return value. First we consider the case that everything succeeded on the RADOS side:
if (ret == zlog::CLS_ZLOG_OK) {
/*
* Append was successful. We're done.
*/
if (impl->type == ZLOG_AIO_APPEND && impl->pposition) {
*impl->pposition = impl->position;
} else if (impl->type == ZLOG_AIO_READ && impl->pbl &&
impl->bl.length() > 0) {
*impl->pbl = impl->bl;
}
ret = 0;
finish = true;
If an append is being performed, then we write the final position back into the location specified by the client. If a read is being performed, we copy the data back to the client. In either case we are done, so we set finish to true.
Next we consider the case that the operation was tagged with a stale epoch value. In this case we want to resubmit the operation with a new epoch value. First we refresh the log. If an error occurred during the refresh we will return an error to the client, hence we set finish to true. Otherwise, if the refresh was successful, we fall through.
} else if (ret == zlog::CLS_ZLOG_STALE_EPOCH) {
/*
* We'll need to try again with a new epoch.
*/
ret = impl->log->RefreshProjection();
if (ret)
finish = true;
Next we consider a generic RADOS error which will also result in returning the error to the client.
} else if (ret < 0) {
/*
* Encountered a RADOS error.
*/
finish = true;
The operation could also encounter a NOT_WRITTEN state, which is returned to the client, but we assert the protocol invariant that this return value is only valid if a read is being performed. The same applies to the INVALIDATED return value.
} else if (ret == zlog::CLS_ZLOG_NOT_WRITTEN) {
assert(impl->type == ZLOG_AIO_READ);
ret = -ENODEV;
finish = true;
} else if (ret == zlog::CLS_ZLOG_INVALIDATED) {
assert(impl->type == ZLOG_AIO_READ);
ret = -EFAULT;
finish = true;
} else {
The final case is that an append is being performed and the position was
marked as READ_ONLY
. All other cases are major failures.
if (impl->type == ZLOG_AIO_APPEND)
assert(ret == zlog::CLS_ZLOG_READ_ONLY);
else
assert(0);
}
If finish is false then we will not yet return to the client because the operation needs to be retried. First, if an append is in progress, we must get a new tail position to try.
if (!finish) {
if (impl->type == ZLOG_AIO_APPEND) {
// if we are appending, get a new position
uint64_t position;
ret = impl->log->CheckTail(&position, true);
if (ret)
finish = true;
else
impl->position = position;
}
If a failure occurs we return to the client; otherwise we set up a new RADOS completion object and prepare it.
// we are still good. build a new aio
if (!finish) {
impl->c = librados::Rados::aio_create_completion(impl, NULL, aio_safe_cb);
assert(impl->c);
// don't need impl->get(): reuse reference
Finally, we do any operation-specific setup and resubmit the operation, which will result in this callback being invoked again asynchronously.
// build and submit new op
std::string oid = impl->log->position_to_oid(impl->position);
switch (impl->type) {
case ZLOG_AIO_APPEND:
{
librados::ObjectWriteOperation op;
zlog::cls_zlog_write(op, impl->log->epoch_, impl->position, impl->bl);
ret = impl->ioctx->aio_operate(oid, impl->c, &op);
if (ret)
finish = true;
}
break;
case ZLOG_AIO_READ:
{
librados::ObjectReadOperation op;
zlog::cls_zlog_read(op, impl->log->epoch_, impl->position);
ret = impl->ioctx->aio_operate(oid, impl->c, &op, &impl->bl);
if (ret)
finish = true;
}
break;
default:
assert(0);
}
}
}
This retry process continues until success or an error occurs, which sets finish to true. At that point the Zlog completion object is finalized and the client callback is invoked.
// complete aio if append success, or any error
if (finish) {
impl->retval = ret;
impl->complete = true;
impl->lock.unlock();
if (impl->safe_cb)
impl->safe_cb(impl, impl->safe_cb_arg);
impl->cond.notify_all();
impl->lock.lock();
impl->put_unlock();
return;
}
impl->lock.unlock();
}
And that is it. Without building on the RADOS asynchronous interface, a lot more infrastructure would have had to be created.