// src/backend/scheduler.cc - Copyright 2005, 2006, University // of Padova, dept. of Pure and Applied // Mathematics // // This file is part of SGPEMv2. // // This is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // SGPEMv2 is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with SGPEMv2; if not, write to the Free Software // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA // DISCLAIMER FOR THE RAMPANT CODER: \\ // ----------------------------------------------------\\ // ``If you touch this code, your ass is grass, \\ // and I'm the lawnmover.'' \\ // -- David Cutler \\ // ----------------------------------------------------\\ #include "concrete_environment.hh" #include "concrete_history.hh" #include #include #include // Do not include full template definition in the header file #include #include #include #include #include #include using namespace std; using namespace sgpem; // Explicit template instantiation to allow to export symbols from the DSO. template class SG_DLLEXPORT Singleton; typedef std::vector Processes; typedef std::vector Requests; typedef std::vector SubRequests; typedef std::vector Threads; typedef Environment::Resources Resources; typedef Environment::SubRequestQueue SubRequestQueue; // ------------------ Static helper functions -------------- inline bool is_running(const Thread* running_thread); static void collect_threads(const std::vector& procs, Threads& collected_threads); static void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads); static void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment); static void update_allocated_requests(DynamicThread& running_thread, ConcreteEnvironment& environment); static void raise_new_requests(DynamicThread& running_thread, ConcreteEnvironment& environment, ResourcePolicy& resource_policy); static void look_for_mutant_request_states(ConcreteEnvironment& environment, unsigned int& alive_threads); static void determine_subr_allocable_status(const DynamicRequest& req, DynamicSubRequest& subr, const Resource& res, SubRequestQueue& queue); static void determine_subr_allocable_status(const Resource& res, const SubRequestQueue& queue); // --------------------------------------------------------- bool is_running(const Thread* running_thread) { return running_thread != NULL && running_thread->get_state() == Schedulable::state_running; } // Collects all threads of an environment into a single vector void collect_threads(const std::vector& procs, Threads& collected_threads) { collected_threads.clear(); for (Iseq::const_iterator> seq = iseq(procs); seq; ++seq) { const Threads& ts = ((DynamicProcess&) **seq).get_dynamic_threads(); collected_threads.insert(collected_threads.end(), ts.begin(), ts.end()); } } void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads) { ReadyQueue& queue = snapshot.get_sorted_queue(); assert(queue.size() == 0); for (Iseq seq = iseq(all_threads); seq; ++seq) { if ((*seq)->get_state() == Schedulable::state_ready) queue.append(**seq); } } // When a thread terminates, unconditionally kill all its requests void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment) { Requests& reqs = thread.get_dynamic_requests(); for (Iseq r_it = iseq(reqs); r_it; ++r_it) { SubRequests& subreqs = (*r_it)->get_dynamic_subrequests(); for (Iseq s_it = iseq(subreqs); s_it; ++s_it) { (*s_it)->set_state(Request::state_exhausted); Environment::resource_key_t rkey = (*s_it)->get_resource_key(); SubRequestQueue& queue = environment.get_request_queue(rkey); SubRequestQueue::iterator removable = find(queue.begin(), queue.end(), *s_it); if(removable != queue.end()) queue.erase(removable); } } } // For the current thread, see if there are requests that are exhausted void update_allocated_requests(DynamicThread& running_thread, ConcreteEnvironment& environment) { // Go for all dynamic requests of this thread Requests& reqs = running_thread.get_dynamic_requests(); for (Iseq req_it = iseq(reqs); req_it; ++req_it) { SubRequests& cur_request = (*req_it)->get_dynamic_subrequests(); for (Iseq subr_it = iseq(cur_request); subr_it; ++subr_it) { DynamicSubRequest& cur_subr = **subr_it; if(cur_subr.get_state() == Request::state_allocated) { cur_subr.decrease_remaining_time(); if(cur_subr.get_remaining_time() == 0) { cur_subr.set_state(Request::state_exhausted); Environment::resource_key_t rkey = cur_subr.get_resource_key(); SubRequestQueue& queue = environment.get_request_queue(rkey); SubRequestQueue::iterator removable = find(queue.begin(), queue.end(), &cur_subr); if(removable != queue.end()) queue.erase(removable); } } } //~ for(over subrequests) } //~ for(over requests) } // This function main role is to raise the requests of a thread which is trying to run. // After finding those future requests that should not be future any more, each of their // subrequests is added to the queue of a resource. Once put in the queue, their state // is either ALLOCABLE or UNALLOCABLE. // Remember that a thread may run only if all of its requests are either FUTURE, // ALLOCATED or EXHAUSTED. void raise_new_requests(DynamicThread& running_thread, ConcreteEnvironment& environment, ResourcePolicy& resource_policy) { // Go for all dynamic requests of this thread Requests& reqs = running_thread.get_dynamic_requests(); for (Iseq req_it = iseq(reqs); req_it; ++req_it) { DynamicRequest& cur_req = **req_it; SubRequests& subreqs = (*req_it)->get_dynamic_subrequests(); // Add to the queue only requests passing from future to another state: if(cur_req.get_state() == Request::state_future && cur_req.get_instant() == running_thread.get_elapsed_time()) { for (Iseq subr_it = iseq(subreqs); subr_it; ++subr_it) { // Do the proper adding: DynamicSubRequest& cur_subr = **subr_it; Environment::resource_key_t rkey = cur_subr.get_resource_key(); SubRequestQueue& queue = environment.get_request_queue(rkey); queue.push_back(&cur_subr); /// TODO: right here, right now we should call the resource policy to /// update the queue. Updates the state of the subrequest depending /// on the position in the queue, as explained before. resource_policy.enforce(environment, queue, cur_subr); // Get the number of places for the corresponding resource Resource& resource = *environment.get_resources().find(rkey)->second; // WARNING: adding a new request may require updating the status of ALL other // requests in the queue determine_subr_allocable_status(resource, queue); // after that, check if it is globally allocable. // See if the subrequest is allocable or unallocable, and set its state. // It's important that the subrequest has already been added to the queue. // determine_subr_allocable_status(cur_req, cur_subr, resource, queue); } //~ for(over subrequests) } //~ if(request is future and is time to allocate it) // A request may be ALLOCATED only when it is ALLOCABLE, i.e. when all its subrequests // are ALLOCABLE. A request is allocated when all its subrequests are either TERMINATED // or ALLOCATED, but at least one is ALLOCATED. // Now, since the thread is willing to run, we must allocate the request if possible. // All requests we treat are at the moment non-preemptable, so it is not permitted to temporarily // preempt a resource to a thread to free a place and potentially allow one other thread // to use that place. This is why we need to allocate requests (which means allocating // resources to threads). // If it is actually allocable, allocate it switch(cur_req.get_state()) { case Request::state_allocable: { const SubRequests& const_subreqs = subreqs; for(Iseq it_dsrs = iseq(const_subreqs); it_dsrs; ++it_dsrs) { DynamicSubRequest& subreq = **it_dsrs; assert(subreq.get_state() == Request::state_allocable); /* // Move this request up the queue, to the back of the allocated // subrequests. This is mainly for display. :-) // The rest of the queue sorting business is up to the resource policy. Environment::resource_key_t rkey = subreq.get_resource_key(); SubRequestQueue& queue = environment.get_request_queue(rkey); assert(queue.size() > 0); SubRequestQueue::iterator alloc_it = queue.begin(); for(; (*alloc_it)->get_state() == Request::state_allocated; ++alloc_it) assert(alloc_it != queue.end()); // We cannot reach the end without having found the current subr! SubRequestQueue::iterator this_subreq = find(alloc_it, queue.end(), &subreq); assert(this_subreq != queue.end()); swap(*alloc_it, *this_subreq); */ subreq.set_state(Request::state_allocated); } } break; case Request::state_unallocable: // If it does exist at least one unallocable request, the thread may not run! running_thread.set_state(Schedulable::state_blocked); break; default: break; }//~ switch(request state) } //~ for(over requests) } // The following loop determines how many places in the resource are // really available for a thread, and how many places for a specific // resource a thread really needs. Admittedly, it's a bit of a hack (in // the way it's written, not conceptually!) void determine_subr_allocable_status(const DynamicRequest& req, DynamicSubRequest& subr, const Resource& res, SubRequestQueue& queue) { unsigned int total_places = res.get_places(); unsigned int free_places = total_places; unsigned int needed_places = 0; unsigned int position_in_queue = 0; bool too_far_in_the_queue = false; const SubRequestQueue& const_queue = queue; for(Iseq queue_it = iseq(const_queue); queue_it && free_places >= needed_places; queue_it++, position_in_queue++) { SubRequest& sr = **queue_it; if(sr.get_state() == Request::state_allocated) { assert(free_places > 0); // Just to be sure... free_places--; } // Okay, this won't win a beauty contest... if(&sr.get_request() == &req) needed_places++; // Well, and what about this, then? if(&sr == &subr && position_in_queue + 1 > total_places) too_far_in_the_queue = true; } //~ for(over subrequest queue) // If the number of places this thread need for a resource are // less or equal the places left free, it's allocable. if(needed_places <= free_places && !too_far_in_the_queue) subr.set_state(Request::state_allocable); else { subr.set_state(Request::state_unallocable); /* // Okay, this is difficult to understand, so read carefully: // when we make a subrequest unallocable, it means that the // whole request is unallocable. However, it may happen that // there are other subrequests just marked allocable for // a given resource. Since a request is atomic, either we're // given *all* the places we asked for in a resource, or none. // This doesn't affect the state for other subrequests on other // resources, which may stay (atomically) allocable. // (Maybe it was better to implement the number of "places" // needed directly into subrequests, after all...) for(Iseq queue_it = iseq(queue); queue_it; queue_it++) { DynamicSubRequest& x = static_cast(**queue_it); if(&x.get_request() == &req && x.get_state() == Request::state_allocable) x.set_state(Request::state_unallocable); } */ } } // The following loop updates the states of the subrequests depending // on their position in the queue void determine_subr_allocable_status(const Resource& res, const SubRequestQueue& queue) { unsigned int total_places = res.get_places(); unsigned int position_in_queue = 0; for(Iseq queue_it = iseq(queue); queue_it; queue_it++, position_in_queue++) { DynamicSubRequest& sr = (DynamicSubRequest&) **queue_it; if (sr.get_state() == Request::state_allocated) continue; if(position_in_queue + 1 > total_places) { sr.set_state(Request::state_unallocable); // Kludge: sr.get_request().get_thread().set_state(Schedulable::state_blocked); } else sr.set_state(Request::state_allocable); } //~ for(over subrequest queue) } // This function checks if there are some allocable or unallocable // requests that should change their state according to the previous // step of the simulation. It also put previously blocked threads // back into ready state if need arises. void look_for_mutant_request_states(ConcreteEnvironment& environment, unsigned int& alive_threads) { // Now listening to: Testament's ``First Strike Is Still Deadly'' // The name of this function evokes mighty monsters from the abyss. In // fact, it's what it actually does (okay, okay, pull the other one, // it's got brass bells on). // We start assuming that SubRequestsQueues are up-to-date Resources& resources = environment.get_resources(); for(Iseq res_it = iseq(resources); res_it; ++res_it) { Environment::resource_key_t rkey = res_it->first; SubRequestQueue& queue = environment.get_request_queue(rkey); unsigned int queue_pos = 0; for(Iseq subr_it = iseq(queue); subr_it; ++subr_it, ++queue_pos) { DynamicSubRequest& subr = (DynamicSubRequest&) **subr_it; DynamicRequest& req = subr.get_request(); Request::state prev_req_state = req.get_state(); // If a request is already allocated, we don't have to touch it! if(prev_req_state == Request::state_allocated) continue; // Update the state of the subrequest, either from allocable to // unallocable or vice-versa // determine_subr_allocable_status(req, subr, *res_it->second, queue); determine_subr_allocable_status(*res_it->second, queue); // TODO: The following is a moderately expensive operation // to do here. See if we can move it somewhere else. // If a request changes state from allocable to unallocable, // the corresponding thread should be blocked, and vice-versa DynamicThread& thread = req.get_thread(); if(prev_req_state == Request::state_allocable && req.get_state() == Request::state_unallocable) { if(thread.get_state() != Schedulable::state_blocked) alive_threads--; thread.set_state(Schedulable::state_blocked); } else if(prev_req_state == Request::state_unallocable && req.get_state() == Request::state_allocable) { if(thread.get_state() == Schedulable::state_blocked) alive_threads++; thread.set_state(Schedulable::state_ready); } } //~ for(over subrequests in the queue) } //~ for(over resources) } // --------------------------------------------------------- //private constructor. Scheduler::Scheduler() : _ready_queue(NULL), _policy(NULL), _step_mutex() {} ReadyQueue* Scheduler::get_ready_queue() { return _ready_queue; } CPUPolicy* Scheduler::get_policy() { return _policy; } bool Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& resource_policy) throw(UserInterruptException, MalformedPolicyException) { // This very method should be exclusive: no concurrent behaviour, from when we // store a readyqueue and policy pointer for the user-policy to retrieve, to when // the policy returns Glib::Mutex::Lock lock (_step_mutex); // NOTE: Be sure to read the *ORIGINAL* documentation in the design document for this method! unsigned int alive_threads = 0; // Assume we've finished. Then prove me wrong. int current_instant = history.get_size() - 1; /* They should be equivalent */ // Safe cast: ConcreteHistory& concrete_history = static_cast(history); // Use an auto_ptr since we've some exceptions in the coming... auto_ptr new_snapshot(new ConcreteEnvironment(concrete_history.get_environment_at(current_instant))); Threads all_threads; DynamicThread* running_thread = NULL; collect_threads(new_snapshot->get_processes(), all_threads); // The first thing we've to do is to update the state of the old // running thread, if there's one. for (Iseq it = iseq(all_threads); it; ++it) { DynamicThread& current = **it; // Save the current running thread for future usage, if it hasn't ended // its allotted time if (current.get_state() == Schedulable::state_running) { running_thread = ¤t; // Even if we can change its state to terminate // increasing the time elapsed of the running thread + process // should be done here as the first thing, instead than // directly after selecting them if (current.get_total_cpu_time() - current.get_elapsed_time() > 0) current.decrease_remaining_time(); // 4a. Look for exhausted requests for the running thread update_allocated_requests(current, *new_snapshot); // 2. mark threads that used all their allotted time as terminated, // and put their requests as exhausted if (current.get_total_cpu_time() - current.get_elapsed_time() == 0) { current.set_state(Schedulable::state_terminated); current.set_last_release(current_instant); terminate_all_requests_of(current, *new_snapshot); running_thread = NULL; } // if we found the running thread, there isn't another one, // so we can safely exit the for loop. break; } //~ if state == running } //~ for over all threads // When a new instant cames, we could have to update the state of future // threads to make them ready. We also keep a count of alive threads for (Iseq it = iseq(all_threads); it; ++it) { DynamicThread& current = **it; // 1. mark future threads as ready, if appropriate if (current.get_state() == Schedulable::state_future) { Process& parent = current.get_process(); if ((long) parent.get_arrival_time() <= current_instant && parent.get_elapsed_time() == current.get_arrival_time()) current.set_state(Schedulable::state_ready); } // 3. check for simulation termination (we can directly use threads // for this check, since processes' state is based upon threads' one) Schedulable::state cur_state = current.get_state(); if ((cur_state & (Schedulable::state_blocked | Schedulable::state_terminated)) == 0 && (current.get_process().get_state() & (Schedulable::state_terminated | Schedulable::state_blocked)) == 0) // check for holes { alive_threads++; } } //~ for over all_threads // ?. Time to see if some unallocable request became allocable, so // the thread can pass from blocked to ready state, or the other way // round look_for_mutant_request_states(*new_snapshot, alive_threads); // Now if the simulation ended we append the newly // created environment and return false if (alive_threads == 0) goto final_cleanup; // Use the CPU Policy to sort the ready queue, and manage // requests for the newly selected running thread try { // Temporarily set the _ready_queue param and the _policy one for // use from external plugins. In fact, this is how get_ready_queue() // acts as a callback function. _policy = &cpu_policy; _ready_queue = &new_snapshot->get_sorted_queue(); // Determine if the policy is pre_emptive, and what time slice it uses bool preemptive_policy = cpu_policy.is_pre_emptive(); int time_slice = cpu_policy.get_time_slice(); // ?. See if old running_thread has to be put to ready state // This happens when a time slice ends if (is_running(running_thread) && time_slice > 0 && // A process can be preempted every n-th time-slice, so we use the modulo operator: (current_instant - running_thread->get_last_acquisition()) % time_slice == 0) { running_thread->set_state(Schedulable::state_ready); running_thread->set_last_release(current_instant); } prepare_ready_queue(*new_snapshot, all_threads); // If the policy is preemptible, and we still have a running thread, // add it to the queue. if(preemptive_policy && is_running(running_thread)) _ready_queue->append(*running_thread); // ?. Ask the policy to sort the queue. If we must select // a new thread and it can't run for some reason (it goes blocked, or // terminates), then we remove it from the built ReadyQueue and // check if the next one can run (see while loop further below). if(_ready_queue->size() > 0) cpu_policy.sort_queue(); // If we don't have to select a new running thread, because the old one didn't // have to release the CPU, our work may end here: // * if the current running thread doesn't block, we can perform the final cleanup, // since the queue is already sorted // * else we've to select another running thread, so we continue down in the method if( // Non-preemptive policy, not ended time-slice: (!preemptive_policy && is_running(running_thread)) || // Pre-emptive policy, running thread still the first of the queue. // Note: if is_running(running_thread) == true, then _ready_queue->size() > 0 (preemptive_policy && is_running(running_thread) && &_ready_queue->get_item_at(0) == running_thread) ) { raise_new_requests(*running_thread, *new_snapshot, resource_policy); if(running_thread->get_state() != Schedulable::state_blocked) goto final_cleanup; else { running_thread->set_last_release(current_instant); running_thread = NULL; alive_threads--; // Proceed to select a new running thread, below } } bool we_ve_got_a_winner = false; while(_ready_queue->size() > 0 && !we_ve_got_a_winner) // No sense in trying to schedule something that isn't there { // Else, it's time to see if the first candidate can run DynamicThread& candidate = (DynamicThread&) _ready_queue->get_item_at(0); // If a thread has been created with duration "0" (silly, but possible); // if you think it's safe, you can change this condition with an assert // and delete the body of the ``if''. if(candidate.get_total_cpu_time() - candidate.get_elapsed_time() == 0) { candidate.set_last_acquisition(current_instant); candidate.set_last_release(current_instant); candidate.set_state(Schedulable::state_terminated); // Put every request of this thread to state_exhausted terminate_all_requests_of(candidate, *new_snapshot); _ready_queue->erase_first(); alive_threads--; continue; } // Now we check if our candidate blocks on a new request raise_new_requests(candidate, *new_snapshot, resource_policy); if(candidate.get_state() != Schedulable::state_blocked) // If we got here, our candidate can run we_ve_got_a_winner /*!hurrah!*/ = true; else // if blocked, we've to remove it from the ready queue { _ready_queue->erase_first(); alive_threads--; } } // ?. Finally select the new thread (if appropriate); now we're sure // the one we have can run if (we_ve_got_a_winner) { // Fix fields of running thread DynamicThread& new_running = (DynamicThread&) _ready_queue->get_item_at(0); _ready_queue->erase_first(); new_running.set_state(Schedulable::state_running); // If the new running is different from the old one, // remember to release our old pal, and to acquire our // new runner. if(&new_running != running_thread) { if(running_thread != NULL) { running_thread->set_state(Schedulable::state_ready); running_thread->set_last_release(current_instant); } new_running.set_last_acquisition(current_instant); } } } catch (const CPUPolicyException& e) { // Reset values that the policy doesn't need anymore _policy = NULL; _ready_queue = NULL; // Do we need to update/reset something else? // Going up unwinding the stack, tell: // - the user that the policy sucks // - SimulationController that everything stopped throw; } final_cleanup: // append the new snapshot... // ...and remember to release the auto_ptr! concrete_history.append_new_environment(new_snapshot.release()); // Reset values that the policy doesn't need anymore _policy = NULL; _ready_queue = NULL; // If we got there, a step has been performed. // Return if we can perform another step. return alive_threads != 0; }