Legion Runtime Class #7: Tasks
These notes are closely based on the set of Legion Runtime Class videos produced by the Legion developers. They are my own notes and code walks, and any errors or things that are just plain wrong represent my own mistakes.
Today's notes are based on the following video:
Overview
In this class we'll take a look at how task scheduling works in Legion. Last time we looked at how operations are scheduled via add_to_local_ready_queue, which is used to handle non-task operations that are ready to go through the mapping process. Recall that round-robin dependency chains were used to throttle these operations.
Task operations take a different path through scheduling because tasks have different semantics than non-task operations, primarily because tasks can be delayed or migrated between nodes. When tasks are ready to be mapped they go through add_to_ready_queue. The first thing this method does is mark the task as not scheduled to track its state:
void ProcessorManager::add_to_ready_queue(TaskOp *task, bool prev_failure)
{
  task->schedule = false;
  task->activate_outstanding_task();
  ContextID ctx_id = task->get_parent()->get_context_id();
  ContextState &state = context_states[ctx_id];
  AutoLock q_lock(queue_lock);
  if (prev_failure)
    ready_queues[task->map_id].push_front(task);
  else
    ready_queues[task->map_id].push_back(task);
  if (state.active && (state.owned_tasks == 0))
    increment_active_contexts();
  state.owned_tasks++;
}
Before talking about the ContextState, notice that tasks are pushed onto a ready_queue structure, and that a task whose previous mapping attempt failed (prev_failure) goes onto the front of the queue so that it will be retried first:
// For each mapper, a list of tasks that are ready to map
std::vector<std::list<TaskOp*> > ready_queues;
The ready_queue tracks all of the tasks that are ready to map for a particular mapper. We want to give mappers the ability to choose when and where tasks get mapped: a mapper may defer mapping to enable task stealing, or migrate a task to a different node. The ready_queue is maintained by the runtime and is available to mappers as they make these decisions. Notice that this is unlike the non-task operations, which were launched immediately.
One of the powerful things Legion allows is for the runtime to run far ahead of the actual execution of the program, effectively seeing into the future. However, there is a trade-off between knowing the future and the mapping failures that can come from mapping too far into it. How far ahead scheduling runs is controlled by the mapper, which sets the parameters that govern this behavior in the following mapper callback:
virtual void configure_context(Task *task) = 0;
For instance, max_window_size controls how many outstanding operations a task (context) may schedule. The min_tasks_to_schedule parameter says how many pending sub-tasks a context should schedule before the runtime stops asking the mapper to continue mapping. There are more advanced features for doing this, such as the frame abstraction, which lets mappers specify things at a very high level, for example looking ahead 2 or 3 time steps (or whatever application-specific metric is configured).
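As a concrete illustration, here is a minimal sketch of what a mapper might do in this callback. This is my own example, not code from the Legion tree, and it assumes the mapper interface described above, in which these tuning knobs are public fields on the Task being configured (exact field names may differ across Legion versions):
// Hypothetical mapper tuning the scheduling window for each context.
class ThrottlingMapper : public DefaultMapper {
public:
  virtual void configure_context(Task *task)
  {
    // Let this context issue up to 1024 outstanding operations
    // before the runtime stalls it.
    task->max_window_size = 1024;
    // Keep asking this mapper to map work until 32 sub-tasks are
    // outstanding, then stop scheduling the context.
    task->min_tasks_to_schedule = 32;
  }
};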
Now that that is out of the way, let's look at the mechanisms by which these parameters are used to control scheduling behavior. The last few lines of add_to_ready_queue check some values on the ContextState object that was obtained at the beginning of the method. The first check is whether the context is active, meaning that it doesn't yet have enough outstanding mapped tasks. An owned task is a task that is ready to map but hasn't been mapped yet; that count is effectively analogous to the size of the ready queue, but note that ready queues are tracked per mapper, while the owned-tasks value on the ContextState is tracked per context.
Basically these lines say: if the context is active (we don't have enough outstanding mapped tasks) and we are about to give it its first ready task, then we want to enable the scheduler:
if (state.active && (state.owned_tasks == 0))
  increment_active_contexts();
state.owned_tasks++;
The increment_active_contexts method increments the number of active contexts and optionally enables the scheduler. The scheduler is enabled if there were previously no active contexts and it wasn't already running:
void ProcessorManager::increment_active_contexts(void) {
  if ((total_active_contexts == 0) && !task_scheduler_enabled) {
    task_scheduler_enabled = true;
    launch_task_scheduler();
  }
  total_active_contexts++;
}
Launching the task scheduler involves spawning a high-level runtime meta task with ID HLR_SCHEDULER_ID. When the scheduler is enabled this meta task is launched immediately:
void ProcessorManager::launch_task_scheduler(void)
{
  SchedulerArgs sched_args;
  sched_args.hlr_id = HLR_SCHEDULER_ID;
  sched_args.proc = local_proc;
  utility_proc.spawn(HLR_TASK_ID, &sched_args, sizeof(sched_args));
}
Recall that the high-level runtime task is that giant switch statement. Here is the relevant case, which turns around and runs process_schedule_request with the relevant processor:
case HLR_SCHEDULER_ID:
  {
    const ProcessorManager::SchedulerArgs *sched_args =
      (const ProcessorManager::SchedulerArgs*)args;
    Runtime::get_runtime(p)->process_schedule_request(sched_args->proc);
    break;
  }
The process_schedule_request method runs perform_scheduling on the processor manager for the given processor:
void Runtime::process_schedule_request(Processor proc)
{
  ProcessorManager *manager = proc_managers[proc];
  manager->perform_scheduling();
}
Finally we make it to perform_scheduling, where mapping operations are run via perform_mapping_operations. The final step is to figure out if there is more work to be done: if there are still active contexts we re-launch the task scheduler, otherwise we disable the scheduler and pop back up the stack.
void ProcessorManager::perform_scheduling(void)
{
  perform_mapping_operations();
  AutoLock q_lock(queue_lock);
  if (!pending_shutdown && (total_active_contexts > 0)) {
    task_scheduler_enabled = true;
    launch_task_scheduler();
  } else
    task_scheduler_enabled = false;
}
So as we can see, the mapper has some control over when a context is active via this set of parameters, and as long as a context is active the runtime will keep scheduling its tasks. Now let's look at how some of these values on the ContextState object are set. The values (e.g. state.active) are controlled by other calls (e.g. activate_context and deactivate_context). The calls themselves originate from within a context (recall that a context is actually a single task), for instance before and after a task starts running, or when scheduling has proceeded far enough into the future.
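To make this feedback loop concrete, here is a self-contained toy model (my own sketch, not Legion code) of the behavior described above: a context stays active, and thus keeps getting scheduled, until it accumulates min_tasks_to_schedule outstanding mapped sub-tasks, and it reactivates once it falls back below that threshold.
#include <iostream>

// Toy stand-in for a Legion context; names mirror the discussion above.
struct ToyContext {
  int outstanding_mapped;     // sub-tasks mapped but not yet complete
  int min_tasks_to_schedule;  // threshold configured by the mapper
  bool active;                // should the runtime keep scheduling us?

  ToyContext(int threshold)
    : outstanding_mapped(0), min_tasks_to_schedule(threshold), active(true) { }

  void sub_task_mapped(void) {
    if (++outstanding_mapped >= min_tasks_to_schedule)
      active = false;  // far enough ahead; stop asking the mapper
  }
  void sub_task_completed(void) {
    if (--outstanding_mapped < min_tasks_to_schedule)
      active = true;   // fell below the threshold; resume scheduling
  }
};

int main(void) {
  ToyContext ctx(4);
  for (int i = 0; i < 4; i++)
    ctx.sub_task_mapped();
  std::cout << "active: " << ctx.active << std::endl;  // 0: deactivated
  ctx.sub_task_completed();
  std::cout << "active: " << ctx.active << std::endl;  // 1: reactivated
  return 0;
}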
Running Mapping Operations
Recall from above that when the scheduler is enabled, ProcessorManager::perform_scheduling is eventually called, which in turn calls perform_mapping_operations. This call is where a lot of really important things happen in Legion. We'll look at this large function in detail now.
The high-level idea of this function is to filter the ready tasks based on the mapper's preferences, and then issue the mapping requests. But there are a lot of different things that can happen along the way. First we create local structures to track state:
void ProcessorManager::perform_mapping_operations(void)
{
  std::multimap<Processor,MapperID> stealing_targets;
  std::vector<MapperID> mappers_with_work;
Next we enter a loop over all of the mappers (recall that there is one ready_queue structure per mapper). The first thing we do is create a local copy of the tasks in the ready queue by stashing pointers. We want to create this snapshot quickly so that we can release the lock that protects the queue. We also record each task's generation; we'll see why in a few minutes.
for (unsigned map_id = 0; map_id < ready_queues.size(); map_id++)
{
  std::list<TaskOp*> visible_tasks;
  std::vector<GenerationID> visible_generations;
  {
    AutoLock q_lock(queue_lock,1,false/*exclusive*/);
    visible_tasks.insert(visible_tasks.begin(),
        ready_queues[map_id].begin(), ready_queues[map_id].end());
    visible_generations.resize(visible_tasks.size());
    unsigned idx = 0;
    for (std::list<TaskOp*>::const_iterator it = visible_tasks.begin();
          it != visible_tasks.end(); it++, idx++) {
      visible_generations[idx] = (*it)->get_generation();
    }
  }
Next we present the list of ready tasks to the mapper, which can set state variables on each task indicating what it wants done with it (e.g. schedule it). This is done via the select_tasks_to_schedule mapper callback (a hypothetical implementation is sketched after the stealing code below):
// Watch me stomp all over the C++ type system here
const std::list<Task*> &ready_tasks =
  *((std::list<Task*>*)(&(visible_tasks)));
// Acquire the mapper lock and ask the mapper about scheduling
// and then about stealing if not disabled
{
  AutoLock map_lock(mapper_locks[map_id]);
  if (!visible_tasks.empty())
  {
    mapper_objects[map_id]->select_tasks_to_schedule(ready_tasks);
  }
The mapper will annotate each task with its preference. Next we check with the mapper to see if it wants to steal any work from other mappers, which it might do if it sees that it doesn't have enough work. We won't talk much about stealing today, but the blacklist below is used to prevent a mapper from generating too many steal requests when the mappers it is targeting don't actually have any work available:
  if (!stealing_disabled)
  {
    AutoLock steal_lock(stealing_lock);
    std::set<Processor> &blacklist = outstanding_steal_requests[map_id];
    if (blacklist.size() < max_outstanding_steals)
    {
      std::set<Processor> steal_targets;
      mapper_objects[map_id]->target_task_steal(blacklist,
                                                steal_targets);
      for (std::set<Processor>::const_iterator it =
            steal_targets.begin(); it != steal_targets.end(); it++)
      {
        if (it->exists() && ((*it) != local_proc) &&
            (blacklist.find(*it) == blacklist.end()))
        {
          stealing_targets.insert(std::pair<Processor,MapperID>(
                                  *it,map_id));
          blacklist.insert(*it);
        }
      }
    }
  }
}
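For reference, a mapper's side of this protocol can be quite simple. Here is a hypothetical example (my own sketch, not code from the Legion tree) that greedily schedules every ready task on the local processor. It assumes the mapper interface used throughout these notes, in which Task exposes the public schedule and target_proc fields that the runtime checks below, and Mapper provides a local_proc member:
// Hypothetical greedy mapper: schedule every ready task locally.
class GreedyMapper : public DefaultMapper {
public:
  virtual void select_tasks_to_schedule(const std::list<Task*> &ready_tasks)
  {
    for (std::list<Task*>::const_iterator it = ready_tasks.begin();
          it != ready_tasks.end(); it++)
    {
      (*it)->schedule = true;           // ask the runtime to map it
      (*it)->target_proc = local_proc;  // and keep it on this processor
    }
  }
};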
Now it is time to actually process the list of tasks that the mapper has annotated. Here we iterate over the snapshot and check whether the mapper wants to schedule each task; the other case we handle is the mapper wanting to send the task to a different processor. If either is true then we need to do something with the task. Two subtle situations arise directly from the mapper having operated on a snapshot: in the meantime, changes to the underlying ready queues may have invalidated the mapper's decisions.
{
  std::list<TaskOp*> &rqueue = ready_queues[map_id];
  AutoLock q_lock(queue_lock);
  unsigned gen_idx = 0;
  for (std::list<TaskOp*>::iterator vis_it = visible_tasks.begin();
        vis_it != visible_tasks.end(); gen_idx++)
  {
    if ((*vis_it)->schedule ||
        ((*vis_it)->target_proc != local_proc))
We need to figure out if the task is still around by looping over the ready queue again. If it isn't found then something else happened to it (e.g. it was stolen). If we do find it then we also need to check that its generation hasn't changed. This check is a consequence of an implementation that recycles the C++ objects representing operations, so a matching pointer alone may now refer to a different logical task (see the toy illustration after this code walk).
    {
      bool found = false;
      for (std::list<TaskOp*>::iterator it = rqueue.begin();
            it != rqueue.end(); it++)
      {
        // In order to be the same task, they need to have the
        // same pointer and have the same generation
        if (((*it) == (*vis_it)) &&
            (visible_generations[gen_idx] == (*it)->get_generation()))
        {
          rqueue.erase(it);
          found = true;
          break;
        }
      }
      if (!found) // stolen
      {
        // Remove it from our list
        vis_it = visible_tasks.erase(vis_it);
      }
If the task wasn't found then it is removed from the set of tasks being considered. Not shown is the case where the mapper decided not to schedule the task, in which case it is also removed from the set of visible tasks. If the task was found and the mapper wants to schedule it, then it has already been removed from the ready queue above, so we update the context state, decrementing the number of active contexts if this was the context's last owned task:
      {
        ContextID ctx_id = (*vis_it)->get_parent()->get_context_id();
        ContextState &state = context_states[ctx_id];
        state.owned_tasks--;
        if (state.active && (state.owned_tasks == 0))
          decrement_active_contexts();
        vis_it++;
      }
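The decrement_active_contexts method isn't shown in this walkthrough. Presumably it simply mirrors increment_active_contexts; here is a sketch under that assumption (recall from perform_scheduling above that the scheduler turns itself off once total_active_contexts reaches zero, so nothing more is needed here):
// A sketch, not the verbatim source: the scheduler is not disabled
// here; perform_scheduling notices total_active_contexts == 0 on its
// next pass and clears task_scheduler_enabled itself.
void ProcessorManager::decrement_active_contexts(void)
{
  total_active_contexts--;
}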
Finally, after processing each mapper, if there are tasks remaining on its ready queue we update the local state variable declared at the beginning of the method that records which mappers still have work:
  if (!rqueue.empty())
    mappers_with_work.push_back(map_id);
}
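The pointer-plus-generation test above guards against a classic recycling hazard: since operation objects are reused, a pointer alone can match a different logical task. Here is a self-contained toy illustration (my own code, not Legion's):
#include <cassert>

// Toy operation object that is recycled between logical tasks,
// bumping its generation each time (mimicking TaskOp recycling).
struct ToyOp {
  unsigned generation;
  ToyOp(void) : generation(0) { }
  void reactivate(void) { generation++; }  // reused for a new task
};

int main(void) {
  ToyOp op;
  ToyOp *snapshot_ptr = &op;              // stashed pointer
  unsigned snapshot_gen = op.generation;  // stashed generation

  op.reactivate();  // same C++ object, new logical task

  // Pointer equality alone would wrongly match the new task;
  // comparing generations catches the recycling.
  assert(snapshot_ptr == &op);
  assert(snapshot_gen != op.generation);
  return 0;
}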
Now we go back through the list of tasks that we need to do something with; that is, the tasks the mapper wants the runtime to either run or move. To handle each task a meta task is launched. We'll talk about deactivate_outstanding_task another time. If a task's mapping is deferred (e.g. because physical region tree state is not available) then an event is created and used as a precondition on the utility task handling the mapping. So, for each task we launch a high-level runtime task with the HLR_TRIGGER_TASK_ID variant:
TriggerTaskArgs args;
args.hlr_id = HLR_TRIGGER_TASK_ID;
args.manager = this;
for (std::list<TaskOp*>::iterator vis_it = visible_tasks.begin();
      vis_it != visible_tasks.end(); vis_it++)
{
  // If we made it in here then we have definitely
  // pulled the task off of the ready queue
  (*vis_it)->deactivate_outstanding_task();
  Event wait_on = (*vis_it)->defer_mapping();
  // We give a slight priority to triggering the execution
  // of tasks relative to other runtime operations because
  // they actually have a feedback mechanism controlling
  // how far they get ahead. We give a slight edge in priority
  // to tasks being sent remotely to get them in flight.
  // Give priority to things which are getting sent remotely
  args.op = *vis_it;
  int priority = ((*vis_it)->target_proc != local_proc) ? 2 : 1;
  utility_proc.spawn(HLR_TASK_ID, &args, sizeof(args),
                     wait_on, priority);
}
The high-level runtime task will trigger execution, and if anything fails the task goes back onto the ready queue. Note the true flag passed to add_to_ready_queue, which puts the task at the front of its queue as we saw earlier:
case HLR_TRIGGER_TASK_ID:
  {
    // Key off of args here instead of data
    const ProcessorManager::TriggerTaskArgs *trigger_args =
      (const ProcessorManager::TriggerTaskArgs*)args;
    TaskOp *op = trigger_args->op;
    bool mapped = op->trigger_execution();
    if (!mapped) {
      ProcessorManager *manager = trigger_args->manager;
      manager->add_to_ready_queue(op, true/*failure*/);
    }
    break;
  }
Finally, back in perform_mapping_operations, we handle the remaining bits of work. First, if stealing is enabled and there were mappers with tasks remaining on their ready queues, then this information is broadcast. But to whom? The issue_advertisements method is smart: it sends the broadcast to those nodes that had previously tried to steal from us but could not because we had no work to do. Recall that processors whose steal requests are rejected get blacklisted; that record prevents unnecessary steal requests from flooding the network, and it also serves to create the broadcast list here:
// Advertise any work that we have
if (!stealing_disabled && !mappers_with_work.empty())
{
  for (std::vector<MapperID>::const_iterator it =
        mappers_with_work.begin(); it != mappers_with_work.end(); it++)
  {
    issue_advertisements(*it);
  }
}
If mappers had requested that tasks be stolen, then the very last thing to occur is that these steal requests are sent out:
// Finally issue any steal requests
if (!stealing_disabled && !stealing_targets.empty())
  runtime->send_steal_request(stealing_targets, local_proc);
}