// src/backend/scheduler.cc - Copyright 2005, 2006, University // of Padova, dept. of Pure and Applied // Mathematics // // This file is part of SGPEMv2. // // This is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // SGPEMv2 is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with SGPEMv2; if not, write to the Free Software // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include "concrete_environment.hh" #include "concrete_history.hh" #include "policy.hh" #include "scheduler.hh" #include "user_interrupt_exception.hh" // Do not include full template definition in the header file #include "singleton.tcc" #include #include #include using namespace std; using namespace sgpem; // Explicit template instantiation to allow to export symbols from the DSO. template class SG_DLLEXPORT Singleton; // ------------------ Static helper functions -------------- // Collects all threads of an environment into a single vector static void collect_threads(const std::vector& procs, std::vector& collected_threads) { typedef std::vector Processes; typedef std::vector Threads; collected_threads.clear(); for(Processes::const_iterator it1 = procs.begin(); it1 != procs.end(); it1++) { const Threads& ts = ((DynamicProcess&) **it1).get_dynamic_threads(); collected_threads.insert(collected_threads.end(), ts.begin(), ts.end()); } } static void prepare_ready_queue(ConcreteEnvironment& snapshot, std::vector& all_threads) { typedef std::vector Threads; ReadyQueue& queue = snapshot.get_sorted_queue(); assert(queue.size() == 0); for(Threads::const_iterator it = all_threads.begin(); it != all_threads.end(); it++) { if((*it)->get_state() == Schedulable::state_ready) queue.append(**it); } } // --------------------------------------------------------- //private constructor. The parameter is discarded Scheduler::Scheduler() : _ready_queue(NULL), _policy(NULL) {} ReadyQueue* Scheduler::get_ready_queue() { // FIXME return the correct queue accordingly to the value returned by Policy::wants() return _ready_queue; } /** \note E' fondamentale che questo metodo memorizzi localmente qualora la politica attuale sia a prerilascio o meno, e la durata del quanto di tempo, in quanto la politica e' libera di variare questi parametri a piacere durante l'esecuzione della simulazione */ void Scheduler::reset_status() { // DEPRECATED (?) } Policy* Scheduler::get_policy() { return _policy; } void Scheduler::step_forward(History& history, Policy& cpu_policy) throw(UserInterruptException) { // This very method should be exclusive: no concurrent behaviour, from when we // store a readyqueue and policy pointer for the user-policy to retrieve, to when // the policy returns // TODO: restrict this area to maximise parallelism Glib::Mutex::Lock lock(_mutex); // NOTE: Be sure to read the *ORIGINAL* documentation in the design document for this method! // FIXME: handle me! I'm not just a pretty boolean, I want to be *USED*! *EXPLOITED*! // *RAPED*! *MAKE ME BLEED*! bool simulation_ended = true; // Assume we've finished. Then prove me wrong. ConcreteHistory& concrete_history = (ConcreteHistory&) history; // Use an auto_ptr since we've some exceptions in the coming... auto_ptr new_snapshot(new ConcreteEnvironment(concrete_history.get_last_environment())); typedef std::vector Processes; typedef std::vector Requests; typedef std::vector SubRequests; typedef std::vector Threads; Threads all_threads; DynamicThread* running_thread = NULL; collect_threads(new_snapshot->get_processes(), all_threads); // designer + implementer (Matteo) comment follows: for(Threads::iterator it = all_threads.begin(); it != all_threads.end(); it++) { DynamicThread& current = **it; // 1. mark future threads as ready, if appropriate if(current.get_state() == Schedulable::state_future) { Process& parent = current.get_process(); if(parent.get_elapsed_time() == current.get_arrival_time()) current.set_state(Schedulable::state_ready); } // Save the current running thread for future usage, if it hasn't ended // its allotted time if(current.get_state() == Schedulable::state_running) { // increasing the time elapsed of the running thread + process // should be done here as the first thing, instead than // directly after selecting them running_thread->decrease_remaining_time(); running_thread = ¤t; // Even if we change its state to terminated // 2. mark threads that used all their allotted time as terminated if(current.get_total_cpu_time() - current.get_elapsed_time() == 0) current.set_state(Schedulable::state_terminated); } // 3. check for simulation termination (we can directly use threads // for this check, since processes' state is based upon threads' one) if( /* we still think that */ simulation_ended && (current.get_state() & (Schedulable::state_blocked | Schedulable::state_terminated)) == 0) simulation_ended = false; } // 4a. Look for exhausted requests for the running thread if(running_thread != NULL) { Requests& reqs = running_thread->get_dynamic_requests(); bool running_terminated = running_thread->get_state() == Schedulable::state_terminated; for(Requests::iterator it = reqs.begin(); it != reqs.end(); it++) { DynamicRequest& rq = **it; if(rq.get_state() == Request::state_allocated) /* decrease remaining time for request */; // ASK MARCO : can we implement request::decrease_remaining_time() as // a function that calls decrease_remaining_time() on all its subrequests, // that in turn never go with a remaining time < 0? // If the running thread terminated uncoditionally put them in exhausted state if(running_terminated /* || rq.get_remaining_time() == 0 */ ) rq.set_state(Request::state_exhausted); } // FIXME we lack a way to tell and/or remember for how // much a subrequest has been being fulfilled // THIS MEANS this part is NOT complete // We should check if a request has been fulfilled // FIXME If a request was being fulfilled to the running thread, // we should decrease the request remaining time here. // This is why we kept a ref to the old running thread, // even if it was terminated } //~ if running_thread != NULL // ---------- FIXME ---------------- // What to do now if the simulation ended? /* / * / * / * (I'M HERE) < * * * * * * * * * * * * \ * \ * \ * * (is it visible enough for you?) */ prepare_ready_queue(*new_snapshot, all_threads); try { // Temporarily set the _ready_queue param and the _policy one for // use from external plugins _policy = &cpu_policy; _ready_queue = &new_snapshot->get_sorted_queue(); // ?. Use the policy to sort the queue // FIXME: how does it get the queue? cpu_policy.sort_queue(); } catch(UserInterruptException& e) { // Reset values that the policy doesn't need anymore _policy = NULL; _ready_queue = NULL; // Do we need to update/reset something else? // Going up unwinding the stack, tell: // - the user that the policy sucks // - SimulationController that everything stopped throw; } // append the new snapshot... // ...and remember to release the auto_ptr! concrete_history.append_new_environment(new_snapshot.release()); // Reset values that the policy doesn't need anymore _policy = NULL; _ready_queue = NULL; }