From 6a1e30b0a28e93a57f5abd6c0a71bf050533dfd9 Mon Sep 17 00:00:00 2001 From: matrevis Date: Thu, 10 Aug 2006 23:39:13 +0000 Subject: [PATCH] - Reconstructed step forward. There is still some visible bug in the imlpementation, so this version should not be considered definitive. please give me an other day to check it. git-svn-id: svn://svn.gna.org/svn/sgpemv2/trunk@840 3ecf2c5c-341e-0410-92b4-d18e462d057c --- src/backend/scheduler.cc | 1332 +++++++++++++++++++------------------- src/backend/scheduler.hh | 9 +- 2 files changed, 662 insertions(+), 679 deletions(-) diff --git a/src/backend/scheduler.cc b/src/backend/scheduler.cc index 6da165f..0857e7e 100644 --- a/src/backend/scheduler.cc +++ b/src/backend/scheduler.cc @@ -23,7 +23,6 @@ #include "cpu_policy.hh" #include "scheduler.hh" #include "user_interrupt_exception.hh" -#include "malformed_policy_exception.hh" // Do not include full template definition in the header file #include "singleton.tcc" @@ -31,7 +30,6 @@ #include #include -#include #include using namespace std; @@ -40,436 +38,16 @@ using namespace sgpem; // Explicit template instantiation to allow to export symbols from the DSO. template class SG_DLLEXPORT Singleton; - typedef std::vector Processes; typedef std::vector Requests; typedef std::vector SubRequests; typedef std::vector Threads; - - -class Extender -{ -public: - - enum walk_purpose - { - walk_to_sweep = 0, - walk_to_advance = 1, - walk_to_update = 2, - walk_to_allocate_cpu = 3, - walk_to_allocate_resources = 4 - }; - - Extender(auto_ptr & _env, walk_purpose _walk, unsigned int _front) - : env(_env), walk(_walk), front(_front) - {} - - /// \brief Manages a single SubRequest object, depending on its state. - /// Zero step: any -> terminated. Added to cope with malformed threads. - /// First step: allocated -> terminated. - /// Second step: non-allocable -> allocable. - /// Third step: allocable -> allocated, or future -> allocated. - /// - /// The name and the structure of this method are ugly. They are inherited - /// from the whole visitor's structure, anyway we could simply switch on of - /// the state the SubRequest obejct, and we could (should?) factor out the - /// operations which check if the request is allocable or not, depending on - /// the queue position. Anyway, this factoring may lead to code dispersion. - /// I think it is better to hold the computational core in one single place. - void - extendSubRequest(DynamicSubRequest* sp) - { - DynamicSubRequest& s = *sp; - switch (walk) - { - /// Terminates directly the subrequest - case walk_to_sweep: - { - s.set_state(Request::state_exhausted); - /// Remove the subrequest (pointer) from the queue. - bool found = false; - typedef Environment::SubRequestQueue SubRequestQueue; - SubRequestQueue& queue = env->get_request_queue(s.get_resource_key()); - SubRequestQueue::iterator it = queue.begin(); - for (; !found && it != queue.end(); it++) - if ((*it) == sp) - { - found = true; - queue.erase(it); - } - break; - } - /// Updates the state of an ALLOCATED subrequest, decreasing appropriate - /// counters, and checks if it become TERMINATED. In the latter case the - /// function finds the position of the subrequest (pointer) in the - /// requested resource's queue and removes it. - case walk_to_advance: - { - if (s.get_state() != Request::state_allocated) - break; - /// Decrease remaining time, since the resource has been used. - s.decrease_remaining_time(); - /// Check for termination. 
- if (s.get_remaining_time() == 0) - { - s.set_state(Request::state_exhausted); - /// Remove the subrequest (pointer) from the queue. - bool found = false; - typedef Environment::SubRequestQueue SubRequestQueue; - SubRequestQueue& queue = env->get_request_queue(s.get_resource_key()); - SubRequestQueue::iterator it = queue.begin(); - for (; !found && it != queue.end(); it++) - { - if ((*it) == &s) - { - found = true; - queue.erase(it); - } - } - } - break; - } - - /// Updates the state of a non-ALLOCATED subrequest, in case it become - /// ALLOCABLE or UNALLOCABLE, which may happen only when a resource has - /// been released. - /// We could jump this check if no resource were released. - /// It finds the position of the subrequest (pointer) in the requested - /// resource's queue. If the position is within the places of the - /// resource, the subrequest is ALLOCABLE. - /// - case walk_to_update: - { - if (s.get_state() != Request::state_unallocable - && s.get_state() != Request::state_allocable) - break; - /// The subrequest is in the queue for sure. Let's find it! - /// I really need an integer for this operation. - uint position = 0; - Environment::SubRequestQueue& queue = env->get_request_queue(s.get_resource_key()); - Environment::SubRequestQueue::iterator it = queue.begin(); - while (it != queue.end()) - { - if (*it == &s) - break; - it++; - position++; - } - /// Watch out: in a resource with 2 places, 0 and 1 are valid queue - /// positions, 2 is right one place out. - s.set_state(position >= env->get_resources().find(s.get_resource_key())->second->get_places() ? - Request::state_unallocable : Request::state_allocable); - break; - } - - /// Updates the state of a FUTURE subrequest when the time has come - /// for it to be raised, setting it as allocable it if it is the case, - /// or blocking it. Enqueues the subrequest (pointer) at the end of the - /// requested resource's queue. - /// The resource policy should be called to manage the queue. - /// If the position is within the places of the resource, the subrequest - /// is ALLOCABLE, otherwise it is NON-ALLOCABLE. - case walk_to_allocate_cpu: - { - if (s.get_state() != Request::state_future) - break; - Environment::SubRequestQueue& queue = env->get_request_queue(s.get_resource_key()); - /// Enqueue the subrequest at the back of the queue. - queue.push_back(&s); - - /// TODO: right here, right now we should call the resource policy to - /// update the queue. Updates the state of the subrequest depending - /// on the position in the queue, as explained before. - s.set_state(queue.size() > env->get_resources().find(s.get_resource_key())->second->get_places() ? - Request::state_unallocable : Request::state_allocable); - // Oh I miss ML so much. - break; - } - - /// This is ugly, but hey, none's perfect. - /// Updates the state of a ALLOCABLE subrequest allocating it. - case walk_to_allocate_resources: - { - if (s.get_state() == Request::state_allocable) - s.set_state(Request::state_allocated); - break; - } - } - } - - - - - /// \brief Manages a single Request object, depending on its state. - /// Updates the state of a request, depending on its state, recursively - /// updating the contained subrequests. The state of the request is then - /// a function of the states of the subrequests. - /// - /// Zero step: any -> terminated. Added to cope with malformed threads. - /// First step: allocated -> terminated. - /// Second step: non-allocable -> allocable. - /// Third step: allocable -> allocated, or future -> allocated. 
- /// - /// The following function may be reduced to a pair of lines. - /// - /// Longer, safer and more efficient version (and hopefully much easier - /// to understand!) - void - extendRequest(DynamicRequest& r) - { - switch (walk) - { - case walk_to_sweep: - { - typedef vector SubRequests; - SubRequests list = r.get_dynamic_subrequests(); - for (SubRequests::iterator it = list.begin(); it != list.end(); it++) - extendSubRequest(*it); - break; - } - /// Updates the state of an ALLOCATED request. - case walk_to_advance: - { - if (r.get_state() != Request::state_allocated) - break; - typedef vector SubRequests; - SubRequests list = r.get_dynamic_subrequests(); - for (SubRequests::iterator it = list.begin(); it != list.end(); it++) - extendSubRequest(*it); - break; - } - - /// Updates the state of a NON-ALLOCABLE request. - case walk_to_update: - { - if (r.get_state() != Request::state_unallocable) - break; - typedef vector SubRequests; - SubRequests list = r.get_dynamic_subrequests(); - for (SubRequests::iterator it = list.begin(); it != list.end(); it++) - extendSubRequest(*it); - break; - } - /// Updates the state of an ALLOCABLE or FUTURE request. - case walk_to_allocate_cpu: - { - /// This is the only interesting case. If the current instant, measured - /// over the containing process execution time, is equal to the instant - /// in which the request has to be raised, the subrequests are - /// recursively updated for the first time ever, and their status - /// changes from FUTURE to something else. - if (r.get_state() == Request::state_future - && r.get_instant() == front) - { - typedef vector SubRequests; - SubRequests list = r.get_dynamic_subrequests(); - for (SubRequests::iterator it = list.begin(); it != list.end(); it++) - extendSubRequest(*it); - } - /// Finally, allocates the reuqest if possible. - if (r.get_state() == Request::state_allocable) - { - typedef vector SubRequests; - SubRequests list = r.get_dynamic_subrequests(); - walk = walk_to_allocate_resources; - for (SubRequests::iterator it = list.begin(); it != list.end(); it++) - extendSubRequest(*it); - walk = walk_to_allocate_cpu; - } - break; - } - } - } - /* - - // the following methods are never used in step forward (by now) - - /// \brief Manages a single Thread object, depending on its state. - /// - /// First step: running -> terminated, or running -> running, or - /// running -> ready. - /// Second step: future -> ready or blocked -> ready. - /// Third step: ready -> running, or ready -> blocked - /// - /// The front is shifted to reflect the thread time: this is useful - /// for reasons analogous to those found in extendProcess. - void - extendThread(DynamicThread& t) - { - /// Shifts the front. The old front will be restored on exit. - int old_front = front; - front = t.get_elapsed_time(); - switch (walk) - { - - /// If the thread is RUNNING, its requests are updated in cascade, - /// the counters are decreased, and the state is updated depending - /// on the remaining time and the remaining quantum. - case walk_to_advance: - { - if (t.state != "RUNNING") - break; - /// Update the requests. - for (int j = 0; j < t.requests.size(); j++) - t.requests[j].accept(*this); - t.decrease_remaining_time(); - /// If the quantum is finished, we may need to change the state - /// of the thread to ready. 
- if (t.get_remaining_cpu_time() != 0 && t.remaining_quantum == 0 - && !priority_preemptable && quantum_preemptable) - { - t.state = "READY"; - env->readyqueue.erase(env->readyqueue.begin()); - env->readyqueue.push_back(&t); - /// FIXME: we should call the policy, with event "arrival"; - /// this works for round robin only. - } - /// If the remaining is finished, the thread is terminated. - if (t.get_remaining_cpu_time() == 0) - { - t.state = "TERMINATED"; - env->readyqueue.erase(env->readyqueue.begin()); - } - break; - } - /// If the thread is FUTURE, checks if it's time to arrive. - /// If the thread is BLOCKED, checks if requests have changed - /// their status since the last time, and if none of them is - /// BLOCKED, the thread state is set to READY. - case walk_to_update: - { - /// Remember, front has been localized to current thread time. - if (t.state == "FUTURE" && t.arrival == old_front) - { - t.state = "READY"; - env->readyqueue.push_back(&t); - /// FIXME: we should call the policy, with event "new arrival". - } - if (t.state == "BLOCKED") - { - bool blocked = false; - for (int j = 0; j < t.requests.size() && !blocked; j++) - { - t.requests[j].accept(*this); - if (t.requests[j].get_state() == "NON-ALLOCABLE") - blocked = true; - } - if (!blocked) - { - t.state = "READY"; - env->readyqueue.push_back(&t); - /// FIXME: we should call the policy, with event "arrival". - } - } - break; - } - - /// If the thread is the first on the ready_queue, try to run. The - /// thread may block on a request. - case walk_to_allocate_cpu: - { - ReadyQueue & queue = env->get_sorted_queue(); - /// Check if we are eligible to run. - if (queue.size() != 0 && &t == &queue.get_item_at(0)) - { - /// Lets' try to run! - t.set_state(Schedulable::state_running); - - typedef vector Requests; - Requests list = t.get_dynamic_requests(); - for (Requests::iterator it = list.begin(); it != list.end() - && t.get_state() != Schedulable::state_blocked; it++) - { - extendRequest(**it); - /// If one request is not allocable, the thread can't run. - if ((**it).get_state() == Request::state_unallocable) - { - t.set_state(Schedulable::state_blocked); - queue.erase_first(); - } - } - - // /// If the thread is runnable, we may need to refill its quantum. 
- // if (t.state == "RUNNING" && t.remaining_quantum == 0) - // t.remaining_quantum = quantum_size; - } - break; - } - - } - front = old_front; - } - */ - - - - // private class members -private: - - auto_ptr & env; - int walk; - unsigned int front; -}; - - - - - - - - - - - - - - - - - -// ------------------ Static helper functions -------------- - - -// Collects all threads of an environment into a single vector -static void -collect_threads(const std::vector& procs, - std::vector& collected_threads) -{ - typedef std::vector Processes; - typedef std::vector Threads; - - collected_threads.clear(); - for (Processes::const_iterator it1 = procs.begin(); it1 != procs.end(); it1++) - { - const Threads& ts = ((DynamicProcess&) **it1).get_dynamic_threads(); - collected_threads.insert(collected_threads.end(), ts.begin(), ts.end()); - } -} - -static void prepare_ready_queue(ConcreteEnvironment& snapshot, - std::vector& all_threads) -{ - typedef std::vector Threads; - ReadyQueue& queue = snapshot.get_sorted_queue(); - assert(queue.size() == 0); - - for (Threads::const_iterator it = all_threads.begin(); - it != all_threads.end(); it++) - if ((*it)->get_state() == Schedulable::state_ready) - queue.append(**it); -} - -// --------------------------------------------------------- - - - - -//private constructor. The parameter is discarded Scheduler::Scheduler() - : _ready_queue(NULL), _policy(NULL), _step_mutex() -{} + : _ready_queue(NULL), _policy(NULL), _step_mutex() +{ +} ReadyQueue* @@ -479,277 +57,681 @@ Scheduler::get_ready_queue() } -/** \note E' fondamentale che questo metodo memorizzi localmente qualora la politica - attuale sia a prerilascio o meno, e la durata del quanto di tempo, in quanto la politica - e' libera di variare questi parametri a piacere durante l'esecuzione della simulazione -*/ -void -Scheduler::reset_status() -{ - // DEPRECATED (?) -} - - CPUPolicy* Scheduler::get_policy() { - return _policy; + return _policy; } +//............................................................................. +//............................................................................. +// StepForward Preludium +//............................................................................. +//............................................................................. + + +static bool +validate_environment(ConcreteEnvironment& e) +{ + return true; +} + + + + // Introduces newly arrived processes. 
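+  // In other words: when a process arrives exactly at this instant, each of
+  // its threads with arrival time 0 is put into the ready state, and any
+  // such thread whose total CPU time is already elapsed is terminated
+  // immediately.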
+ // Postcondition: + // for each process p in next_snapshot + // ( + // for each thread t belonging to p + // ( + // if the arrival time of p is equal to front, then + // ( + // the state of p is state_ready and + // if the remaining time of t is zero, then + // the state of t is state_terminated + // ) + // ) + // ) +static void +update_future_processes(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if(dp.get_arrival_time() == front) + { + front = dp.get_elapsed_time(); // == 0 + assert(front == 0); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_arrival_time() == front) + { + dt.set_state(Schedulable::state_ready); + // in this way we will never have threads ready having remaining time == 0 + if (dt.get_elapsed_time() == dt.get_total_cpu_time()) + dt.set_state(Schedulable::state_terminated); + } + } + } + } +} + + // Updates running process and thread + // Postcondition: + // for each thread t in next_snapshot + // ( + // if the state of t is state_running then + // ( + // the remaining time is one less than the one in the previous snapshot + // if the remaining time is zero then + // the state of t is state_terminated + // ) + // ) +static void +advance_running_process_and_thread(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if(dp.get_state() == Schedulable::state_running) + { + front = dp.get_elapsed_time(); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_running) + { + dt.decrease_remaining_time(); + if(dt.get_total_cpu_time() == dt.get_elapsed_time()) + { + dt.set_state(Schedulable::state_terminated); + typedef std::vector DynamicRequests; + DynamicRequests& drs = dt.get_dynamic_requests(); + for(DynamicRequests::const_iterator it_drs = drs.begin(); it_drs != drs.end(); it_drs++) + { + DynamicRequest& dr = **it_drs; + if(dr.get_state() != Request::state_exhausted) + { + typedef std::vector DynamicSubRequests; + DynamicSubRequests dsrs = dr.get_dynamic_subrequests(); + for(DynamicSubRequests::const_iterator it_dsrs = dsrs.begin(); it_dsrs != dsrs.end(); it_dsrs++) + { + DynamicSubRequest& dsr = **it_dsrs; + if(dsr.get_state() == Request::state_allocated) + { + while(dsr.get_remaining_time() != 0) + dsr.decrease_remaining_time(); + /// Remove the subrequest (pointer) from the queue. + bool removed = false; + typedef Environment::SubRequestQueue SubRequestQueue; + SubRequestQueue& queue = next_snapshot->get_request_queue(dsr.get_resource_key()); + for (SubRequestQueue::iterator it = queue.begin(); !removed && it != queue.end(); it++) + { + if ((*it) == &dsr) + { + queue.erase(it); + removed = true; + } + } + } + dsr.set_state(Request::state_exhausted); + } + } + } + } + } + } + } + } +} + + // Introduces newly arrived threads. 
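+  // Unlike update_future_processes() above, this pass scans every process
+  // that has already arrived and wakes the threads whose arrival time,
+  // measured from the parent process' elapsed time, is reached now.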
+ // Postcondition: + // for each process p in next_snapshot + // ( + // for each thread t belonging to p + // ( + // if the arrival time of p is equal to the elapsed time of p, then + // ( + // the state of p is state_ready and + // if the remaining time of t is zero, then + // the state of t is state_terminated + // ) + // ) + // ) +static void +update_future_threads(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if(dp.get_arrival_time() <= front) + { + front = dp.get_elapsed_time(); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_arrival_time() == front) + { + dt.set_state(Schedulable::state_ready); + // in this way we will never have threads ready having remaining time == 0 + if (dt.get_elapsed_time() == dt.get_total_cpu_time()) + dt.set_state(Schedulable::state_terminated); + } + } + } + } +} + + // Updates allocated resources and subresources + // Postcondition: + // for each subrequest sr in next_snapshot + // ( + // if the state of sr is state_running then + // ( + // the remaining time is one less than the one in the previous snapshot + // if the remaining time is zero then + // the state of sr is state_exhausted + // ) + // ) +static void +advance_allocated_requests_and_subrequests(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if(dp.get_state() == Schedulable::state_running) + { + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_running) + { + typedef std::vector DynamicRequests; + DynamicRequests& drs = dt.get_dynamic_requests(); + for(DynamicRequests::const_iterator it_drs = drs.begin(); it_drs != drs.end(); it_drs++) + { + DynamicRequest& dr = **it_drs; + if(dr.get_state() == Request::state_allocated) + { + typedef std::vector DynamicSubRequests; + DynamicSubRequests dsrs = dr.get_dynamic_subrequests(); + for(DynamicSubRequests::const_iterator it_dsrs = dsrs.begin(); it_dsrs != dsrs.end(); it_dsrs++) + { + DynamicSubRequest& dsr = **it_dsrs; + if(dsr.get_state() == Request::state_allocated) + { + dsr.decrease_remaining_time(); + if(dsr.get_remaining_time() == 0) + { + dsr.set_state(Request::state_exhausted); + /// Remove the subrequest (pointer) from the queue. 
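+                /// Erasing it frees one place on the resource, so a
+                /// subrequest queued behind it may become allocable in the
+                /// later update_nonrunning_threads_and_unallocated_subrequests() pass.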
+ bool removed = false; + typedef Environment::SubRequestQueue SubRequestQueue; + SubRequestQueue& queue = next_snapshot->get_request_queue(dsr.get_resource_key()); + for (SubRequestQueue::iterator it = queue.begin(); !removed && it != queue.end(); it++) + { + if ((*it) == &dsr) + { + queue.erase(it); + removed = true; + } + } + } + } + } + } + } + } + } + } + } +} + + // Updates unallocated subrequests + // Postcondition: + // for each subrequest sr in next_snapshot + // ( + // let r be the resource specified in the subrequest + // let q be the queue associated with r + // let s be the state of sr + // let p be the number of places of r + // if sr is in q then + // if the position of sr in q is lesser or equal to p then + // the state of sr is state_allocable + // else + // the state of sr is state_unallocable + // ) + // for each thread t in next_snapshot + // ( + // let a be the number of requests of t whose state is state_allocable + // let u be the number of requests of t whose state is state_unallocable + // if a is not zero and u is zero then + // the state of t is state_ready + // if a is not zero and u is not zero then + // the state of t is state_blocked + // ) +static void +update_nonrunning_threads_and_unallocated_subrequests(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_ready || dt.get_state() == Schedulable::state_blocked) + { + bool blocked = false; + typedef std::vector DynamicRequests; + DynamicRequests drs = dt.get_dynamic_requests(); + for(DynamicRequests::const_iterator it_drs = drs.begin(); it_drs != drs.end(); it_drs++) + { + DynamicRequest& dr = **it_drs; + if(dr.get_state() == Request::state_allocable || dr.get_state() == Request::state_unallocable) + { + typedef std::vector DynamicSubRequests; + DynamicSubRequests dsrs = dr.get_dynamic_subrequests(); + for(DynamicSubRequests::const_iterator it_dsrs = dsrs.begin(); it_dsrs != dsrs.end(); it_dsrs++) + { + DynamicSubRequest& dsr = **it_dsrs; + assert(dsr.get_state() == Request::state_allocable || dsr.get_state() == Request::state_unallocable); + unsigned int position = 0; + Environment::SubRequestQueue& queue = next_snapshot->get_request_queue(dsr.get_resource_key()); + Environment::SubRequestQueue::iterator it = queue.begin(); + while (it != queue.end()) + { + if (*it == &dsr) + break; + it++; + position++; + } + /// Watch out: in a resource with 2 places, 0 and 1 are valid queue + /// positions, 2 is right one place out. 
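+              /// Illustrative example (not taken from any real scenario):
+              /// with places == 2 and queue == [srA, srB, srC], srA and srB
+              /// occupy positions 0 and 1 and become allocable, while srC at
+              /// position 2 becomes unallocable.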
+ if (position >= next_snapshot->get_resources().find(dsr.get_resource_key())->second->get_places()) + dsr.set_state(Request::state_unallocable); + else + dsr.set_state(Request::state_allocable); + } + } + if (dr.get_state() == Request::state_unallocable) + blocked = true; + } + if (blocked) + dt.set_state(Schedulable::state_blocked); + else + dt.set_state(Schedulable::state_ready); + } + } + } +} + + // prepares the environment advancing counters and updating states appropriately +static void +advance_and_update_all(unsigned int front, auto_ptr& next_snapshot) +{ + // Checks if there are future processes that should be marked as ready, + // since their arrival time is equal to the simulation current instant. + update_future_processes(front, next_snapshot); + + // Updates the counters of the running entities. + // Checks if the running thread (if any) should be marked as + // terminated. + advance_running_process_and_thread(front, next_snapshot); + + // Checks if there are subrequests associated to the running thread that have + // just exausted, or if the old running thread has terminated. Then adjust + // the subrequests' list accordingly (freeing associated resources); + advance_allocated_requests_and_subrequests(front, next_snapshot); + + // Checks if there are future threads that should be marked as ready, + // since their arrival time is equal to their owner process' elapsed time. + update_future_threads(front, next_snapshot); + + // If any thread was blocked on a now freed resource, pass it from blocked + // to ready state, by discovering just-now-allocable requests. + // If any thread was ready on a now busy resource, pass it from ready to + // blocked state, by discovering just-now-unallocable requests. + // This is obtained by + update_nonrunning_threads_and_unallocated_subrequests(front, next_snapshot); + +} + + + // Function ?? 
FIXME + // Postcondition: + // if the policy is priority-preemptable or the quantum is finished then + // for each thread t in next_snapshot + // ( + // if the policy is priority-preemptable and the state of t was state_running + // the state of t is state_ready + // if the policy is quantum-preemptable and the state of t was state_running and the quantum has finished + // the state of t is state_ready + // ) +static void +manage_preemption(unsigned int front, auto_ptr& next_snapshot, bool preemptible_policy, unsigned int time_slice) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_running) + { + if(preemptible_policy || time_slice == front - dt.get_last_acquisition()) + { + dt.set_state(Schedulable::state_ready); + dt.set_last_release(front); + } + } + } + } +} + + + + // FIXME: document me please +static void +build_ready_queue(unsigned int front, auto_ptr& next_snapshot) +{ + ReadyQueue& queue = next_snapshot->get_sorted_queue(); + assert(queue.size() == 0); + + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_ready) + queue.append(dt); + } + } +} + + + // returns false if and only if the snapshot contains a candidate running thread, + // the thread's state is state_running. +static bool +find_a_candidate(unsigned int front, auto_ptr& next_snapshot) +{ + // look for a currently running process first + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if (dp.get_state() == Schedulable::state_running) + { + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_running) + return false; + } + } + } + // if no process has been found, select the first on the ready queue. + ReadyQueue& queue = next_snapshot->get_sorted_queue(); + if (queue.size() == 0) + return true; + DynamicThread& candidate = dynamic_cast(queue.get_item_at(0)); + candidate.set_state(Schedulable::state_running); + // HACK HACK HACK + // we do not remove the candidate from the ready queue. this information is useful + // since we chose to set the last_aquisition and the last_release just after a + // successful selection. see try_to_run(). 
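+  // Returning false here means a candidate does exist: either a thread was
+  // already running, or the head of the ready queue has just been promoted
+  // to state_running; the caller will then attempt try_to_run().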
+ return false; +} + + + + + + + + // returns true if and oly if the simulation is terminated +static bool +check_if_simulation_is_terminated(unsigned int front, auto_ptr& next_snapshot) +{ + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if (dp.get_state() == Schedulable::state_running) + return false; + if (dp.get_state() == Schedulable::state_future) + return false; + } + return true; +} + + + + + + + +static bool +try_to_run(unsigned int front, auto_ptr& next_snapshot) +{ + printf("trying to run"); + bool success = true; + typedef std::vector Processes; + Processes ps = next_snapshot->get_processes(); + for(Processes::const_iterator it_ps = ps.begin(); it_ps != ps.end(); it_ps++) + { + DynamicProcess& dp = dynamic_cast(**it_ps); + if (dp.get_state() == Schedulable::state_running) + { + typedef std::vector DynamicThreads; + DynamicThreads dts = dp.get_dynamic_threads(); + for(DynamicThreads::const_iterator it_dts = dts.begin(); it_dts != dts.end(); it_dts++) + { + DynamicThread& dt = **it_dts; + if(dt.get_state() == Schedulable::state_running) + { + typedef std::vector DynamicRequests; + DynamicRequests drs = dt.get_dynamic_requests(); + for(DynamicRequests::const_iterator it_drs = drs.begin(); it_drs != drs.end(); it_drs++) + { + printf("checking all requests"); + DynamicRequest& dr = **it_drs; + if(dr.get_state() == Request::state_future && dr.get_instant() == dt.get_elapsed_time()) + { + printf("found a new request"); + typedef std::vector DynamicSubRequests; + DynamicSubRequests dsrs = dr.get_dynamic_subrequests(); + for(DynamicSubRequests::const_iterator it_dsrs = dsrs.begin(); it_dsrs != dsrs.end(); it_dsrs++) + { + DynamicSubRequest& dsr = **it_dsrs; + assert(dsr.get_state() == Request::state_future); + Environment::SubRequestQueue& queue = next_snapshot->get_request_queue(dsr.get_resource_key()); + /// Enqueue the subrequest at the back of the queue. + queue.push_back(&dsr); + printf("pushing back a request"); + + /// TODO: right here, right now we should call the resource policy to + /// update the queue. Updates the state of the subrequest depending + /// on the position in the queue, as explained before. + unsigned int places = next_snapshot->get_resources().find(dsr.get_resource_key())->second->get_places(); + dsr.set_state(queue.size() > places ? Request::state_unallocable : Request::state_allocable); + } // for each subrequest + // if it is actually allocable, allocate it + if (dr.get_state() == Request::state_allocable) + { + for(DynamicSubRequests::const_iterator it_dsrs = dsrs.begin(); it_dsrs != dsrs.end(); it_dsrs++) + { + DynamicSubRequest& dsr = **it_dsrs; + assert(dsr.get_state() == Request::state_allocable); + dsr.set_state(Request::state_allocated); + } + } + } + // if it does exist at least one unallocable request, the thread may not run! + if (dr.get_state() == Request::state_unallocable) + { + dt.set_state(Schedulable::state_blocked); + success = false; + } + } + } + // HACK HACK HACK + // this check is legal, since all pointers in the sorted queue should be valid. 
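+          // The candidate chosen by find_a_candidate() was deliberately left
+          // in the sorted queue, so it can be recognised here and stamped
+          // with last_acquisition (on success) before being erased; a
+          // previously running thread that failed to run gets last_release
+          // updated instead.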
+ // if the current thread is the first in the ready queue, we need to remove it + if (next_snapshot->get_sorted_queue().size() != 0 && + &next_snapshot->get_sorted_queue().get_item_at(0) == &dt) + { + // in case of success in allocating the cpu, we must update the last_aquisition + if (success) + dt.set_last_acquisition(front); + next_snapshot->get_sorted_queue().erase_first(); + } + // if the current thread was the one that was previously running + else + { + // we must update the last_release + if (!success) + dt.set_last_release(front); + //cpu_policy.sort_queue(); + } + } + } + } + return success; +} + +//............................................................................. +//............................................................................. +// The Return of the Son of the Monster Step Forward +// +// The refactoring of this method has been carried on listening to Frank +// Zappa's The Return of the Son of the Monster Magnet. +// The magic word for tonight is.. procedural! +//............................................................................. +//............................................................................. + bool -Scheduler::step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInterruptException, MalformedPolicyException) +Scheduler::step_forward(ConcreteHistory& concrete_history, CPUPolicy& cpu_policy) + throw(UserInterruptException) { - // This very method should be exclusive: no concurrent behaviour, from when we - // store a readyqueue and policy pointer for the user-policy to retrieve, to when - // the policy returns - // TODO: restrict this area to maximise parallelism - Glib::Mutex::Lock lock (_step_mutex); + // Preconditions: + assert (concrete_history.get_size() > 0); - // NOTE: Be sure to read the *ORIGINAL* documentation in the design document for this method! + // Concurrency: + // This very method should be exclusive: no concurrent behaviour, from when + // we store a readyqueue and policy pointer for the user-policy to retrieve, + // to when the policy returns. + Glib::Mutex::Lock lock(_step_mutex); - // FIXME: handle me! I'm not just a pretty boolean, I want to be *USED*! *EXPLOITED*! - // *RAPED*! *MAKE ME BLEED*! - bool simulation_ended = true; // Assume we've finished. Then prove me wrong. - // since history always contains at leas one element.. - assert (history.get_size() > 0); - unsigned int current_instant = history.get_size() - 1; /* They should be equivalent */ + // The instant we are going to build a snapshot for: + // if history has size 1, then the snapshot we will add will describe the + // environment at instan 0, i.e. (1 - 1). + unsigned int next_instant = concrete_history.get_size() - 1; - ConcreteHistory& concrete_history = (ConcreteHistory&) history; + // The snapshot we are building: it is built as a copy og the last one. + // We use an auto_ptr since we've some exceptions in the coming... + auto_ptr next_snapshot(new ConcreteEnvironment(concrete_history.get_last_environment())); - // Use an auto_ptr since we've some exceptions in the coming... 
- auto_ptr new_snapshot(new ConcreteEnvironment(concrete_history.get_last_environment())); + // Temporarily set the _ready_queue param and the _policy one for + // use from external plugins + _policy = &cpu_policy; + _ready_queue = &next_snapshot->get_sorted_queue(); - Threads all_threads; - DynamicThread* running_thread = NULL; + // begin procedure step forward + // ahw, I lowe soo munch all thissh olt fasshoned procedureal programing stylus + advance_and_update_all(next_instant, next_snapshot); - collect_threads(new_snapshot->get_processes(), all_threads); - - // designer + implementer (Matteo) comment follows: - - for (Threads::iterator it = all_threads.begin(); it != all_threads.end(); it++) + try { - DynamicThread& current = **it; + // If the user-policy is preemptible, or if the assigned time slice has + // terminated, pass the running thread to ready state. + manage_preemption(next_instant, next_snapshot, cpu_policy.is_pre_emptive(), cpu_policy.get_time_slice()); - // 1. mark future threads as ready, if appropriate - if (current.get_state() == Schedulable::state_future) - { - // if there is at least a future process, don't terminate the simulation - Process& parent = current.get_process(); - if (parent.get_state() == Schedulable::state_future) - simulation_ended = false; - // start a thread only if its parent is arrived! - if (parent.get_arrival_time() <= current_instant && parent.get_elapsed_time() == current.get_arrival_time()) - { - current.set_state(Schedulable::state_ready); - // in this way we will never have threads ready having remaining time == 0 - if (current.get_elapsed_time() == current.get_total_cpu_time()) - current.set_state(Schedulable::state_terminated); - } - } + // Build the schedulables ready queue + build_ready_queue(next_instant, next_snapshot); - // Save the current running thread for future usage, if it hasn't ended - // its allotted time - if (current.get_state() == Schedulable::state_running) - { - assert(running_thread == NULL); // ... and one to bind them all. - running_thread = ¤t; // Even if we change its state to terminate - - // increasing the time elapsed of the running thread + process - // should be done here as the first thing, instead than - // directly after selecting them - if (current.get_total_cpu_time() - current.get_elapsed_time() > 0) - current.decrease_remaining_time(); - - // 2. mark threads that used all their allotted time as terminated - if (current.get_total_cpu_time() - current.get_elapsed_time() == 0) - current.set_state(Schedulable::state_terminated); - } - - // 3. check for simulation termination (we can directly use threads - // for this check, since processes' state is based upon threads' one) - if ( /* we still think that */ simulation_ended && - (current.get_state() & (Schedulable::state_blocked | - Schedulable::state_terminated)) == 0) - simulation_ended = false; - } - - // 4a. Look for exhausted requests for the running thread - if (running_thread != NULL) - { - bool running_terminated = running_thread->get_state() == Schedulable::state_terminated; - Extender e(new_snapshot, - running_terminated ? Extender::walk_to_sweep : Extender::walk_to_advance, - running_thread->get_elapsed_time()); - - Requests& reqs = running_thread->get_dynamic_requests(); - for (Requests::iterator r_it = reqs.begin(); r_it != reqs.end(); r_it++) - e.extendRequest(**r_it); - } - - // Unblock blocked threads.. 
by discovering just-now-allocable requests - for (Threads::iterator it = all_threads.begin(); it != all_threads.end(); it++) - { - DynamicThread& current = **it; - - // for each still blocked thread - if (current.get_state() == Schedulable::state_blocked) - { - // Since it was blocked then one and only one (why?) request is unallocable. - // lets' find it and see if our information is outdated (i.e. now it is - // allocable. - bool blocked = false; - // for each request - Requests& reqs = current.get_dynamic_requests(); - Extender e(new_snapshot, Extender::walk_to_advance, current.get_elapsed_time()); - for (Requests::iterator r_it = reqs.begin(); r_it != reqs.end() && !blocked; r_it++) - { - // update its state. - e.extendRequest(**r_it); - // if it is still unallocable, leave the thread blocked - if ((**r_it).get_state() == Request::state_unallocable) - blocked = true; - } - // if no blocked request has been found, the thread is ready. - if (!blocked) - current.set_state(Schedulable::state_ready); - } - } + // and pass it to the policy itself + if (next_snapshot->get_sorted_queue().size() != 0) + cpu_policy.sort_queue(); - // ---------- FIXME ---------------- - // Check correctness: Now if the simulation ended we - // append the newly created environment and return false - if (simulation_ended) goto final_cleanup; + bool found = false; + bool none_ready_or_running = false; + bool simulation_terminated = false; + do + { + // If no thread is in running state, select a new thread to be + // executed (so it changes its state from ready to running). + // However, in case it hasn't to run for a strictly positive + // amount of time, put it into terminated state. (??) Then try to + // select a new schedulable, by jumping (??) to the point about + // building the ready queue. + none_ready_or_running = find_a_candidate(next_instant, next_snapshot); + if (none_ready_or_running) + simulation_terminated = check_if_simulation_is_terminated(next_instant, next_snapshot); + else + // Check pending requests for the running thread. If in the + // current instant the thread does a new unfulfillable request, + // put it in blocked state and jump (??) to the point about building + // the ready queue + found = try_to_run(next_instant, next_snapshot); + } + while (!found && !none_ready_or_running && !simulation_terminated); + // append the new snapshot, releasing the auto_ptr! + concrete_history.append_new_environment(next_snapshot.release()); - bool preemptible_policy; - unsigned int time_slice; - try - { - // Temporarily set the _ready_queue param and the _policy one for - // use from external plugins - _policy = &cpu_policy; - _ready_queue = &new_snapshot->get_sorted_queue(); - // ?. Use the policy to sort the queue + // Reset values that the policy doesn't need anymore + _policy = NULL; + _ready_queue = NULL; - preemptible_policy = cpu_policy.is_pre_emptive(); - time_slice = cpu_policy.get_time_slice(); + return !simulation_terminated; // watch out for the ! + } + catch(UserInterruptException& e) + { + // Reset values that the policy doesn't need anymore + _policy = NULL; + _ready_queue = NULL; - // ?. 
See if old running_thread has to be put to ready state - // This happens when the policy makes use of preemptability by - // priority, or when a time slice ended - if (running_thread != NULL && running_thread->get_state() == Schedulable::state_running && - (preemptible_policy || - time_slice == current_instant - running_thread->get_last_acquisition()) ) - { - running_thread->set_state(Schedulable::state_ready); - running_thread->set_last_release(current_instant); - } - - - - // --------------------------------------------------------------------------------- - // ------------------------------- 3 ---------------------------------------- - // --------------------------------------------------------------------------------- - - // the problem is that a blocked thread is in the ready queue, but here we - // should collect ready threads only. - // an obvious solution is to remove it manually, but hey, we must understand - // what s happening - - prepare_ready_queue(*new_snapshot, all_threads); - if (_ready_queue->size() != 0) - cpu_policy.sort_queue(); - bool found = false; - - - - // Try to continue running the current running thread - if (running_thread != NULL && running_thread->get_state() == Schedulable::state_running) - { - found = true; - // the thread may block on raising a request - Requests& reqs = running_thread->get_dynamic_requests(); - for (Requests::iterator r_it = reqs.begin(); r_it != reqs.end() - && running_thread->get_state() != Schedulable::state_blocked; r_it++) - { - Extender e(new_snapshot, Extender::walk_to_allocate_cpu, running_thread->get_elapsed_time()); - e.extendRequest(**r_it); - if ((**r_it).get_state() == Request::state_unallocable) - { - running_thread->set_state(Schedulable::state_blocked); - found = false; - } - } - } // end trying to continue old running thread - - // if the old running thread may not directly continue, pick from the ready queue. - //int debug = 1000; - while (_ready_queue->size() != 0 && !found)//&& debug-- > 0); - { - // try with the first on the queue - found = true; - DynamicThread& candidate = (DynamicThread&) _ready_queue->get_item_at(0); - - - Requests& reqs = candidate.get_dynamic_requests(); - for (Requests::iterator r_it = reqs.begin(); r_it != reqs.end(); r_it++) - { - Extender e(new_snapshot, Extender::walk_to_allocate_cpu, candidate.get_elapsed_time()); - e.extendRequest(**r_it); - if ((**r_it).get_state() == Request::state_unallocable) - { - candidate.set_state(Schedulable::state_blocked); - // the element is not ready any more, so we must remove it from the queue? - found = false; - } - } - if (found) - { - candidate.set_state(Schedulable::state_running); - candidate.set_last_acquisition(current_instant); - } - _ready_queue->erase_first(); - //cpu_policy.sort_queue(); - } // end picking from ready queue - - - - // FIXME - // FIXME - // FIXME - // the following condition is not sufficient. we must also - // be sure that no other processes are ready. - if (!found) - simulation_ended = true; - - - } // end of try - catch (const CPUPolicyException& e) - { - // Reset values that the policy doesn't need anymore - _policy = NULL; - _ready_queue = NULL; - - // Do we need to update/reset something else? - - // Going up unwinding the stack, tell: - // - the user that the policy sucks - // - SimulationController that everything stopped - throw; - } - -final_cleanup: - - // append the new snapshot... - // ...and remember to release the auto_ptr! 
-  concrete_history.append_new_environment(new_snapshot.release());
-
-  // Reset values that the policy doesn't need anymore
-  _policy = NULL;
-  _ready_queue = NULL;
-
-  // If we got there, a step has been performed
-  return simulation_ended == false;
+      // Do we need to update/reset something else?
+      // Going up unwinding the stack, tell:
+      // - the user that the policy sucks
+      // - SimulationController that everything stopped
+      throw;
+    }
 }
 
diff --git a/src/backend/scheduler.hh b/src/backend/scheduler.hh
index eb9fa3e..f8e3c93 100644
--- a/src/backend/scheduler.hh
+++ b/src/backend/scheduler.hh
@@ -28,11 +28,10 @@ namespace sgpem
 
 #include "config.h"
 
-#include "history.hh"
+#include "concrete_history.hh"
 #include "cpu_policy.hh"
 #include "ready_queue.hh"
 #include "user_interrupt_exception.hh"
-#include "malformed_policy_exception.hh"
 
 // Do not include full template definition here
 #include "singleton.hh"
@@ -68,16 +67,18 @@ namespace sgpem
       ReadyQueue* get_ready_queue();
 
       /** Resets the simulation to the initial state.
+          Deprecated.
       */
-      void reset_status();
+//      void reset_status();
 
       /** Generates a new ReadyQueue representing the status of the processes
           at the simulation instant next to the current one, and extends the
           History by one instant with it.
+          Returns false if the next instant would be identical to the last one added.
          \return false If the simulation has ended, true otherwise
       */
-      bool step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInterruptException, MalformedPolicyException);
+      bool step_forward(ConcreteHistory& history, CPUPolicy& cpu_policy) throw(UserInterruptException);
 
       /** Returns the policy that will be used to generate the simulation
           at the next instant.