Overview

Today we'll look at generational events and active messages in Realm.

About Legion Runtime Class

These notes are closely based on the set of Legion Runtime Class videos produced by the Legion developers. They are my own notes and code walks, and any errors or things that are just plain wrong represent my own mistakes.

Today's notes are based on the following video:

Generational Events

Events in Realm have an ID value, which we saw in the previous talk on Realm. Realm events also have a generation field, which allows the runtime to overlay a sequence of distinct events on the same C++ implementation object rather than reclaiming that object after each event triggers.

class Event {
public:
  typedef IDType id_t;
  typedef unsigned gen_t;
...
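Here is a minimal sketch of how a generation lets one implementation object back a series of events. It is not Realm's code: EventHandle and SimpleGenEvent are hypothetical simplifications that only mirror the id and gen fields and the needed_gen <= generation test used later in these notes.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical simplified handle: which implementation object (id) and
// which generation of that object (gen) this event refers to.
struct EventHandle {
  uint64_t id;
  unsigned gen;
};

// Hypothetical stand-in for GenEventImpl: one object, many events over time.
struct SimpleGenEvent {
  uint64_t id;
  unsigned generation = 0;  // latest generation that has triggered

  explicit SimpleGenEvent(uint64_t id_) : id(id_) {}

  // The next (not yet triggered) event backed by this object.
  EventHandle current_event() const { return EventHandle{id, generation + 1}; }

  // Triggering generation g retires that event; the object now backs g + 1.
  void trigger(unsigned gen_triggered) {
    assert(gen_triggered == generation + 1);
    generation = gen_triggered;
  }

  bool has_triggered(unsigned needed_gen) const { return needed_gen <= generation; }
};
```

Once generation 1 has triggered, the same object serves generation 2, so nothing has to be reclaimed, and a stale handle for generation 1 still correctly reports that it has triggered.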

Events have a relatively simple interface. has_triggered is a fast check to determine whether the event has triggered. A thread can block until an event triggers (external_wait), or register a waiter without blocking (add_waiter).

class GenEventImpl : public Event::Impl {
public:
  // record that the event has triggered and notify anybody who cares
  void trigger(Event::gen_t gen_triggered, int trigger_node, Event wait_on = Event::NO_EVENT);

  // test whether an event has triggered without waiting
  virtual bool has_triggered(Event::gen_t needed_gen);

  virtual void external_wait(Event::gen_t needed_gen);

  virtual bool add_waiter(Event::gen_t needed_gen, EventWaiter *waiter/*, bool pre_subscribed = false*/);

The node on which an event is created is considered its owning node. An event tracks the waiters on it that are local to its node, as well as the set of remote nodes that have waiters on it. When the event triggers, local waiters are notified without any network communication, while active messages are used to notify remote waiters. The NodeMask below tracks the remote nodes that have waiters on the event.

  NodeMask remote_waiters;
  std::vector<EventWaiter *> local_waiters; // set of local threads that are waiting on event
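To illustrate how the two sets might be used when an event triggers, here is a sketch that assumes NodeMask is a fixed-size bitmask (a std::bitset with a hypothetical 64-node limit) and models local waiters as callbacks rather than EventWaiter objects. WaiterSets and notify_all are invented names.

```cpp
#include <bitset>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical: one bit per node, at most 64 nodes in this sketch.
using NodeMask = std::bitset<64>;

struct WaiterSets {
  NodeMask remote_waiters;                           // nodes with waiters
  std::vector<std::function<void()>> local_waiters;  // waiters on this node

  // On trigger: run local waiters directly, and return the remote nodes
  // that would each need one trigger active message.
  std::vector<int> notify_all(int my_node) {
    for (auto &w : local_waiters) w();
    local_waiters.clear();
    std::vector<int> to_message;
    for (std::size_t n = 0; n < remote_waiters.size(); ++n)
      if (remote_waiters.test(n) && static_cast<int>(n) != my_node)
        to_message.push_back(static_cast<int>(n));
    remote_waiters.reset();
    return to_message;
  }
};
```

In this design a remote node receives one message no matter how many of its threads are waiting; that node's own local waiter list takes care of the rest.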

To check whether an event has triggered, callers use the has_triggered interface. The event's ID is resolved to its implementation, and has_triggered is called on that implementation with the event's generation.

bool Event::has_triggered(void) const
{
  DetailedTimer::ScopedPush sp(TIME_LOW_LEVEL);
  if(!id) return true; // special case: NO_EVENT has always triggered
  Event::Impl *e = get_runtime()->get_event_impl(*this);
  return e->has_triggered(gen);
}

The event ID is used to resolve the implementation using this routine:

Event::Impl *Runtime::Impl::get_event_impl(Event e)
{
  ID id(e);
  switch(id.type()) {
  case ID::ID_EVENT:
    return get_genevent_impl(e);
  case ID::ID_BARRIER:
    return get_barrier_impl(e);
  default:
    assert(0);
  }
  return 0; // unreachable; avoids falling off the end when asserts are disabled
}
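The switch relies on the ID encoding the kind of object it names. The bit layout below is purely hypothetical (Realm's actual field widths are not shown in these notes); it only illustrates packing a type tag into an ID, decoding it, and dispatching on it in the spirit of get_event_impl.

```cpp
#include <cstdint>
#include <string>

// Hypothetical 64-bit ID layout, for illustration only:
//   bits 60..63: object type, bits 44..59: owner node, bits 0..43: index.
enum class IdType : unsigned { EVENT = 1, BARRIER = 2 };

constexpr uint64_t make_id(IdType t, unsigned node, uint64_t index) {
  return (uint64_t(t) << 60) | (uint64_t(node & 0xFFFF) << 44) |
         (index & ((uint64_t(1) << 44) - 1));
}
constexpr IdType id_type(uint64_t id) { return IdType(id >> 60); }
constexpr unsigned id_node(uint64_t id) { return unsigned((id >> 44) & 0xFFFF); }
constexpr uint64_t id_index(uint64_t id) { return id & ((uint64_t(1) << 44) - 1); }

// Dispatch on the type tag, as get_event_impl does with its switch.
std::string describe(uint64_t id) {
  switch (id_type(id)) {
    case IdType::EVENT:   return "generational event";
    case IdType::BARRIER: return "barrier";
  }
  return "unknown";
}
```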

The has_triggered method on the generational event is as follows:

bool GenEventImpl::has_triggered(Event::gen_t needed_gen)
{
  return (needed_gen <= generation);
}

This check is inherently racy, since generation is read without holding a lock. However, events can only transition from untriggered to triggered, never back, so if we observe that an event has triggered we can safely proceed.

The converse does not hold: an event may be triggering concurrently with the check, so a false result may already be stale. In that case the caller falls back to a slow path that takes a lock and checks again. In this way has_triggered is commonly used as a cheap fast-path check before falling back to locks.
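The fast-path/slow-path pattern can be sketched in standard C++. One assumption to flag: in the C++ memory model, an unsynchronized plain read concurrent with a write is a data race, so this hypothetical MonotonicEvent keeps the generation in a std::atomic. check_or is an invented helper that runs a callback under the lock only when the event is confirmed untriggered.

```cpp
#include <atomic>
#include <mutex>

// Hypothetical sketch: generation only ever increases, so a lock-free read
// that says "triggered" is final; a read that says "not triggered" may be
// stale and must be rechecked under the lock before acting on it.
class MonotonicEvent {
 public:
  bool has_triggered(unsigned needed_gen) const {
    return needed_gen <= generation_.load(std::memory_order_acquire);
  }

  void trigger(unsigned gen) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (gen > generation_.load(std::memory_order_relaxed))
      generation_.store(gen, std::memory_order_release);
  }

  // Returns true if the event has triggered. Otherwise runs on_pending while
  // still holding the lock, so no trigger can slip in between check and action.
  template <typename F>
  bool check_or(unsigned needed_gen, F on_pending) {
    if (has_triggered(needed_gen)) return true;  // fast path, no lock
    std::lock_guard<std::mutex> lock(mutex_);     // slow path
    if (needed_gen <= generation_.load(std::memory_order_relaxed)) return true;
    on_pending();
    return false;
  }

 private:
  std::atomic<unsigned> generation_{0};
  std::mutex mutex_;
};
```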

If an event hasn't triggered, a thread will often want to defer some work until it does. One way to do this is to block the caller until the event triggers. Typically external_wait is called after has_triggered has returned false. After taking the lock, external_wait re-evaluates gen_needed > generation to resolve the race described above.

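Once this check confirms that the event still hasn't triggered, the calling thread adds itself to local_waiters and blocks on a condition variable that will be signaled when the event triggers. Note that external_wait cannot subscribe to a remote event: if the event is owned by another node and this node hasn't already subscribed to the needed generation, it aborts.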

void GenEventImpl::external_wait(Event::gen_t gen_needed)
{
  GASNetCondVar cv(mutex);
  PthreadCondWaiter w(cv);
  {
    AutoHSLLock a(mutex);

    if(gen_needed > generation) {
      local_waiters.push_back(&w);

      if((owner != gasnet_mynode()) && (gen_needed > gen_subscribed)) {
        printf("AAAH!  Can't subscribe to another node's event in external_wait()!\n");
        exit(1);
      }

      // now just sleep on the condition variable - hope we wake up
      cv.wait();
    }
  }
}
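For comparison, here is a sketch of the same blocking wait written with std::mutex and std::condition_variable in place of GASNetCondVar, PthreadCondWaiter, and AutoHSLLock. BlockingEvent and wait_for_trigger_from_other_thread are hypothetical, and the event is node-local (no subscriptions). One design difference: this version waits with a predicate, which re-checks the generation after every wakeup and so also tolerates spurious wakeups, whereas the code above calls cv.wait() once.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical node-local event with a blocking wait.
class BlockingEvent {
 public:
  bool has_triggered(unsigned gen) {
    std::lock_guard<std::mutex> lock(mutex_);
    return gen <= generation_;
  }

  void wait(unsigned gen_needed) {
    std::unique_lock<std::mutex> lock(mutex_);
    // Re-checked under the lock after every wakeup.
    cv_.wait(lock, [&] { return gen_needed <= generation_; });
  }

  void trigger(unsigned gen) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (gen > generation_) generation_ = gen;
    }
    cv_.notify_all();  // wake every blocked local waiter
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  unsigned generation_ = 0;
};

// Usage helper: trigger from another thread while this thread blocks.
inline void wait_for_trigger_from_other_thread(BlockingEvent &ev, unsigned gen) {
  std::thread t([&ev, gen] { ev.trigger(gen); });
  ev.wait(gen);
  t.join();
}
```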

Waiters are also added when triggering an event. A trigger can be given a precondition event, wait_on, which allows dependency chains to be constructed. Notice the fast has_triggered check: if wait_on has not been observed to trigger, this event's trigger is deferred by registering a DeferredEventTrigger waiter on wait_on, which triggers this event once wait_on does.

void GenEventImpl::trigger(Event::gen_t gen_triggered, int trigger_node, Event wait_on)
{
  if(!wait_on.has_triggered()) {
    // deferred trigger
    wait_on.impl()->add_waiter(wait_on.gen, new DeferredEventTrigger(this));
    return;
  }

  ...
}

But notice the race above. has_triggered may return a false negative, and trigger returns immediately after registering the waiter. So if wait_on triggers concurrently, before the waiter is enqueued, add_waiter must itself fire the waiter; otherwise the deferred trigger would never happen.

The first thing add_waiter does is a race-free check of the trigger condition under the mutex. If the event has already triggered, there is nothing to enqueue; instead trigger_now is set to true, and after releasing the mutex add_waiter notifies the waiter directly (for the DeferredEventTrigger used in trigger, this performs the deferred trigger).

If the event hasn't triggered, the waiter is added to the local waiter list. In addition, if the event is owned by another node and this node hasn't yet subscribed to the needed generation, it subscribes to the owning node.

bool GenEventImpl::add_waiter(Event::gen_t needed_gen, EventWaiter *waiter)
{
  bool trigger_now = false;

  int subscribe_owner = -1;
  EventSubscribeArgs args;
  {
    AutoHSLLock a(mutex);

    if(needed_gen > generation) {

      // do we need to subscribe?
      if((owner != gasnet_mynode()) && (gen_subscribed < needed_gen)) {
        gen_subscribed = needed_gen;
        subscribe_owner = owner;
        ...
      }

      // now we add to the local waiter list
      local_waiters.push_back(waiter);
    } else {
      // event we are interested in has already triggered!
      trigger_now = true; // actually do trigger outside of mutex
    }
  }

At the end of the routine, after the mutex has been released, we send the subscription request if one is needed and notify the waiter if the event had already triggered.

  if((subscribe_owner != -1))
    EventSubscribeMessage::request(owner, args);

  if(trigger_now) {
    bool nuke = waiter->event_triggered();
    if(nuke)
      delete waiter;
  }

  return true;  // waiter is always either enqueued or triggered right now
}
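The decide-under-the-lock, act-outside-the-lock structure of add_waiter, and the way a DeferredEventTrigger chains two events, can be sketched on a single node. Everything here is a hypothetical simplification: there are no subscriptions, waiters are assumed to wait only on the next generation, and trigger_after is an invented stand-in for the precondition handling at the top of GenEventImpl::trigger.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical simplification of EventWaiter.
struct EventWaiter {
  virtual ~EventWaiter() {}
  // Called once the awaited event has triggered; returning true tells the
  // caller to delete the waiter (the "nuke" flag in add_waiter).
  virtual bool event_triggered() = 0;
};

// Single-node event. Assumes waiters only wait on generation_ + 1,
// so trigger() wakes every queued waiter.
class SimpleEvent {
 public:
  bool has_triggered(unsigned gen) {
    std::lock_guard<std::mutex> lock(mutex_);
    return gen <= generation_;
  }

  void add_waiter(unsigned needed_gen, EventWaiter *waiter) {
    bool trigger_now = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(needed_gen <= generation_ + 1);
      if (needed_gen > generation_)
        local_waiters_.push_back(waiter);  // trigger() will notify it
      else
        trigger_now = true;                // already triggered
    }
    // Notify outside the mutex: the waiter may go on to trigger other events.
    if (trigger_now && waiter->event_triggered()) delete waiter;
  }

  void trigger(unsigned gen) {
    std::vector<EventWaiter *> to_wake;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(gen == generation_ + 1);
      generation_ = gen;
      to_wake.swap(local_waiters_);
    }
    for (EventWaiter *w : to_wake)
      if (w->event_triggered()) delete w;
  }

 private:
  std::mutex mutex_;
  unsigned generation_ = 0;
  std::vector<EventWaiter *> local_waiters_;
};

// Triggers `target` at generation `gen` when the awaited event triggers.
struct DeferredEventTrigger : EventWaiter {
  SimpleEvent *target;
  unsigned gen;
  DeferredEventTrigger(SimpleEvent *t, unsigned g) : target(t), gen(g) {}
  bool event_triggered() override {
    target->trigger(gen);
    return true;  // one-shot: the caller deletes it
  }
};

// Trigger e at gen, but only after wait_on reaches wait_gen.
void trigger_after(SimpleEvent &e, unsigned gen, SimpleEvent &wait_on, unsigned wait_gen) {
  if (!wait_on.has_triggered(wait_gen)) {
    // The answer may be stale by now; add_waiter fires the trigger itself
    // if wait_on has triggered in the meantime.
    wait_on.add_waiter(wait_gen, new DeferredEventTrigger(&e, gen));
    return;
  }
  e.trigger(gen);
}
```

Registering a waiter on an event that has already triggered fires it immediately, which is exactly what covers the false negative from the unlocked check.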

Active Messages

Here is a brief overview of active messages, which are implemented on top of GASNet active messages. Each active message type in GASNet needs a unique ID. Here are just a few:

enum ActiveMessageIDs {
  FIRST_AVAILABLE = 140,
  NODE_ANNOUNCE_MSGID,
  SPAWN_TASK_MSGID,
  EVENT_SUBSCRIBE_MSGID,
  EVENT_TRIGGER_MSGID,
  REMOTE_MALLOC_MSGID,
...

Each message type also has a set of arguments and, optionally, a blob of data. For the EventTriggerMessage we have already seen, the arguments record which node performed the trigger and which event is being triggered.

struct EventTriggerArgs {
  gasnet_node_t node;
  Event event;
};

The GASNet active message system is low-level and inconvenient to program against, so Legion wraps it in higher-level abstractions built with C++ templates that make things much nicer. Here is the event trigger message type. ShortNoReply refers to the type of GASNet message: a short message transports only arguments, with no data blob, and in this case no reply is sent.

typedef ActiveMessageShortNoReply<EVENT_TRIGGER_MSGID,
    EventTriggerArgs, handle_event_trigger> EventTriggerMessage;
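To show what the template wrapper buys, here is a hypothetical sketch in the same shape as ActiveMessageShortNoReply, with the message ID, argument type, and handler as template parameters. It does not call GASNet; delivery is simulated in-process by copying the arguments as raw bytes, which is why it requires a trivially copyable argument struct. ShortNoReplySketch, DemoTriggerArgs, and handle_demo_trigger are invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>

// The ID, argument struct and handler are template parameters, so requests
// are type-checked and the pack/unpack code is generated by the compiler.
template <int MSGID, class MSGTYPE, void (*FNPTR)(MSGTYPE)>
class ShortNoReplySketch {
  static_assert(std::is_trivially_copyable<MSGTYPE>::value,
                "short-message arguments travel as raw bytes");

 public:
  // A real implementation would hand these bytes to the network for node
  // `target` under handler ID MSGID; this sketch delivers in-process.
  static void request(int /*target*/, const MSGTYPE &args) {
    unsigned char wire[sizeof(MSGTYPE)];
    std::memcpy(wire, &args, sizeof(MSGTYPE));
    deliver(wire, sizeof(wire));
  }

  // Receiver side: rebuild the argument struct and call the handler.
  static void deliver(const void *data, std::size_t len) {
    assert(len == sizeof(MSGTYPE));
    MSGTYPE args;
    std::memcpy(&args, data, sizeof(MSGTYPE));
    FNPTR(args);
  }
};

// Hypothetical message modeled on EventTriggerArgs, with simplified fields.
struct DemoTriggerArgs {
  int node;
  unsigned gen;
};

int g_last_node = -1;
unsigned g_last_gen = 0;

void handle_demo_trigger(DemoTriggerArgs args) {
  g_last_node = args.node;
  g_last_gen = args.gen;
}

// 200 is an arbitrary ID chosen for this sketch.
typedef ShortNoReplySketch<200, DemoTriggerArgs, handle_demo_trigger> DemoTriggerMessage;
```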

handle_event_trigger is the handler for this particular message type, and it is quite simple: it looks up the implementation of the event named in the arguments and calls trigger on it. In spirit, it is really an RPC.

void handle_event_trigger(EventTriggerArgs args)
{
  DetailedTimer::ScopedPush sp(TIME_LOW_LEVEL);
  GenEventImpl *impl = get_runtime()->get_genevent_impl(args.event);
  impl->trigger(args.event.gen, args.node);
}

We saw a couple of places where the event trigger message is constructed; they should make more sense now:

EventTriggerArgs args;
args.node = trigger_node;
args.event = me.convert<Event>();
args.event.gen = gen_triggered;
EventTriggerMessage::request(owner, args);

New message types are registered with GASNet when the system starts up. Here are a few:

gasnet_handlerentry_t handlers[128];
int hcount = 0;
hcount += NodeAnnounceMessage::add_handler_entries(&handlers[hcount], "Node Announce AM");
hcount += SpawnTaskMessage::add_handler_entries(&handlers[hcount], "Spawn Task AM");
hcount += LockRequestMessage::add_handler_entries(&handlers[hcount], "Lock Request AM");
hcount += EventTriggerMessage::add_handler_entries(&handlers[hcount], "Event Trigger AM");
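The add_handler_entries convention, where each message type fills in its entries and returns how many it used so that the calls chain with hcount +=, can be sketched as follows. HandlerEntry is a hypothetical stand-in for gasnet_handlerentry_t, MessageTypeSketch and find_handler are invented, and the assumption that a request-only message needs exactly one entry belongs to this sketch.

```cpp
#include <cstdio>

// Hypothetical mirror of a handler-table entry: an ID, a function, a name.
struct HandlerEntry {
  int index;
  void (*fnptr)();
  const char *name;
};

// Each message type appends its entries and reports how many it used.
template <int MSGID>
struct MessageTypeSketch {
  static void handler() { std::printf("handling message %d\n", MSGID); }

  static int add_handler_entries(HandlerEntry *entries, const char *name) {
    entries[0] = HandlerEntry{MSGID, &handler, name};
    return 1;  // a request-only message needs a single entry here
  }
};

// Look up a handler by ID, as the network layer would when a message arrives.
inline const HandlerEntry *find_handler(const HandlerEntry *table, int count, int id) {
  for (int i = 0; i < count; ++i)
    if (table[i].index == id) return &table[i];
  return nullptr;
}
```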

Medium GASNet messages are like short messages, but can also carry a blob of arbitrary data. Legion provides similar wrappers for medium messages to make them easier to use; their handlers additionally receive a pointer to the data and its length, as the FNPTR parameter in the template shows:

template <int MSGID, class MSGTYPE, void (*FNPTR)(MSGTYPE, const void *, size_t)>
class ActiveMessageMediumNoReply {
 public:
...

Finally, a payload mode is used to express how the blob should be treated. For instance, PAYLOAD_NONE indicates there is no payload beyond the message arguments.

enum { PAYLOAD_NONE, // no payload in packet
       PAYLOAD_KEEP, // use payload pointer, guaranteed to be stable
       PAYLOAD_FREE, // take ownership of payload, free when done
       PAYLOAD_COPY, // make a copy of the payload
       PAYLOAD_SRCPTR, // payload has been copied to the src data pool
       PAYLOAD_PENDING, // payload needs to be copied, but hasn't yet
       PAYLOAD_KEEPREG, // use payload pointer, AND it's registered!
       PAYLOAD_EMPTY, // message can have payload, but this one is 0 bytes
};

The DMA system is one part of the runtime that uses PAYLOAD_KEEP. This mode tells the active message system that the data blob will remain valid and unchanged until the message has been delivered, which avoids copying the data. If the data may change before delivery completes, PAYLOAD_COPY can be used instead.
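The ownership difference between the KEEP, FREE, and COPY modes can be sketched with a hypothetical queued message. PendingMessage and the three-mode subset it handles are assumptions for illustration; the handler it calls has the same (data, length) shape as the extra parameters of a medium-message handler.

```cpp
#include <cstddef>
#include <cstdlib>
#include <vector>

// Hypothetical subset of the payload modes, re-declared for this sketch.
enum PayloadMode { PAYLOAD_KEEP, PAYLOAD_FREE, PAYLOAD_COPY };

// A queued message whose data blob is borrowed, owned, or copied
// depending on the payload mode chosen by the sender.
struct PendingMessage {
  const void *data = nullptr;
  std::size_t len = 0;
  PayloadMode mode = PAYLOAD_KEEP;
  std::vector<unsigned char> copy;  // used only for PAYLOAD_COPY

  PendingMessage(const void *payload, std::size_t n, PayloadMode m) : len(n), mode(m) {
    if (mode == PAYLOAD_COPY) {
      const unsigned char *p = static_cast<const unsigned char *>(payload);
      copy.assign(p, p + n);  // sender may reuse its buffer immediately
      data = copy.data();
    } else {
      data = payload;  // KEEP and FREE: no copy, the sender's buffer is used
    }
  }
  PendingMessage(const PendingMessage &) = delete;             // `data` may point
  PendingMessage &operator=(const PendingMessage &) = delete;  // into `copy`

  // Deliver to a medium-style handler, then free the buffer if we own it.
  template <typename Handler>
  void deliver(Handler handler) {
    handler(data, len);
    if (mode == PAYLOAD_FREE) std::free(const_cast<void *>(data));
    data = nullptr;
  }
};
```

With PAYLOAD_KEEP the handler sees whatever the buffer holds at delivery time, which is why the sender must not modify it; PAYLOAD_COPY pays for a copy to lift that obligation.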