sgpemv2/src/backend/scheduler.cc

592 lines
23 KiB
C++

// src/backend/scheduler.cc - Copyright 2005, 2006, University
// of Padova, dept. of Pure and Applied
// Mathematics
//
// This file is part of SGPEMv2.
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// SGPEMv2 is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with SGPEMv2; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#include "concrete_environment.hh"
#include "concrete_history.hh"
#include "cpu_policy.hh"
#include "cpu_policy_exception.hh"
#include "scheduler.hh"
// Do not include full template definition in the header file
#include "singleton.tcc"
#include "sequences.tcc"
#include <glibmm/thread.h>
#include <algorithm>
#include <cassert>
#include <memory>
using namespace std;
using namespace sgpem;
// Explicit template instantiation to allow to export symbols from the DSO.
template class SG_DLLEXPORT Singleton<Scheduler>;
typedef std::vector<DynamicProcess*> Processes;
typedef std::vector<DynamicRequest*> Requests;
typedef std::vector<DynamicSubRequest*> SubRequests;
typedef std::vector<DynamicThread*> Threads;
typedef Environment::Resources Resources;
typedef Environment::SubRequestQueue SubRequestQueue;
// ------------------ Static helper functions --------------
static void collect_threads(const std::vector<Process*>& procs, Threads& collected_threads);
static void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads);
static void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment);
static void update_allocated_requests(DynamicThread& running_thread, ConcreteEnvironment& environment);
static void raise_new_requests(DynamicThread& running_thread, ConcreteEnvironment& environment);
static void look_for_mutant_request_states(ConcreteEnvironment& environment, unsigned int& alive_threads);
static void determine_subr_allocable_status(const DynamicRequest& req, DynamicSubRequest& subr,
const Resource& res, SubRequestQueue& queue);
// ---------------------------------------------------------
// Collects all threads of an environment into a single vector
void
collect_threads(const std::vector<Process*>& procs,
Threads& collected_threads)
{
collected_threads.clear();
for (Iseq<vector<Process*>::const_iterator> seq = const_iseq(procs); seq; ++seq)
{
const Threads& ts = ((DynamicProcess&) **seq).get_dynamic_threads();
collected_threads.insert(collected_threads.end(), ts.begin(), ts.end());
}
}
void
prepare_ready_queue(ConcreteEnvironment& snapshot,
const Threads& all_threads)
{
ReadyQueue& queue = snapshot.get_sorted_queue();
assert(queue.size() == 0);
for (Iseq<Threads::const_iterator> seq = const_iseq(all_threads); seq; ++seq)
{
if ((*seq)->get_state() == Schedulable::state_ready)
queue.append(**seq);
}
}
// When a thread terminates, unconditionally kill all its requests
void
terminate_all_requests_of(DynamicThread& thread,
ConcreteEnvironment& environment)
{
Requests& reqs = thread.get_dynamic_requests();
for (Iseq<Requests::iterator> r_it = iseq(reqs); r_it; ++r_it)
{
SubRequests& subreqs = (*r_it)->get_dynamic_subrequests();
for (Iseq<SubRequests::iterator> s_it = iseq(subreqs); s_it; ++s_it)
{
(*s_it)->set_state(Request::state_exhausted);
Environment::resource_key_t rkey = (*s_it)->get_resource_key();
SubRequestQueue& queue = environment.get_request_queue(rkey);
SubRequestQueue::iterator removable = find(queue.begin(), queue.end(), *s_it);
if(removable != queue.end()) queue.erase(removable);
}
}
}
// For the current thread, see if there are requests that are exhausted
void
update_allocated_requests(DynamicThread& running_thread,
ConcreteEnvironment& environment)
{
// Go for all dynamic requests of this thread
Requests& reqs = running_thread.get_dynamic_requests();
for (Iseq<Requests::iterator> req_it = iseq(reqs); req_it; ++req_it)
{
SubRequests& cur_request = (*req_it)->get_dynamic_subrequests();
for (Iseq<SubRequests::iterator> subr_it = iseq(cur_request); subr_it; ++subr_it)
{
DynamicSubRequest& cur_subr = **subr_it;
if(cur_subr.get_state() == Request::state_allocated)
{
cur_subr.decrease_remaining_time();
if(cur_subr.get_remaining_time() == 0)
{
cur_subr.set_state(Request::state_exhausted);
Environment::resource_key_t rkey = cur_subr.get_resource_key();
SubRequestQueue& queue = environment.get_request_queue(rkey);
SubRequestQueue::iterator removable = find(queue.begin(), queue.end(), &cur_subr);
if(removable != queue.end()) queue.erase(removable);
}
}
} //~ for(over subrequests)
} //~ for(over requests)
}
// This function main role is to raise the requests of a thread which is trying to run.
// After finding those future requests that should not be future any more, each of their
// subrequests is added to the queue of a resource. Once put in the queue, their state
// is either ALLOCABLE or UNALLOCABLE.
// Remember that a thread may run only if all of its requests are either FUTURE,
// ALLOCATED or EXHAUSTED.
void
raise_new_requests(DynamicThread& running_thread, ConcreteEnvironment& environment)
{
// Go for all dynamic requests of this thread
Requests& reqs = running_thread.get_dynamic_requests();
for (Iseq<Requests::iterator> req_it = iseq(reqs); req_it; ++req_it)
{
DynamicRequest& cur_req = **req_it;
SubRequests& subreqs = (*req_it)->get_dynamic_subrequests();
// Add to the queue only requests passing from future to another state:
if(cur_req.get_state() == Request::state_future &&
cur_req.get_instant() == running_thread.get_elapsed_time())
{
for (Iseq<SubRequests::iterator> subr_it = iseq(subreqs); subr_it; ++subr_it)
{
// Do the proper adding:
DynamicSubRequest& cur_subr = **subr_it;
Environment::resource_key_t rkey = cur_subr.get_resource_key();
SubRequestQueue& queue = environment.get_request_queue(rkey);
queue.push_back(&cur_subr);
/// TODO: right here, right now we should call the resource policy to
/// update the queue. Updates the state of the subrequest depending
/// on the position in the queue, as explained before.
// Get the number of places for the corresponding resource
Resource& resource = *environment.get_resources().find(rkey)->second;
// See if the subrequest is allocable or unallocable, and set its state.
// It's important that the subrequest has already been added to the queue.
determine_subr_allocable_status(cur_req, cur_subr, resource, queue);
} //~ for(over subrequests)
} //~ if(request is future and is time to allocate it)
// A request may be ALLOCATED only when it is ALLOCABLE, i.e. when all its subrequests
// are ALLOCABLE. A request is allocated when all its subrequests are either TERMINATED
// or ALLOCATED, but at least one is ALLOCATED.
// Now, since the thread is willing to run, we must allocate the request if possible.
// All requests we treat are at the moment non-preemptable, so it is not permitted to temporarily
// preempt a resource to a thread to free a place and potentially allow one other thread
// to use that place. This is why we need to allocate requests (which means allocating
// resources to threads).
// If it is actually allocable, allocate it
switch(cur_req.get_state())
{
case Request::state_allocable:
for(Iseq<SubRequests::const_iterator> it_dsrs = const_iseq(subreqs); it_dsrs; ++it_dsrs)
{
DynamicSubRequest& subreq = **it_dsrs;
assert(subreq.get_state() == Request::state_allocable);
// Move this request up the queue, to the back of the allocated
// subrequests. This is mainly for display. :-)
// The rest of the queue sorting business is up to the resource policy.
Environment::resource_key_t rkey = subreq.get_resource_key();
SubRequestQueue& queue = environment.get_request_queue(rkey);
assert(queue.size() > 0);
SubRequestQueue::iterator alloc_it = queue.begin();
for(; (*alloc_it)->get_state() == Request::state_allocated; ++alloc_it)
assert(alloc_it != queue.end()); // We cannot reach the end without having found the current subr!
SubRequestQueue::iterator this_subreq = find(alloc_it, queue.end(), &subreq);
assert(this_subreq != queue.end());
swap(*alloc_it, *this_subreq);
subreq.set_state(Request::state_allocated);
}
break;
case Request::state_unallocable:
// If it does exist at least one unallocable request, the thread may not run!
running_thread.set_state(Schedulable::state_blocked);
break;
default:
break;
}//~ switch(request state)
} //~ for(over requests)
}
// The following loop determines how many places in the resource are
// really available for a thread, and how many places for a specific
// resource a thread really needs. Admittedly, it's a bit of a hack (in
// the way it's written, not conceptually!)
void
determine_subr_allocable_status(const DynamicRequest& req, DynamicSubRequest& subr,
const Resource& res, SubRequestQueue& queue)
{
unsigned int total_places = res.get_places();
unsigned int free_places = total_places;
unsigned int needed_places = 0;
for(Iseq<SubRequestQueue::const_iterator> queue_it = const_iseq(queue);
queue_it && free_places >= needed_places; queue_it++)
{
SubRequest& sr = **queue_it;
if(sr.get_state() == Request::state_allocated)
{
assert(free_places > 0); // Just to be sure...
free_places--;
}
// Okay, this won't win a beauty contest...
if(&sr.get_request() == &req)
needed_places++;
} //~ for(over subrequest queue)
// If the number of places this thread need for a resource are
// less or equal the places left free, it's allocable.
if(needed_places <= free_places)
subr.set_state(Request::state_allocable);
else
{
subr.set_state(Request::state_unallocable);
// Okay, this is difficult to understand, so read carefully:
// when we make a subrequest unallocable, it means that the
// whole request is unallocable. However, it may happen that
// there are other subrequests just marked allocable for
// a given resource. Since a request is atomic, either we're
// given *all* the places we asked for in a resource, or none.
// This doesn't affect the state for other subrequests on other
// resources, which may stay (atomically) allocable.
// (Maybe it was better to implement the number of "places"
// needed directly into subrequests, after all...)
for(Iseq<SubRequestQueue::iterator> queue_it = iseq(queue); queue_it; queue_it++)
{
DynamicSubRequest& x = static_cast<DynamicSubRequest&>(**queue_it);
if(&x.get_request() == &req && x.get_state() == Request::state_allocable)
x.set_state(Request::state_unallocable);
}
}
}
// This function checks if there are some allocable or unallocable
// requests that should change their state according to the previous
// step of the simulation. It also put previously blocked threads
// back into ready state if need arises.
void
look_for_mutant_request_states(ConcreteEnvironment& environment,
unsigned int& alive_threads)
{
// Now listening to: Testament's ``First Strike Is Still Deadly''
// The name of this function evokes mighty monsters from the abyss. In
// fact, it's what it actually does (okay, okay, pull the other one,
// it's got brass bells on).
// We start assuming that SubRequestsQueues are up-to-date
Resources& resources = environment.get_resources();
for(Iseq<Resources::iterator> res_it = iseq(resources); res_it; ++res_it)
{
Environment::resource_key_t rkey = res_it->first;
SubRequestQueue& queue = environment.get_request_queue(rkey);
unsigned int queue_pos = 0;
for(Iseq<SubRequestQueue::iterator> subr_it = iseq(queue); subr_it; ++subr_it, ++queue_pos)
{
DynamicSubRequest& subr = (DynamicSubRequest&) **subr_it;
DynamicRequest& req = subr.get_request();
Request::state prev_req_state = req.get_state();
// If a request is already allocated, we don't have to touch it!
if(prev_req_state == Request::state_allocated)
continue;
// Update the state of the subrequest, either from allocable to
// unallocable or vice-versa
determine_subr_allocable_status(req, subr, *res_it->second, queue);
// TODO: The following is a moderately expensive operation
// to do here. See if we can move it somewhere else.
// If a request changes state from allocable to unallocable,
// the corresponding thread should be blocked, and vice-versa
DynamicThread& thread = req.get_thread();
if(prev_req_state == Request::state_allocable &&
req.get_state() == Request::state_unallocable)
{
if(thread.get_state() != Schedulable::state_blocked)
alive_threads--;
thread.set_state(Schedulable::state_blocked);
}
else if(prev_req_state == Request::state_unallocable &&
req.get_state() == Request::state_allocable)
{
if(thread.get_state() == Schedulable::state_blocked)
alive_threads++;
thread.set_state(Schedulable::state_ready);
}
} //~ for(over subrequests in the queue)
} //~ for(over resources)
}
// ---------------------------------------------------------
//private constructor.
Scheduler::Scheduler()
: _ready_queue(NULL), _policy(NULL), _step_mutex()
{}
ReadyQueue*
Scheduler::get_ready_queue()
{
return _ready_queue;
}
CPUPolicy*
Scheduler::get_policy()
{
return _policy;
}
bool
Scheduler::step_forward(History& history, CPUPolicy& cpu_policy)
throw(UserInterruptException, MalformedPolicyException)
{
// This very method should be exclusive: no concurrent behaviour, from when we
// store a readyqueue and policy pointer for the user-policy to retrieve, to when
// the policy returns
Glib::Mutex::Lock lock (_step_mutex);
// NOTE: Be sure to read the *ORIGINAL* documentation in the design document for this method!
unsigned int alive_threads = 0; // Assume we've finished. Then prove me wrong.
int current_instant = history.get_size() - 1; /* They should be equivalent */
// Safe cast:
ConcreteHistory& concrete_history = static_cast<ConcreteHistory&>(history);
// Use an auto_ptr since we've some exceptions in the coming...
auto_ptr<ConcreteEnvironment> new_snapshot(new ConcreteEnvironment(concrete_history.get_last_environment()));
Threads all_threads;
DynamicThread* running_thread = NULL;
collect_threads(new_snapshot->get_processes(), all_threads);
// When a new instant cames, we could have to update the state of future
// threads to make them ready, or running threads to make them terminated
// We also update the other properties of the running thread, and keep a
// count of the alive threads
for (Iseq<Threads::iterator> it = iseq(all_threads); it; ++it)
{
DynamicThread& current = **it;
// 1. mark future threads as ready, if appropriate
if (current.get_state() == Schedulable::state_future)
{
Process& parent = current.get_process();
if ((long) parent.get_arrival_time() <= current_instant &&
parent.get_elapsed_time() == current.get_arrival_time())
current.set_state(Schedulable::state_ready);
}
// Save the current running thread for future usage, if it hasn't ended
// its allotted time
if (current.get_state() == Schedulable::state_running)
{
assert(running_thread == NULL); // ... only one thread must be running at a time.
running_thread = &current; // Even if we can change its state to terminate
// increasing the time elapsed of the running thread + process
// should be done here as the first thing, instead than
// directly after selecting them
if (current.get_total_cpu_time() - current.get_elapsed_time() > 0)
current.decrease_remaining_time();
// 4a. Look for exhausted requests for the running thread
update_allocated_requests(current, *new_snapshot);
// 2. mark threads that used all their allotted time as terminated,
// and put their requests as exhausted
if (current.get_total_cpu_time() - current.get_elapsed_time() == 0)
{
current.set_state(Schedulable::state_terminated);
terminate_all_requests_of(current, *new_snapshot);
}
}
// 3. check for simulation termination (we can directly use threads
// for this check, since processes' state is based upon threads' one)
Schedulable::state cur_state = current.get_state();
if ((cur_state & (Schedulable::state_blocked | Schedulable::state_terminated)) == 0 &&
(current.get_process().get_state() != Schedulable::state_terminated)) // check for holes
{
alive_threads++;
}
} //~ for over all_threads
// ?. Time to see if some unallocable request became allocable, so
// the thread can pass from blocked to ready state, or the other way
// round
look_for_mutant_request_states(*new_snapshot, alive_threads);
// Now if the simulation ended we append the newly
// created environment and return false
if (alive_threads == 0) goto final_cleanup;
// Use the CPU Policy to sort the ready queue, and manage
// requests for the newly selected running thread
try
{
// Temporarily set the _ready_queue param and the _policy one for
// use from external plugins. In fact, this is how get_ready_queue()
// acts as a callback function.
_policy = &cpu_policy;
_ready_queue = &new_snapshot->get_sorted_queue();
// Determine if the policy is pre_emptive, and what time slice it uses
bool preemptible_policy = cpu_policy.is_pre_emptive();
int time_slice = cpu_policy.get_time_slice();
// ?. See if old running_thread has to be put to ready state
// This happens when the policy makes use of preemptability by
// priority, or when a time slice ended
if (running_thread != NULL && running_thread->get_state() == Schedulable::state_running &&
(preemptible_policy ||
time_slice == current_instant - running_thread->get_last_acquisition()) )
{
running_thread->set_state(Schedulable::state_ready);
running_thread->set_last_release(current_instant);
}
// ?. Ask the policy to sort the queue. If we must select
// a new thread and it can't run for some reason (it goes blocked, or
// terminates), then we remove it from the built ReadyQueue and
// check if the next one can run.
prepare_ready_queue(*new_snapshot, all_threads);
if(_ready_queue->size() > 0) cpu_policy.sort_queue();
// If we don't have to select a new running thread, because the old one didn't
// have to release the CPU, our work may end here:
// * if the current running thread doesn't block, we can perform the final cleanup,
// since the queue is already sorted
// * else we've to select another running thread, so we continue down in the method
if(running_thread != NULL && running_thread->get_state() == Schedulable::state_running)
{
raise_new_requests(*running_thread, *new_snapshot);
if(running_thread->get_state() != Schedulable::state_blocked)
goto final_cleanup;
else
{
running_thread->set_last_release(current_instant);
running_thread = NULL;
alive_threads--;
// Proceed to select a new running thread, below
}
}
bool we_ve_got_a_winner = false;
while(_ready_queue->size() > 0 && !we_ve_got_a_winner) // No sense in trying to schedule something that isn't there
{
// Else, it's time to see if the first candidate can run
DynamicThread& candidate = (DynamicThread&) _ready_queue->get_item_at(0);
candidate.set_last_acquisition(current_instant);
// If a thread has been created with duration "0" (silly, but possible);
// if you think it's safe, you can change this condition with an assert
// and delete the body of the ``if''.
if(candidate.get_total_cpu_time() - candidate.get_elapsed_time() == 0)
{
candidate.set_last_release(current_instant);
candidate.set_state(Schedulable::state_terminated);
// Put every request of this thread to state_exhausted
terminate_all_requests_of(candidate, *new_snapshot);
_ready_queue->erase_first();
alive_threads--;
continue;
}
// Now we check if our candidate blocks on a new request
raise_new_requests(candidate, *new_snapshot);
if(candidate.get_state() != Schedulable::state_blocked)
// If we got here, our candidate can run
we_ve_got_a_winner /*!hurrah!*/ = true;
else // if blocked, we've to remove it from the ready queue
{
_ready_queue->erase_first();
alive_threads--;
}
}
// ?. Finally select the new thread (if appropriate); now we're sure
// the one we have can run
if (we_ve_got_a_winner)
{
// Fix fields of running thread
DynamicThread& new_running = (DynamicThread&) _ready_queue->get_item_at(0);
new_running.set_state(Schedulable::state_running);
}
}
catch (const CPUPolicyException& e)
{
// Reset values that the policy doesn't need anymore
_policy = NULL;
_ready_queue = NULL;
// Do we need to update/reset something else?
// Going up unwinding the stack, tell:
// - the user that the policy sucks
// - SimulationController that everything stopped
throw;
}
final_cleanup:
// append the new snapshot...
// ...and remember to release the auto_ptr!
concrete_history.append_new_environment(new_snapshot.release());
// Reset values that the policy doesn't need anymore
_policy = NULL;
_ready_queue = NULL;
// If we got there, a step has been performed.
// Return if we can perform another step.
return alive_threads != 0;
}