About Legion Runtime Class

These notes are closely based on the set of Legion Runtime Class videos produced by the Legion developers. They are my own notes and code walks, and any errors or things that are just plain wrong represent my own mistakes.

Today's notes are based on the following video:

Overview

The notes for this class cover some of the hardware resource managers that the high-level runtime provides as internal services. The managers covered are the MemoryManager, the MessageManager, and the ProcessorManager.

Memory Manager

The MemoryManager tracks the usage of memories provided by the low-level runtime. A MemoryManager instance is created for every low-level memory, including both local and remote memories. Instances of the manager are instantiated lazily since there may be thousands of distinct memories in a large machine, and the tasks on a given node may access only a subset of them. You'll find the MemoryManager declaration in runtime.h:

/**
 * \class MemoryManager
 * The goal of the memory manager is to keep track of all of
 * the physical instances that the runtime knows about in various
 * memories throughout the system.  This will then allow for
 * feedback when mapping to know when memories are nearing
 * their capacity.
 */
class MemoryManager {
  public:
    MemoryManager(Memory mem, Runtime *rt);
    MemoryManager(const MemoryManager &rhs);
    ~MemoryManager(void);
...

Instances are created during physical dependence analysis. When this happens the high-level runtime registers the newly created instance with a memory manager. This is the entry point for registering physical instances with a memory manager:

void Runtime::allocate_physical_instance(PhysicalManager *instance)
{
  find_memory(instance->memory)->allocate_physical_instance(instance);
}

The find_memory method is used to find the memory manager for a particular memory, and is also responsible for the lazy instantiation of new memory managers. The Memory object here is a low-level memory (i.e. a small handle), and the runtime maintains a map from memory handles to their managers. First we check if a manager already exists for the memory in the map; if not, a new memory manager is created and cached:

MemoryManager* Runtime::find_memory(Memory mem)
{
  AutoLock m_lock(memory_manager_lock);

  // already have a manager for this memory?
  std::map<Memory,MemoryManager*>::const_iterator finder = memory_managers.find(mem);
  if (finder != memory_managers.end())
      return finder->second;

  // lazy creation
  MemoryManager *result = new MemoryManager(mem, this);
  memory_managers[mem] = result;
  return result;
}

Once the runtime gets back the memory manager instance, it calls allocate_physical_instance on the manager:

void MemoryManager::allocate_physical_instance(PhysicalManager *manager)
{
  const size_t inst_size = manager->get_instance_size();

  AutoLock m_lock(manager_lock);

  // track capacity
  remaining_capacity -= inst_size;

  // categorize the memory into the correct data structure
  if (manager->is_reduction_manager()) {
    ReductionManager *reduc = manager->as_reduction_manager();
    reduction_instances[reduc] = inst_size;
  } else {
    InstanceManager *inst = manager->as_instance_manager();
    physical_instances[inst] = inst_size;
  }
}

In addition to notifying the memory manager about allocated physical instances, there are corresponding calls for freeing instances. It is important to note that memory managers represent an incomplete view of the memory being used by the low-level runtime system: only the allocations performed on the local node are tracked, even though a memory may also hold allocations made from other nodes in the machine. The view is partial, but it is still helpful for mappers when making decisions.
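
The freeing path isn't shown in these notes, but for symmetry here is a minimal sketch of what it might look like, mirroring allocate_physical_instance above. The method name and the exact bookkeeping are assumptions based on the allocation path, not taken from the source:

void MemoryManager::free_physical_instance(PhysicalManager *manager)
{
  // Sketch only: mirrors allocate_physical_instance; names are assumptions
  const size_t inst_size = manager->get_instance_size();

  AutoLock m_lock(manager_lock);

  // return the capacity consumed by this instance
  remaining_capacity += inst_size;

  // remove the instance from the matching data structure
  if (manager->is_reduction_manager())
    reduction_instances.erase(manager->as_reduction_manager());
  else
    physical_instances.erase(manager->as_instance_manager());
}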

Message Manager

The MessageManager provides an endpoint for communicating with other nodes in the system. A MessageManager instance is created for every other node in the machine. The manager provides an in-order channel for communicating with another node: messages aren't necessarily sent over the network in order, and out-of-order messages are reassembled into the correct order on the receiver. Similar to memory managers, message managers are created lazily in response to messages being sent, so that nodes that only communicate with their neighbors do not incur the overhead of maintaining unused message managers.
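
The lookup helper itself isn't part of this code walk, but a minimal sketch of what the lazy creation might look like, modeled directly on find_memory above, is given below. The map, lock, and constructor arguments are assumptions for illustration; the real find_messenger (used in the code walk that follows) may key its managers differently:

MessageManager* Runtime::find_messenger(Processor target)
{
  // Sketch only: the map, lock, and constructor arguments are assumptions
  AutoLock m_lock(message_manager_lock);

  std::map<Processor,MessageManager*>::const_iterator finder =
    message_managers.find(target);
  if (finder != message_managers.end())
    return finder->second;

  // lazy creation, just like find_memory
  MessageManager *result = new MessageManager(target, this);
  message_managers[target] = result;
  return result;
}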

As a driving example, let's look at how a task is sent to a remote node. This process starts in Runtime::send_task, typically in response to a decision some mapper has made. The first thing that happens is a check of whether the target processor is actually a local processor. Recall that processor managers for local processors are eagerly instantiated. If the target is local, then the method has a fast path that simply adds the task to the target processor's ready queue. If it isn't local, then the task is packaged up and sent to the remote node as a message.

void Runtime::send_task(Processor target, TaskOp *task)
{
  // Check to see if the target processor is still local 
  std::map<Processor,ProcessorManager*>::const_iterator finder = proc_managers.find(target);
  if (finder != proc_managers.end()) {
    // Update the current processor
    task->current_proc = target;
    finder->second->add_to_ready_queue(task,false/*previous failure*/);
  } else {
... // see below
  }
  }
}

In the case that the target processor is remote, the first thing that happens is that we find the message manager for the remote node. Next a serializer is created and used to package up the task into a buffer. Then we ask the manager to send the task to the remote node using its send_task method.

    MessageManager *manager = find_messenger(target);
    Serializer rez;
    bool deactivate_task;
    {
      RezCheck z(rez);
      rez.serialize(target);
      rez.serialize(task->get_task_kind());
      deactivate_task = task->pack_task(rez, target);
    }
    // Put it on the queue and send it
    manager->send_task(rez, true/*flush*/);
    if (deactivate_task)
      task->deactivate();
...

The message manager's send_task calls package_message, which adds some metadata describing the type of message. The flush parameter indicates that the message should be eagerly pushed to the remote node.

void MessageManager::send_task(Serializer &rez, bool flush)
{
  package_message(rez, TASK_MESSAGE, flush);
}

There is a send_xxx call (e.g. send_task) for each type of message. The package_message method handles the low-level details common to all of them: packing small messages together into a single buffer, or splitting large messages across buffers. The typical buffer size is 8-16 KB, but it is configurable. Check out the source for more details; the next stop on the code walk occurs when package_message is ready to flush a buffer. To do this it calls send_message:

void MessageManager::package_message(Serializer &rez, MessageKind k, bool flush)
{
...
  while (buffer_size > 0) {
    unsigned remaining = sending_buffer_size - sending_index;
    if (remaining == 0)
      send_message(false/*complete*/);
...

The send_message routine handles the actual sending of messages and implements the logic that guarantees messages are handled in order on the receiving node. When send_message is called it sends the buffer that the message manager maintains. What is interesting is that messages are sent by spawning a task on the target remote node, with the buffer passed as a normal task argument:

void MessageManager::send_message(bool complete)
{
...
  // Send the message
  Event next_event = target.spawn(HLR_TASK_ID, sending_buffer,
             sending_index, last_message_event);

  // Update the event
  last_message_event = next_event;
...
}

Notice that the task that is launched uses last_message_event as its precondition, and that last_message_event is then updated with the event returned from the spawn method, which will trigger when the remote task completes, creating a chain of dependencies. In fact, when the message manager is first created last_message_event is initialized to the special Event::NO_EVENT, which is always in a has-triggered state. This dependency chaining mechanism is what guarantees in-order processing of messages, even though messages may be sent out-of-order.
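
Stripped of the surrounding details, the chaining idiom looks roughly like the following simplified sketch, where num_messages, buffers, and sizes are hypothetical placeholders:

// Simplified sketch of the dependency-chaining idiom.
// num_messages, buffers, and sizes are hypothetical placeholders.
Event last_message_event = Event::NO_EVENT;  // already triggered
for (unsigned i = 0; i < num_messages; i++) {
  // each send waits on the previous one, so the receiver handles
  // messages in the order they were packaged
  last_message_event = target.spawn(HLR_TASK_ID, buffers[i], sizes[i],
                                    last_message_event);
}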

Processor Manager

The ProcessorManager tracks the low-level processors on the local node and controls the rate of meta-work such as dependence analysis and mapping analysis. Unlike the MemoryManager and MessageManager, processor managers are eagerly instantiated. The motivation for this is that we are likely to use each of the local processors, and remote processors aren't tracked by processor managers.

Processor managers are created at the same time as the runtime instance, during the start-up / bootstrap phase, inside the Runtime::Runtime constructor. Below we see a for loop that iterates over the local_procs set of processors and creates a manager for each application processor (e.g. CPU or GPU), but does not create managers for the utility processors.

...
for (std::set<Processor>::const_iterator it = local_procs.begin();
    it != local_procs.end(); it++)
{
  ProcessorManager *manager = new ProcessorManager(*it,
          machine->get_processor_kind(*it), this,
          superscalar_width, DEFAULT_MAPPER_SLOTS, 
          stealing_disabled, machine->get_all_processors().size()-1);
  proc_managers[*it] = manager;
...
}

The processor manager tracks all of the different mappers for a low-level processor and provides a simple interface for managing the set of mappers. When mappers are registered with the high-level runtime, the processor managers are ultimately responsible for handling the registration through this API:

class ProcessorManager {
...
  public:
    void add_mapper(MapperID mid, Mapper *m, bool check);
    void replace_default_mapper(Mapper *m);
    Mapper* find_mapper(MapperID mid) const; 
...
  public:
    std::vector<Mapper*> mapper_objects;
    std::vector<Reservation> mapper_locks;
...

Each mapper is stored in the mapper_objects vector where its position in the vector is its ID. To avoid forcing developers to think about concurrency in the mappers, access to each mapper is serialized through a distinct lock stored in the mapper_locks vector. The runtime will automatically acquire the lock before dispatching calls to the mapper, and release the lock after the call.
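
Given those declarations, a minimal sketch of what find_mapper might look like follows; the bounds check and NULL return are assumptions for illustration, not taken from the source:

// Sketch only: the MapperID is simply an index into mapper_objects.
// The bounds check and NULL return are assumptions.
Mapper* ProcessorManager::find_mapper(MapperID mid) const
{
  if (mid < mapper_objects.size())
    return mapper_objects[mid];
  return NULL;
}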

The processor manager is responsible for invoking the mappers associated with its processor. There is a call with the naming pattern invoke_mapper_xxx for each method that the mapper interface supports:

...
  public:
    // Functions that perform mapping calls
    void invoke_mapper_set_task_options(TaskOp *task);
    void invoke_mapper_select_variant(TaskOp *task);
    bool invoke_mapper_pre_map_task(TaskOp *task);
...

Calls to mappers start at the high-level runtime (e.g. invoke_mapper_map_task(Processor target, SingleTask *task);), which finds the processor manager for the target processor and dispatches the call to the correct mapper.
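
A minimal sketch of that runtime-level dispatch, assuming the proc_managers map we saw earlier in Runtime::send_task (the bool return type and exact body are assumptions):

// Sketch only: forwards the call to the manager for the target processor,
// which acquires the mapper lock and invokes the mapper object.
bool Runtime::invoke_mapper_map_task(Processor target, SingleTask *task)
{
  return proc_managers[target]->invoke_mapper_map_task(task);
}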

One of the most important things the processor manager does is handle the scheduling of task execution so that operations like dependence analysis occur at appropriate rates and at the right point in time. Recall methods from previous classes such as add_to_ready_queue. Many of these calls, as well as calls that mirror stages of the operational pipeline, are bounced off of the processor manager so that it can implement rate control:

...
  public:
    void add_to_dependence_queue(Operation *op);
    void add_to_ready_queue(TaskOp *op, bool previous_failure);
    void add_to_local_ready_queue(Operation *op, bool previous_failure);
...

When mappers are added to a processor manager, its data structures are expanded if necessary. For instance, the vector of mappers and the corresponding vector of locks may be resized:

void ProcessorManager::add_mapper(MapperID mid, Mapper *m, bool check)
{
  if (mid >= mapper_objects.size()) {
    int old_size = mapper_objects.size();
    mapper_objects.resize(mid+1);
    mapper_locks.resize(mid+1);
    ...
    for (unsigned int i=old_size; i<(mid+1); i++) {
      mapper_objects[i] = NULL;
      mapper_locks[i].destroy_reservation();
      mapper_locks[i] = Reservation::NO_RESERVATION;
      ...
    }
  }
  mapper_locks[mid] = Reservation::create_reservation();
  mapper_objects[mid] = m;
}

The mapper with ID 0 is reserved for the default mapper, and the ProcessorManager::replace_default_mapper method can be used to change the default mapper:

void ProcessorManager::replace_default_mapper(Mapper *m)
{
  delete mapper_objects[0];
  mapper_objects[0] = m;
}

We can now look at how mappers are invoked by the processor manager. Here we'll look at invoke_mapper_set_task_options. We'll split it up because there are several different things going on. Two things mappers can do drive the structure of this method: mappers can request that the mapper call be deferred to a future time, and mappers can request that messages be sent to other mappers.

The first thing that happens is that we set up stack variables to track the defer event as well as any messages generated by the mapper:

void ProcessorManager::invoke_mapper_set_task_options(TaskOp *task)
{
  std::vector<MapperMessage> messages;
  Event wait_on = defer_mapper_event[task->map_id];

Next we enter a loop that executes until the mapper method has successfully executed. The first thing we do is check to see if we need to defer this call. If this is the first time through the loop, then it is likely that wait_on does not exist, so we do not wait on anything:

  do {
    if (wait_on.exists())
      wait_on.wait(false/*block*/);

The next thing we do is invoke the mapper object after first obtaining the corresponding lock for that mapper.

    AutoLock m_lock(mapper_locks[task->map_id]);
    inside_mapper_call[task->map_id] = true; 
    mapper_objects[task->map_id]->select_task_options(task);
    inside_mapper_call[task->map_id] = false;

Now there are two cases that we need to handle after invoking the mapper. The first is that the mapper could request that the invocation be deferred and restarted later:

    if (defer_mapper_event[task->map_id].exists()) {
      wait_on = defer_mapper_event[task->map_id];
      defer_mapper_event[task->map_id] = Event::NO_EVENT;
      continue;
    }

If the call is deferred, we return to the beginning of the loop and enter wait_on.wait(..), which waits until the event has triggered while allowing the processor to be used by other tasks. The thread isn't blocked; in that sense this is a lot like a context switch in an operating system.

The other thing that a mapper may have done is to send messages. If messages were sent then we transfer them into our data structure on the stack:

    AutoLock g_lock(message_lock);
    if (!mapper_messages[task->map_id].empty()) {
      messages = mapper_messages[task->map_id];
      mapper_messages[task->map_id].clear();
    }

After the mapper call has been successfully invoked, we dispatch any messages that may have been queued up. This is done after all of the locks have been released to avoid deadlocks:

  } while (wait_on.exists());
  if (!messages.empty())
    send_mapper_messages(task->map_id, messages);

The structure of this method is mirrored across each of the different mapper calls that the processor manager may invoke.

Let's take a look now at one of the components related to the operational pipeline. We've seen add_to_dependence_queue before, but now things should make more sense. First we set up a high-level runtime task that will perform the dependence analysis, with this processor manager as the manager:

void ProcessorManager::add_to_dependence_queue(Operation *op)
{
  DeferredTriggerArgs args;
  args.hlr_id = HLR_TRIGGER_DEPENDENCE_ID;
  args.manager = this;
  args.op = op;
  ... // see below

Next we see how the processor manager is involved in enforcing the serial execution of dependence analysis. This analysis must take place serially within a context, which represents a parent task and its sub-tasks. We grab the context ID from the parent, and when we spawn the task we use the event stored in dependence_preconditions for that context as a precondition. The precondition is then updated with the returned event. In this manner dependence analysis within a context is serialized, but analysis across contexts can occur in parallel.

  ContextID ctx_id = op->get_parent()->get_context_id();
  AutoLock d_lock(dependence_lock);
  Event next = utility_proc.spawn(HLR_TASK_ID, &args, sizeof(args),
          dependence_preconditions[ctx_id]);
  dependence_preconditions[ctx_id] = next;
}

Since execution occurs serially within a context and there are a small number of contexts, we don't see any example of rate-limiting here.

Next let's take a look at add_to_local_ready_queue. This is invoked when an operation has had its dependence analysis performed and is ready to be mapped. The first thing we do is set up a high-level runtime task to schedule the mapping:

void ProcessorManager::add_to_local_ready_queue(Operation *op, 
         bool prev_failure)
{
  TriggerOpArgs args;
  args.hlr_id = HLR_TRIGGER_OP_ID;
  args.manager = this;
  args.op = op;
  ... // see below

Next there are two cases, depending on whether there had been a previous failure. If there was no previous failure (the common case) then we rate-limit the scheduling of the task. If there was a previous failure we launch the task immediately, because it is likely that other operations are waiting on the completion of the failed operation.

Here we get to see an example of how rate limiting is implemented. Notice the local_scheduler_preconditions structure, which is indexed by next_local_index and holds preconditions for the tasks being launched. Immediately after a task is launched we record its completion event in the next position in this structure, and loop the index back around when it reaches a maximum value. This effectively creates a set of dependence chains whose serial nature results in rate limiting. The level of parallelism is controlled by the superscalar_width variable. Note that these dependence chains are purely artificial.

  if (!prev_failure) {
    AutoLock l_lock(local_queue_lock); 
    Event next = utility_proc.spawn(HLR_TASK_ID, &args, sizeof(args),
              local_scheduler_preconditions[next_local_index]);

    local_scheduler_preconditions[next_local_index++] = next;

    if (next_local_index == superscalar_width)
      next_local_index = 0;
  } else
    utility_proc.spawn(HLR_TASK_ID, &args, sizeof(args));
}

As the else branch shows, if there had been a previous failure the task is launched immediately with no precondition, skipping the rate limiting entirely.