Overview

In this post we'll look at how slice tasks and point tasks get moved around and eventually complete. We'll also take a look at task stealing.

About Legion Runtime Class

These notes are closely based on the set of Legion Runtime Class videos produced by the Legion developers. They are my own notes and code walks, and any errors or things that are just plain wrong represent my own mistakes.

Today's notes are based on the following video:

Index Space Completion

Recall that an index space task launch is broken recursively into slice tasks, and eventually point tasks are launched for each point in the original index space. This was covered in a previous post, and most of the important bits happened in MultiTask::trigger_execution. But we didn't touch on how completion actually occurs. Let's do that.

Normally when an individual task calls trigger_task_complete it wraps things up, cleans up state, and sends information about the completion back to other nodes. Point tasks are different because they don't clean themselves up--they are really part of a larger task (a slice task). Instead, a point task notifies its owning slice task:

    void PointTask::trigger_task_complete(void)
    {
      // If we had any virtual mappings, we can now be considered mapped
      if (num_virtual_mappings > 0)
        slice_owner->record_child_mapped();

      // Pass back our created and deleted operations 
      slice_owner->return_privileges(this);

      slice_owner->record_child_complete();

An aside about tracking the owner: notice that the slice_owner pointer is dereferenced. This pointer is a field of the PointTask class, filled in when the points in a slice are enumerated, and it is guaranteed to always be valid because a point task always runs on the same node as its slice task.

    class PointTask : public SingleTask {
    ...

    protected:
      friend class SliceTask;
      SliceTask *slice_owner;
      UserEvent point_termination;
    };

A slice task also contains a pointer to its parent index task, but slice tasks can be moved around in the cluster, thus this pointer is only valid on the node where the index task was created.

    class SliceTask : public MultiTask {
    ...

    protected:
      // For knowing which fraction of the
      // domain we have (1/denominator)
      long long denominator;
      IndexTask *index_owner;

So, back in PointTask::trigger_task_complete. When the point task completes it calls record_child_complete, which notifies the slice task that the point task has finished. Notice that the slice task decrements a counter tracking the number of incomplete points in this slice. When the counter reaches zero all of the child point tasks have completed, and trigger_children_complete is invoked.

    void SliceTask::record_child_complete(void)
    {
      bool needs_trigger = false;
      {
        AutoLock o_lock(op_lock);
        num_uncomplete_points--;
        if ((num_uncomplete_points == 0) && !children_complete_invoked)
        {
          needs_trigger = true;
          children_complete_invoked = true;
        }
      }
      if (needs_trigger)
        trigger_children_complete();
    }

Eventually the slice task will make it through the pipeline and complete, calling trigger_slice_complete. Notice that when the slice task notifies its parent index task, it checks whether it is running remotely: if it is local it calls the method on the index task directly, otherwise a message is sent to handle the notification.

    void SliceTask::trigger_slice_complete(void)
    {
      if (is_remote())
      {
        // Send back the message saying that this slice is complete
        Serializer rez;
        pack_remote_complete(rez);
        runtime->send_slice_remote_complete(orig_proc, rez);
      }
      else
      {
        index_owner->return_slice_complete(points.size());
      }
      complete_operation();
    }

Similarly, IndexTask tracks the total number of points and waits for them all to be completed before calling its trigger_children_complete stage.

    void IndexTask::return_slice_complete(unsigned points)
    {
      bool need_trigger = false;
      {
        AutoLock o_lock(op_lock);
        complete_points += points;
        if (slice_fraction.is_whole() && 
            (complete_points == total_points) &&
            !children_complete_invoked)
        {
          need_trigger = true;
          children_complete_invoked = true;
        }
      }
      if (need_trigger)
        trigger_children_complete();
    }

Basically, once an index space has been split up and distributed, completion is the reverse process in which all the results flow back to the index task. To optimize this process Legion avoids bouncing around the cluster through all of the intermediate slices by notifying the index task directly, using the index_owner pointer that is stashed in each slice task. This pointer is packaged up in SliceTask::pack_remote_complete.
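
To make that concrete, here is a minimal sketch (not the actual Legion code) of how such a remote completion message could be packed and handled. The handler name, handle_slice_remote_complete, and the exact wire layout are assumptions for illustration.

    // Hypothetical sketch: the slice ships the raw index_owner pointer
    // along with its point count. The pointer is never dereferenced on
    // a remote node; it only becomes meaningful again back on the node
    // where the IndexTask was created.
    void SliceTask::pack_remote_complete(Serializer &rez)
    {
      rez.serialize(index_owner);    // opaque value while in flight
      rez.serialize(points.size());  // number of points in this slice
    }

    // Back on the originating node the handler recovers the pointer and
    // notifies the index task directly, skipping any intermediate slices.
    /*static*/ void SliceTask::handle_slice_remote_complete(Deserializer &derez)
    {
      IndexTask *owner;
      derez.deserialize(owner);
      size_t points;
      derez.deserialize(points);
      owner->return_slice_complete(points);
    }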

This pattern of mirroring point task state changes in the containing slice task, and transitively up to the index task, is not limited to completion; several other pipeline stages (e.g. mapping) work the same way.
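
For instance, the mapping stage uses the same countdown pattern. Here is a sketch of what the slice-side bookkeeping behind record_child_mapped (seen earlier in PointTask::trigger_task_complete) could look like, by analogy with record_child_complete above; the counter and trigger names are assumptions, not the actual field names.

    // Sketch by analogy with record_child_complete: each point that
    // finishes mapping decrements a counter, and the last one triggers
    // the slice's own "all children mapped" step.
    void SliceTask::record_child_mapped(void)
    {
      bool needs_trigger = false;
      {
        AutoLock o_lock(op_lock);
        num_unmapped_points--;                 // assumed counter name
        if ((num_unmapped_points == 0) && !children_mapped_invoked)
        {
          needs_trigger = true;
          children_mapped_invoked = true;
        }
      }
      if (needs_trigger)
        trigger_children_mapped();             // assumed method name
    }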

Stealing

Recall that ProcessorManager::perform_mapping_operations is like a scheduler that tries to keep the processor busy by looking through a set of ready queues for things to do. When there isn't enough local work, the processor manager will also try to steal work from other processors.

What happens is that the mapper is asked to construct the set of processors it would like to try to steal from, at which point the steal requests are sent out.

      // Finally issue any steal requests
      if (!stealing_disabled && !stealing_targets.empty())
        runtime->send_steal_request(stealing_targets, local_proc);

In order to avoid sending too many messages the runtime keeps track of a blacklist. The blacklist is the set of processors to which a steal request has already been sent; if a mapper asks to steal from a processor that is on the blacklist, the request is dropped by the runtime. A processor is removed from the blacklist when either a task is stolen from it or it later advertises new work.
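
A minimal sketch of that bookkeeping is shown below; the names are made up for illustration, and the real runtime tracks a blacklist per mapper rather than a single flat set.

    // Illustrative blacklist bookkeeping (assumed names throughout).
    // Targets with an outstanding steal request are dropped; a target
    // becomes eligible again once a steal succeeds or it advertises work.
    std::set<Processor> steal_blacklist;

    void filter_steal_targets(std::multimap<Processor,MapperID> &targets)
    {
      for (std::multimap<Processor,MapperID>::iterator it =
            targets.begin(); it != targets.end(); /*advanced below*/)
      {
        if (steal_blacklist.count(it->first) > 0)
          targets.erase(it++);     // request already outstanding, drop it
        else
        {
          steal_blacklist.insert(it->first);
          it++;
        }
      }
    }

    // Called when a task is stolen from 'target' or when 'target'
    // later advertises new work.
    void remove_from_blacklist(Processor target)
    {
      steal_blacklist.erase(target);
    }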

Sending out the requests involves iterating over a multimap and, for each target processor, packaging up the set of mapper IDs and sending an active message carrying the steal request.

    void Runtime::send_steal_request(
              const std::multimap<Processor,MapperID> &targets, Processor thief)
    {
      for (std::multimap<Processor,MapperID>::const_iterator it = 
            targets.begin(); it != targets.end(); it++)
      {
        Processor target = it->first;
        std::map<Processor,ProcessorManager*>::const_iterator finder = 
          proc_managers.find(target);
        if (finder == proc_managers.end())
        {
          // Need to send remotely
          MessageManager *manager = find_messenger(target);
          Serializer rez;
          {
            RezCheck z(rez);
            rez.serialize(target);
            rez.serialize(thief);
            int num_mappers = targets.count(target);
            rez.serialize(num_mappers);
            for ( ; it != targets.upper_bound(target); it++)
              rez.serialize(it->second);
          }
          manager->send_steal_request(rez, true/*flush*/);
        }
        else
        {
          // Still local, so notify the processor manager
          std::vector<MapperID> thieves;
          for ( ; it != targets.upper_bound(target); it++)
            thieves.push_back(it->second);
          finder->second->process_steal_request(thief, thieves);
        }
      }
    }

On the remote node the steal message is dispatched by MessageManager::handle_messages:

    void MessageManager::handle_messages(unsigned num_messages,
                                         const char *args, size_t arglen)
    {
      for (unsigned idx = 0; idx < num_messages; idx++)
          ...
          case STEAL_MESSAGE:
            {
              runtime->handle_steal(derez);
              break;
            }

Handling this message is simple: the target processor, the thief, and the mapper IDs are decoded, and the target's processor manager is notified.

    void Runtime::handle_steal(Deserializer &derez)
    {
      DerezCheck z(derez);
      Processor target;
      derez.deserialize(target);
      Processor thief;
      derez.deserialize(thief);
      int num_mappers;
      derez.deserialize(num_mappers);
      std::vector<MapperID> thieves(num_mappers);
      for (int idx = 0; idx < num_mappers; idx++)
        derez.deserialize(thieves[idx]);

      proc_managers[target]->process_steal_request(thief, thieves);
    }

In process_steal_request the first thing done is to construct a list of tasks that are eligible for stealing. This list is presented to the mapper, which decides what it will allow to be stolen and may also decide that nothing should be stolen. Some tasks can never be stolen, and others in the list are subject to races with the rest of the runtime, so the list returned by the mapper is verified and pruned of invalid choices.

    void ProcessorManager::process_steal_request(Processor thief,
                                           const std::vector<MapperID> &thieves)
    ...
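
The elided body roughly follows the shape sketched below. This is a simplification, and the names ready_queues, is_stealable, mapper_objects, and permit_task_steal are assumptions about the surrounding code rather than verified signatures.

    // Simplified sketch of the elided body (assumed helper names).
    void ProcessorManager::process_steal_request(Processor thief,
                                           const std::vector<MapperID> &thieves)
    {
      for (std::vector<MapperID>::const_iterator it = thieves.begin();
            it != thieves.end(); it++)
      {
        MapperID stealer = *it;
        // Gather the tasks in this mapper's ready queue that are still
        // eligible to be stolen.
        std::vector<const Task*> stealable;
        {
          AutoLock q_lock(queue_lock);
          for (std::list<TaskOp*>::const_iterator tit =
                ready_queues[stealer].begin();
                tit != ready_queues[stealer].end(); tit++)
          {
            if ((*tit)->is_stealable())
              stealable.push_back(*tit);
          }
        }
        // Let the mapper choose a subset (possibly empty) to give up.
        std::set<const Task*> to_steal;
        mapper_objects[stealer]->permit_task_steal(thief, stealable, to_steal);
        // Each choice is then re-verified (a task may have started
        // mapping since the list was built), invalid picks are pruned,
        // and the survivors are shipped off to the thief. If no task
        // actually moves, the failed steal is recorded as shown just
        // below.
      }
    }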

If the steal attempt is unsuccessful, the requesting processor is recorded. This record is used later to send out advertisements once new work becomes available.

        if (!successful_steal) 
        {
          AutoLock thief_lock(thieving_lock);
          failed_thiefs.insert(std::pair<MapperID,Processor>(stealer,thief));
        }
      }

On any node, during perform_mapping_operations, this set of failed thieves is notified via issue_advertisements:

    void ProcessorManager::perform_mapping_operations(void)

      // Advertise any work that we have
      if (!stealing_disabled && !mappers_with_work.empty())
      {
        for (std::vector<MapperID>::const_iterator it = 
              mappers_with_work.begin(); it != mappers_with_work.end(); it++)
        {
          issue_advertisements(*it);
        }
      }

The issue_advertisements routine will consult the list of thieves recorded when there was a failed steal.

    void ProcessorManager::issue_advertisements(MapperID map_id)
    {
      // Create a clone of the processors we want to advertise so that
      // we don't call into the high level runtime holding a lock
      std::set<Processor> failed_waiters;
      // Check to see if we have any failed thieves with the mapper id
      {
        AutoLock theif_lock(thieving_lock);
        if (failed_thiefs.lower_bound(map_id) != 
            failed_thiefs.upper_bound(map_id))
        {
          for (std::multimap<MapperID,Processor>::iterator it = 
                failed_thiefs.lower_bound(map_id); it != 
                failed_thiefs.upper_bound(map_id); it++)
          {
            failed_waiters.insert(it->second);
          } 
          // Erase all the failed thieves
          failed_thiefs.erase(failed_thiefs.lower_bound(map_id),
                              failed_thiefs.upper_bound(map_id));
        }
      }
      if (!failed_waiters.empty())
        runtime->send_advertisements(failed_waiters, map_id, local_proc);
    }
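
On the receiving side, the advertisement's job is essentially to clear the advertiser from the thief's blacklist so that it can be targeted by steal requests again. Below is a rough sketch of what that handler could look like; the handler name and the process_advertisement call are assumptions, mirroring the handle_steal path above.

    // Sketch of the thief-side handler (assumed names): decode the
    // advertising processor and the mapper ID, then let each local
    // processor manager clear that processor from its blacklist so it
    // can be targeted by future steal requests.
    void Runtime::handle_advertisement(Deserializer &derez)
    {
      DerezCheck z(derez);
      Processor source;
      derez.deserialize(source);
      MapperID map_id;
      derez.deserialize(map_id);
      for (std::map<Processor,ProcessorManager*>::const_iterator it =
            proc_managers.begin(); it != proc_managers.end(); it++)
        it->second->process_advertisement(source, map_id);
    }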

That's it.