Overview

Last time we saw how tasks are launched; today we'll look at how tasks are registered, because the high-level runtime wraps application-level tasks. Rather than invoking application-level tasks directly, Legion intercepts them to provide setup and tear-down functionality.

About Legion Runtime Class

These notes are closely based on the set of Legion Runtime Class videos produced by the Legion developers. They are my own notes and code walks, and any errors or things that are just plain wrong represent my own mistakes.

Today's notes are based on the following video:

Task Registration

Here is a typical task registration call. There are several variants, based on the characteristics of the task being registered:

template<typename T,
  T (*TASK_PTR)(const Task*, const std::vector<PhysicalRegion>&,
      Context, HighLevelRuntime*)>
/*static*/ TaskID HighLevelRuntime::register_legion_task(TaskID id,
    Processor::Kind proc_kind,
    bool single, bool index,
    VariantID vid,
    TaskConfigOptions options,
    const char *task_name)
{
  if (task_name == NULL)
  {
    // Has no name, so just call it by its number
    char *buffer = (char*)malloc(32*sizeof(char));
    sprintf(buffer,"%d",id);
    task_name = buffer;
  }
  return HighLevelRuntime::update_collection_table(
      LegionTaskWrapper::legion_task_wrapper<T,TASK_PTR>, 
      LegionTaskWrapper::inline_task_wrapper<T,TASK_PTR>, id, proc_kind, 
      single, index, vid, sizeof(T), options, task_name);
}
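
For context, here is roughly what the application side of this looks like. The task, the ID, and the name below are hypothetical, and in practice the trailing arguments can usually be left to their defaults:

// A hypothetical application-level task matching TASK_PTR's signature.
int hello_world_task(const Task *task,
                     const std::vector<PhysicalRegion> &regions,
                     Context ctx, HighLevelRuntime *runtime)
{
  return 42;
}

// Registration, done once during start-up before the runtime starts:
HighLevelRuntime::register_legion_task<int, hello_world_task>(
    HELLO_WORLD_TASK_ID /*hypothetical*/, Processor::LOC_PROC,
    true/*single*/, false/*index*/,
    AUTO_GENERATE_ID, TaskConfigOptions(), "hello_world_task");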

The important thing to notice here is LegionTaskWrapper::legion_task_wrapper, which wraps the function pointer provided by the application. The wrapper does two important things: (1) it provides setup and clean-up routines, and (2) it converts between the low-level runtime task interface and the high-level interface that applications expect.

All low-level tasks expect three arguments: an argument blob, its size, and the processor they are running on. Here is how the wrapper works. First, there are static assertions that check some properties of the task:

template<typename T,
  T (*TASK_PTR)(const Task*, const std::vector<PhysicalRegion>&,
      Context, HighLevelRuntime*)>
void LegionTaskWrapper::legion_task_wrapper(const void *args, 
    size_t arglen, Processor p)
{
  // Assert that we are returning Futures or FutureMaps
  LEGION_STATIC_ASSERT((LegionTypeInequality<T,Future>::value));
  LEGION_STATIC_ASSERT((LegionTypeInequality<T,FutureMap>::value));
  // Assert that the return type size is within the required size
  LEGION_STATIC_ASSERT(sizeof(T) <= MAX_RETURN_SIZE);

...

Next, the runtime is retrieved from the processor parameter provided by the low-level runtime. Recall that the Context object is really just a Task*. When the task was launched, the context was stuffed into the arguments; here we cast the argument buffer back to a context, and ask the runtime to begin the task, which also returns the physical regions specified by the task's region requirements.

  // Get the high level runtime
  HighLevelRuntime *runtime = HighLevelRuntime::get_runtime(p);

  // Read the context out of the buffer
  Context ctx = *((const Context*)args);

  const std::vector<PhysicalRegion> &regions = runtime->begin_task(ctx);

...

Finally we call the application-level function, record the return value, and serialize it via the end_task call.

  // Invoke the task with the given context
  T return_value = 
     (*TASK_PTR)(reinterpret_cast<Task*>(ctx),regions,ctx,runtime);

  // Send the return value back
  LegionSerialization::end_task<T>(runtime, ctx, &return_value);
}
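
For a plain return type with no custom serialization, one can think of LegionSerialization::end_task<T> as reducing to something like the following sketch (my simplification, not the actual specialized implementation). The owned flag is false so the runtime copies the bytes before the wrapper's stack frame disappears:

// Sketch: for a trivially-copyable T, hand the runtime a pointer to
// the return value; owned=false means the runtime makes its own copy.
template<typename T>
static void end_task(HighLevelRuntime *runtime, Context ctx, T *result)
{
  runtime->end_task(ctx, result, sizeof(T), false/*owned*/);
}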

Task Execution

Before proceeding, take a look at SingleTask::launch_task to see how the context is passed as the task's arguments. Here proxy_this points to the task itself (a Task*):

Event task_launch_event = launch_processor.spawn(low_id, &proxy_this,
             sizeof(proxy_this), start_condition, task_priority);

The begin_task and end_task routines on the runtime forward the call to the tasks:

const std::vector<PhysicalRegion>& Runtime::begin_task(SingleTask *ctx)
{
  return ctx->begin_task();
}

void Runtime::end_task(SingleTask *ctx, const void *result, 
    size_t result_size, bool owned)
{
  ctx->end_task(result, result_size, owned);
}

Mappers may request that a task run on any processor from a set of candidates (rather than one specific processor), so the first thing that happens is figuring out exactly which processor we are actually running on.

const std::vector<PhysicalRegion>& SingleTask::begin_task(void)
{
  // Switch over the executing processor to the one
  // that has actually been assigned to run this task.
  executing_processor = Processor::get_executing_processor();

Recall from our earlier discussion that processor managers track the set of tasks that have mapped but haven't yet started running. Here we decrement the pending count to update this state, which allows the runtime to manage how far ahead the program runs.

  // Decrement the number of pending tasks on the processor which
  // we originally mapped this task
  parent_ctx->decrement_pending();

Finally we record the start time, which will later be used to compute how long the application function took to run:

  // Start the profiling if requested
  if (profile_task)
    this->start_time = (TimeStamp::get_current_time_in_micros() - 
        Runtime::init_time);

  return physical_regions;
}

OK, that was straightforward. What about shutting down a task? First we record the stop time and, if profiling was requested, notify the mapper.

void SingleTask::end_task(const void *res, size_t res_size, bool owned)
{
  if (profile_task)
  {
    this->stop_time = (TimeStamp::get_current_time_in_micros() -
        Runtime::init_time);
    runtime->invoke_mapper_notify_profiling(executing_processor, this);
  }

Now we clean up. This includes unmapping any physical regions the task was still using (unless the task explicitly unmapped them already):

  // unmap all of the physical regions which are still mapped
  for (unsigned idx = 0; idx < regions.size(); idx++)
  {
    if (physical_regions[idx].impl->is_mapped())
      physical_regions[idx].impl->unmap_region();
  }
  // now we can clear the physical regions since we're done using them
  physical_regions.clear();

Then we take care of any inline mappings that the task created:

  // Do the same thing with any residual inline mapped regions
  for (std::list<PhysicalRegion>::const_iterator it = 
      inline_regions.begin(); it != inline_regions.end(); it++)
  {
    if (it->impl->is_mapped())
      it->impl->unmap_region();
  }
  inline_regions.clear();

Now it gets more complicated. Parent tasks expect data mutated by their children to be visible, so a series of close operations is issued on the physical regions to take care of this. This is more involved than I can explain well here; see the video or the Legion thesis for more on what it means for these regions to be closed.

  if (!is_leaf() || (num_virtual_mappings > 0))
  {
    for (unsigned idx = 0; idx < local_instances.size(); idx++)
    {
      if (!virtual_mapped[idx] && !region_deleted[idx]
          && !IS_READ_ONLY(regions[idx]) &&
          !IS_NO_ACCESS(regions[idx]))
      {
        CloseOp *close_op = runtime->get_available_close_op();    
        close_op->initialize(this, idx, local_instances[idx]);
        runtime->add_to_dependence_queue(executing_processor, close_op);
      }
    }
  }

Next, the return value from the application function is handled. The handle_future method is virtual on the task operation class. We'll look at it for IndividualTask and defer discussion of its behavior for point tasks.

  // Handle the future result
  handle_future(res, res_size, owned); 

The result is treated differently depending on whether the task is running remotely or locally.

void IndividualTask::handle_future(const void *res, size_t res_size,
    bool owned)
{
  // Save our future value so we can set it or send it back later
  if (is_remote())
  {
    if (owned)
    {
      future_store = const_cast<void*>(res);
      future_size = res_size;
    }
    else
    {
      future_size = res_size;
      future_store = legion_malloc(FUTURE_RESULT_ALLOC, future_size);
      memcpy(future_store,res,future_size);
    }
  }
  else
  {
    // Set our future, but don't trigger it yet
    if (must_epoch == NULL)
      result.impl->set_result(res, res_size, owned);
    else
      must_epoch->set_future(index_point, res, res_size, owned);
  }
}

Returning to end_task, the final thing is to deal with some expensive clean-up work. Since we are running on a processor dedicated to application-level work, we offload it onto a utility processor if possible. The heavy work is inside post_end_task.

#ifdef SPECIALIZED_UTIL_PROCS
  Processor util = runtime->get_cleanup_proc(executing_processor);
#else
  Processor util = runtime->find_utility_group();
#endif
  if (util != executing_processor)
  {
    PostEndArgs post_end_args;
    post_end_args.hlr_id = HLR_POST_END_ID;
    post_end_args.proxy_this = this;
    util.spawn(HLR_TASK_ID, &post_end_args, sizeof(post_end_args));
  }
  else
    post_end_task();
}

Since tasks are operations, we have to update their progress through the execution pipeline. The first thing post_end_task does is call complete_execution to mark this stage complete. Usually this is enough for operations, but task operations also have the complete and committed stages, and for a task to be complete and committed its children must also be complete and committed.

We won't go into detail about all the logic here that handles these cases, but we'll follow how the metadata is updated next.

void SingleTask::post_end_task(void)
{
  // Mark that we are done executing this operation
  complete_execution();

  // Mark that we are done executing and then see if we need to
  // trigger any of our mapping, completion, or commit methods
  bool need_complete = false;
  bool need_commit = false;

  // If we're a leaf with no virtual mappings then
  // there are guaranteed to be no children
  {
    AutoLock o_lock(op_lock);
    if (executing_children.empty() && executed_children.empty())
    {
      if (!children_complete_invoked)
      {
        need_complete = true;
        children_complete_invoked = true;
      }
      if (complete_children.empty() && 
          !children_commit_invoked)
      {
        need_commit = true;
        children_commit_invoked = true;
      }
    }
  } 

  if (need_complete)
  {
    // If we had any virtual mappings, mark that we are
    // now mapping complete since all children are mapped
    if (num_virtual_mappings > 0)
      complete_mapping();
    trigger_children_complete();
  }
  if (need_commit)
  {
    trigger_children_committed();
  } 
}

Here are the state variables that track children. Executing children are still in flight. Executed children have finished execution but haven't yet completed (e.g. they have children of their own that haven't completed). Finally, complete children have completed but not yet committed. So each child moves executing → executed → complete, and is removed once it commits.

protected:
  // Track whether this task has finished executing
  LegionSet<Operation*,EXECUTING_CHILD_ALLOC>::tracked executing_children;
  LegionSet<Operation*,EXECUTED_CHILD_ALLOC>::tracked executed_children;
  LegionSet<Operation*,COMPLETE_CHILD_ALLOC>::tracked complete_children;

Tasks transition between these states via a few methods on the parent task object. A child operation starts out in the executing set:

void SingleTask::register_child_operation(Operation *op)
{
...
  executing_children.insert(op);
...
}

Then it will move to the list of children that have finished execution:

void SingleTask::register_child_executed(Operation *op)
{
...
      executing_children.erase(finder);
      // Now put it in the list of executed operations
      // Note this doesn't change the number of active children
      // so there's no need to trigger any window waits
      //
      // Add some hysteresis here so that we have some runway for when
      // the paused task resumes it can run for a little while.
      executed_children.insert(op);
...
}
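
And so forth: register_child_complete presumably moves the operation from the executed set to the complete set in the same manner. Here is a sketch of the shape (not the verbatim source):

// Sketch: move the child from the executed set to the complete set,
// holding the operation lock as the other transitions do.
void SingleTask::register_child_complete(Operation *op)
{
  AutoLock o_lock(op_lock);
  executed_children.erase(op);
  // Complete but not yet committed
  complete_children.insert(op);
}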

A task is first added to these structures when it's created:

void Operation::initialize_operation(SingleTask *ctx, bool track, 
    Event child_event, 
    unsigned regs/*= 0*/)
{
  parent_ctx = ctx;
  track_parent = track;
  children_mapped = child_event;
  if (track_parent)
    parent_ctx->register_child_operation(this);
  for (unsigned idx = 0; idx < regs; idx++)
    unverified_regions.insert(idx);
}

The track_parent flag indicates that the task is local to its parent. When a task runs remotely, a context for the parent still exists on the remote node, but it is primarily a proxy; the scheduling bookkeeping shown here lives in the context on the node where the parent actually resides.

Handling the state transitions requires no extra work on our part: the operation pipeline does it for us. When the execution stage completes, the transition happens automatically:

void Operation::complete_execution(void)
{
  bool need_resolution = false;
  bool need_complete = false;
  // Tell our parent context that we are done mapping
  // It's important that this is done before we mark that we
  // are executed to avoid race conditions
  if (track_parent)
    parent_ctx->register_child_executed(this);
...

For completeness, here is the final transition:

void Operation::complete_operation(void)
{
  bool need_trigger = false;
  // Tell our parent that we are complete
  // It's important that we do this before we mark ourselves
  // completed in order to avoid race conditions
  if (track_parent)
    parent_ctx->register_child_complete(this);
...