From df4b1b4205b0d3547ec919e216277a1a565638d6 Mon Sep 17 00:00:00 2001 From: tchernobog Date: Wed, 6 Sep 2006 12:29:22 +0000 Subject: [PATCH] - Merge branch 0.3-r1003--scheduler-manage-preemption into trunk git-svn-id: svn://svn.gna.org/svn/sgpemv2/trunk@1023 3ecf2c5c-341e-0410-92b4-d18e462d057c --- .../src/builtin-policies/rr_priority.py | 13 ++++ plugins/pyloader/src/builtin-policies/sjf.py | 1 + plugins/pyloader/src/sgpem.i | 35 +++++++--- src/backend/concrete_history.cc | 34 ++++++++-- src/backend/concrete_history.hh | 10 ++- src/backend/concrete_simulation.cc | 28 ++++++-- src/backend/history.hh | 1 + src/backend/ready_queue.cc | 18 ++++- src/backend/ready_queue.hh | 4 ++ src/backend/scheduler.cc | 65 ++++++++++++------- src/text_simulation.cc | 4 +- 11 files changed, 158 insertions(+), 55 deletions(-) diff --git a/plugins/pyloader/src/builtin-policies/rr_priority.py b/plugins/pyloader/src/builtin-policies/rr_priority.py index ab1413b..9365fb1 100644 --- a/plugins/pyloader/src/builtin-policies/rr_priority.py +++ b/plugins/pyloader/src/builtin-policies/rr_priority.py @@ -58,3 +58,16 @@ the time slice will have to end before the former can run.""" self.sort(queue,by_ltime) self.sort(queue,by_prio) + # manage preemption: see if we've a running thread + # in the ready queue, and if it can still run + if self.is_preemptive() == True: + higher_prio = queue.get_item_at(0).get_current_priority() + i = 0 + while i < queue.size(): + sched = queue.get_item_at(i) + priority = sched.get_current_priority() + if(priority != higher_prio): + break + if sched.get_state() == "running": + queue.bubble_to_front(i) + i += 1 diff --git a/plugins/pyloader/src/builtin-policies/sjf.py b/plugins/pyloader/src/builtin-policies/sjf.py index b9e8a68..45bb5f8 100644 --- a/plugins/pyloader/src/builtin-policies/sjf.py +++ b/plugins/pyloader/src/builtin-policies/sjf.py @@ -53,3 +53,4 @@ Time Next in this case.""" a.get_total_cpu_time() - a.get_elapsed_time() < \ b.get_total_cpu_time() - b.get_elapsed_time() self.sort(queue,cmpf) + diff --git a/plugins/pyloader/src/sgpem.i b/plugins/pyloader/src/sgpem.i index 199a04c..01ec084 100644 --- a/plugins/pyloader/src/sgpem.i +++ b/plugins/pyloader/src/sgpem.i @@ -150,15 +150,6 @@ namespace sgpem { public: virtual ~Schedulable() = 0; - enum state - { - state_running = 1<<0, - state_ready = 1<<1, - state_blocked = 1<<2, - state_future = 1<<3, - state_terminated = 1<<4 - }; - virtual unsigned int get_arrival_time() const = 0; virtual unsigned int get_elapsed_time() const = 0; virtual int get_last_acquisition() const = 0; @@ -166,7 +157,30 @@ namespace sgpem { virtual int get_base_priority() const = 0; virtual int get_current_priority() const = 0; virtual unsigned int get_total_cpu_time() const = 0; - virtual state get_state() const = 0; + + %ignore Schedulable::get_state() const; + %extend { + const char* get_state() const + { + switch(self->get_state()) + { + case Schedulable::state_future: + return "future"; + case Schedulable::state_terminated: + return "terminated"; + case Schedulable::state_running: + return "running"; + case Schedulable::state_ready: + return "ready"; + case Schedulable::state_blocked: + return "blocked"; + default: + // should never get here + return "undefined"; + } + } + } + %ignore Schedulable::get_name() const; %extend { @@ -204,6 +218,7 @@ namespace sgpem { sgpem::Thread& get_item_at(position index); void swap(position a, position b) throw(std::out_of_range); + void bubble_to_front(position x) throw(std::out_of_range); private: // Avoid instantiation and copy diff --git a/src/backend/concrete_history.cc b/src/backend/concrete_history.cc index f142cad..f628cc4 100644 --- a/src/backend/concrete_history.cc +++ b/src/backend/concrete_history.cc @@ -82,7 +82,7 @@ static T* deep_find(const std::vector& v, const T& obj) // ----------------- ConcreteHistory::ConcreteHistory() - : History(), _snapshots() + : History(), _snapshots(), _sealed(false) { _snapshots.push_back(new ConcreteEnvironment()); } @@ -95,7 +95,7 @@ ConcreteHistory::~ConcreteHistory() } ConcreteHistory::ConcreteHistory(const ConcreteHistory& h) : - History(h) + History(h), _sealed(h._sealed) { typedef Snapshots::const_iterator SnapIt; for (SnapIt it = h._snapshots.begin(); it != h._snapshots.end(); ++it) @@ -489,12 +489,16 @@ ConcreteHistory::edit_subrequest(SubRequest& subrequest, void -ConcreteHistory::step_front(position p) +ConcreteHistory::set_front(position p) { - _front = p; - if (p > _snapshots.size()) - _front = _snapshots.size(); - notify_change(); + position old_front = _front; + if (p > _snapshots.size() - 1) + _front = _snapshots.size() - 1; + else + _front = p; + + if(old_front != _front) + notify_change(); } void @@ -507,6 +511,7 @@ ConcreteHistory::reset(bool notify) for_each(it, _snapshots.end(), deletor()); _snapshots.resize(1); // Truncate to keep only our "model" _front = 0; + _sealed = false; if (notify) notify_change(); @@ -519,3 +524,18 @@ ConcreteHistory::notify_change() for (it = _observers.begin(); it != _observers.end(); it++) (*it)->update(*this); } + + +bool +ConcreteHistory::is_sealed() const +{ + return _sealed; +} + +bool +ConcreteHistory::seal() +{ + bool t = _sealed; + _sealed = true; + return t; +} diff --git a/src/backend/concrete_history.hh b/src/backend/concrete_history.hh index 89d5574..aa24cd6 100644 --- a/src/backend/concrete_history.hh +++ b/src/backend/concrete_history.hh @@ -109,8 +109,13 @@ namespace sgpem resource_key_t resource_key, time_t duration); + // sets the front to position p + virtual void set_front(position p); - virtual void step_front(position p); + bool is_sealed() const; + + // (Returns if the History was sealed before this call) + bool seal(); virtual void reset(bool notify = true); @@ -123,7 +128,8 @@ namespace sgpem private: // Disable assignment, implement it only if needed ConcreteHistory& operator=(const ConcreteHistory& op2); - + + bool _sealed; } ; //~ class ConcreteHistory diff --git a/src/backend/concrete_simulation.cc b/src/backend/concrete_simulation.cc index 55650cd..796a0df 100644 --- a/src/backend/concrete_simulation.cc +++ b/src/backend/concrete_simulation.cc @@ -64,7 +64,7 @@ ConcreteSimulation::jump_to(History::position p) throw(UserInterruptException, N // pauses the simulation (see below) break; case state_stopped: - _history.step_front(0); + _history.set_front(0); break; default: break; @@ -86,7 +86,7 @@ ConcreteSimulation::jump_to(History::position p) throw(UserInterruptException, N yet_to_finish = step(); increment++; } - get_history().step_front(p); + get_history().set_front(std::min(p, _history.get_size())); if (!yet_to_finish) stop(); @@ -126,7 +126,7 @@ ConcreteSimulation::run() throw(UserInterruptException, NullPolicyException, Mal switch (_state) { case state_stopped: - _history.step_front(0); + _history.set_front(0); break; default: break; @@ -136,9 +136,10 @@ ConcreteSimulation::run() throw(UserInterruptException, NullPolicyException, Mal //step forward bool yet_to_finish = step(); - get_history().step_front(get_history().get_front() + 1); + _history.set_front(_history.get_front() + 1); if (yet_to_finish) { + if(_mode == mode_step_by_step) pause(); else @@ -173,10 +174,23 @@ ConcreteSimulation::step() try { - //step forward + // step forward bool yet_to_finish = true; - if (get_history().get_front() == get_history().get_size() - 1) - yet_to_finish = Scheduler::get_instance().step_forward(_history, *get_policy(), *get_resource_policy()); + if (_history.get_front() == _history.get_size() - 1) + if(!_history.is_sealed()) + yet_to_finish = Scheduler::get_instance().step_forward(_history, *get_policy(), *get_resource_policy()); + else + yet_to_finish = false; + + if (!yet_to_finish) _history.seal(); + + // since the simulation expects to be notified + // of simulation termination when reaching the last environment + // and the front will be updated just out of this method, + // we have to make this horrible thing + if (_history.get_front() == _history.get_size() - 2 && _history.is_sealed()) + yet_to_finish = false; + return yet_to_finish; } catch (const CPUPolicyException& e) diff --git a/src/backend/history.hh b/src/backend/history.hh index 77a814a..5527f4e 100644 --- a/src/backend/history.hh +++ b/src/backend/history.hh @@ -129,6 +129,7 @@ namespace sgpem time_t duration) = 0; virtual position get_front() const; + virtual bool is_sealed() const = 0; virtual void attach(HistoryObserver& observer); virtual void detach(const HistoryObserver& observer); diff --git a/src/backend/ready_queue.cc b/src/backend/ready_queue.cc index eba81d6..835bbfd 100644 --- a/src/backend/ready_queue.cc +++ b/src/backend/ready_queue.cc @@ -27,7 +27,7 @@ using sgpem::ReadyQueue; void ReadyQueue::swap(position a, position b) -throw (std::out_of_range) + throw (std::out_of_range) { if (a == b) return; @@ -51,7 +51,7 @@ ReadyQueue::size() const sgpem::Thread& ReadyQueue::get_item_at(position index) -throw (std::out_of_range) + throw (std::out_of_range) { // Checks index access return *_scheds.at(index); @@ -59,7 +59,7 @@ throw (std::out_of_range) const sgpem::Thread& ReadyQueue::get_item_at(position index) const -throw (std::out_of_range) + throw (std::out_of_range) { // Checks index access return *_scheds.at(index); @@ -71,6 +71,18 @@ ReadyQueue::append(Thread& thread) _scheds.push_back(&thread); } + +void +ReadyQueue::bubble_to_front(position x) + throw(std::out_of_range) +{ + while(x > 0) + { + swap(x - 1, x); + --x; + } +} + void ReadyQueue::erase_first() { diff --git a/src/backend/ready_queue.hh b/src/backend/ready_queue.hh index 6c7434f..be95e17 100644 --- a/src/backend/ready_queue.hh +++ b/src/backend/ready_queue.hh @@ -42,6 +42,10 @@ namespace sgpem Thread& get_item_at(position index) throw (std::out_of_range); const Thread& get_item_at(position index) const throw (std::out_of_range); void append(Thread& schedulable); + + /** \brief Bubble element x to the front of the queue + */ + void bubble_to_front(position x) throw(std::out_of_range); void erase_first(); private: diff --git a/src/backend/scheduler.cc b/src/backend/scheduler.cc index 3cf9473..b56bb48 100644 --- a/src/backend/scheduler.cc +++ b/src/backend/scheduler.cc @@ -61,6 +61,7 @@ typedef Environment::SubRequestQueue SubRequestQueue; // ------------------ Static helper functions -------------- +inline bool is_running(const Thread* running_thread); static void collect_threads(const std::vector& procs, Threads& collected_threads); static void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads); static void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment); @@ -75,6 +76,12 @@ static void determine_subr_allocable_status(const Resource& res, SubRequestQueue // --------------------------------------------------------- +bool +is_running(const Thread* running_thread) +{ + return running_thread != NULL && running_thread->get_state() == Schedulable::state_running; +} + // Collects all threads of an environment into a single vector void @@ -480,6 +487,7 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& current.set_state(Schedulable::state_terminated); current.set_last_release(current_instant); terminate_all_requests_of(current, *new_snapshot); + running_thread = NULL; } // if we found the running thread, there isn't another one, @@ -536,29 +544,31 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& _ready_queue = &new_snapshot->get_sorted_queue(); // Determine if the policy is pre_emptive, and what time slice it uses - bool preemptible_policy = cpu_policy.is_pre_emptive(); + bool preemptive_policy = cpu_policy.is_pre_emptive(); int time_slice = cpu_policy.get_time_slice(); // ?. See if old running_thread has to be put to ready state - // This happens when the policy makes use of preemptability by - // priority, or when a time slice ended - if (running_thread != NULL && running_thread->get_state() == Schedulable::state_running && - (preemptible_policy || (time_slice > 0 && - // A process can be preempted every n-th time-slice, so we use the modulo operator - (current_instant - running_thread->get_last_acquisition()) % time_slice == 0) )) + // This happens when a time slice ends + if (is_running(running_thread) && time_slice > 0 && + // A process can be preempted every n-th time-slice, so we use the modulo operator: + (current_instant - running_thread->get_last_acquisition()) % time_slice == 0) { running_thread->set_state(Schedulable::state_ready); - // We don't set the last_release parameter here. If necessary, - // we'll do that below, when selecting a new running thread, - // only if it's different from the previous one. + running_thread->set_last_release(current_instant); } + prepare_ready_queue(*new_snapshot, all_threads); + + // If the policy is preemptible, and we still have a running thread, + // add it to the queue. + if(preemptive_policy && is_running(running_thread)) + _ready_queue->append(*running_thread); + // ?. Ask the policy to sort the queue. If we must select // a new thread and it can't run for some reason (it goes blocked, or // terminates), then we remove it from the built ReadyQueue and - // check if the next one can run. - prepare_ready_queue(*new_snapshot, all_threads); + // check if the next one can run (see while loop further below). if(_ready_queue->size() > 0) cpu_policy.sort_queue(); // If we don't have to select a new running thread, because the old one didn't @@ -566,7 +576,11 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& // * if the current running thread doesn't block, we can perform the final cleanup, // since the queue is already sorted // * else we've to select another running thread, so we continue down in the method - if(running_thread != NULL && running_thread->get_state() == Schedulable::state_running) + if( // Non-preemptive policy, not ended time-slice: + (!preemptive_policy && is_running(running_thread)) || + // Pre-emptive policy, running thread still the first of the queue. + // Note: if is_running(running_thread) == true, then _ready_queue->size() > 0 + (preemptive_policy && is_running(running_thread) && &_ready_queue->get_item_at(0) == running_thread) ) { raise_new_requests(*running_thread, *new_snapshot, resource_policy); if(running_thread->get_state() != Schedulable::state_blocked) @@ -602,18 +616,6 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& continue; } - // If the new running is different from the old one, - // remember to release our old pal, and to acquire our - // new runner. - // It'll sufficit that *one* candidate is different from - // the old running to trigger this, and it's rightly so. - if(&candidate != running_thread) - { - if(running_thread != NULL) - running_thread->set_last_release(current_instant); - candidate.set_last_acquisition(current_instant); - } - // Now we check if our candidate blocks on a new request raise_new_requests(candidate, *new_snapshot, resource_policy); if(candidate.get_state() != Schedulable::state_blocked) @@ -633,6 +635,19 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy& // Fix fields of running thread DynamicThread& new_running = (DynamicThread&) _ready_queue->get_item_at(0); new_running.set_state(Schedulable::state_running); + + // If the new running is different from the old one, + // remember to release our old pal, and to acquire our + // new runner. + if(&new_running != running_thread) + { + if(running_thread != NULL) + { + running_thread->set_state(Schedulable::state_ready); + running_thread->set_last_release(current_instant); + } + new_running.set_last_acquisition(current_instant); + } } } diff --git a/src/text_simulation.cc b/src/text_simulation.cc index 1572e3b..808c2c9 100644 --- a/src/text_simulation.cc +++ b/src/text_simulation.cc @@ -777,6 +777,8 @@ TextSimulation::on_set(const Tokens& arguments) if (policy == 0) { Simulation::get_instance().set_policy(*it); + cout << endl << (*it)->get_name() << " scheduling policy selected." << endl; + // FIXME: dedicate a function to set resource policy // which includes the following default one ResourcePolicyManager & rpm = *ResourcePoliciesGatekeeper::get_instance().get_registered().at(0); @@ -1001,7 +1003,7 @@ TextSimulation::on_show_cpu_policies(const Tokens& arguments) for (CPUPolicyIt it = policies.begin(); it != policies.end(); ++it) { ostringstream oss; - oss << index << ". " << (*it)->get_name() << endl; + oss << endl << index << ". " << (*it)->get_name() << endl; oss << "\t" << (*it)->get_description() << endl; p_stdout(oss.str());