- Merge branch 0.3-r1003--scheduler-manage-preemption into trunk

git-svn-id: svn://svn.gna.org/svn/sgpemv2/trunk@1023 3ecf2c5c-341e-0410-92b4-d18e462d057c
This commit is contained in:
tchernobog 2006-09-06 12:29:22 +00:00
parent cb5d958790
commit df4b1b4205
11 changed files with 158 additions and 55 deletions

View File

@ -58,3 +58,16 @@ the time slice will have to end before the former can run."""
self.sort(queue,by_ltime) self.sort(queue,by_ltime)
self.sort(queue,by_prio) self.sort(queue,by_prio)
# manage preemption: see if we've a running thread
# in the ready queue, and if it can still run
if self.is_preemptive() == True:
higher_prio = queue.get_item_at(0).get_current_priority()
i = 0
while i < queue.size():
sched = queue.get_item_at(i)
priority = sched.get_current_priority()
if(priority != higher_prio):
break
if sched.get_state() == "running":
queue.bubble_to_front(i)
i += 1

View File

@ -53,3 +53,4 @@ Time Next in this case."""
a.get_total_cpu_time() - a.get_elapsed_time() < \ a.get_total_cpu_time() - a.get_elapsed_time() < \
b.get_total_cpu_time() - b.get_elapsed_time() b.get_total_cpu_time() - b.get_elapsed_time()
self.sort(queue,cmpf) self.sort(queue,cmpf)

View File

@ -150,15 +150,6 @@ namespace sgpem {
public: public:
virtual ~Schedulable() = 0; virtual ~Schedulable() = 0;
enum state
{
state_running = 1<<0,
state_ready = 1<<1,
state_blocked = 1<<2,
state_future = 1<<3,
state_terminated = 1<<4
};
virtual unsigned int get_arrival_time() const = 0; virtual unsigned int get_arrival_time() const = 0;
virtual unsigned int get_elapsed_time() const = 0; virtual unsigned int get_elapsed_time() const = 0;
virtual int get_last_acquisition() const = 0; virtual int get_last_acquisition() const = 0;
@ -166,7 +157,30 @@ namespace sgpem {
virtual int get_base_priority() const = 0; virtual int get_base_priority() const = 0;
virtual int get_current_priority() const = 0; virtual int get_current_priority() const = 0;
virtual unsigned int get_total_cpu_time() const = 0; virtual unsigned int get_total_cpu_time() const = 0;
virtual state get_state() const = 0;
%ignore Schedulable::get_state() const;
%extend {
const char* get_state() const
{
switch(self->get_state())
{
case Schedulable::state_future:
return "future";
case Schedulable::state_terminated:
return "terminated";
case Schedulable::state_running:
return "running";
case Schedulable::state_ready:
return "ready";
case Schedulable::state_blocked:
return "blocked";
default:
// should never get here
return "undefined";
}
}
}
%ignore Schedulable::get_name() const; %ignore Schedulable::get_name() const;
%extend { %extend {
@ -204,6 +218,7 @@ namespace sgpem {
sgpem::Thread& get_item_at(position index); sgpem::Thread& get_item_at(position index);
void swap(position a, position b) throw(std::out_of_range); void swap(position a, position b) throw(std::out_of_range);
void bubble_to_front(position x) throw(std::out_of_range);
private: private:
// Avoid instantiation and copy // Avoid instantiation and copy

View File

@ -82,7 +82,7 @@ static T* deep_find(const std::vector<T*>& v, const T& obj)
// ----------------- // -----------------
ConcreteHistory::ConcreteHistory() ConcreteHistory::ConcreteHistory()
: History(), _snapshots() : History(), _snapshots(), _sealed(false)
{ {
_snapshots.push_back(new ConcreteEnvironment()); _snapshots.push_back(new ConcreteEnvironment());
} }
@ -95,7 +95,7 @@ ConcreteHistory::~ConcreteHistory()
} }
ConcreteHistory::ConcreteHistory(const ConcreteHistory& h) : ConcreteHistory::ConcreteHistory(const ConcreteHistory& h) :
History(h) History(h), _sealed(h._sealed)
{ {
typedef Snapshots::const_iterator SnapIt; typedef Snapshots::const_iterator SnapIt;
for (SnapIt it = h._snapshots.begin(); it != h._snapshots.end(); ++it) for (SnapIt it = h._snapshots.begin(); it != h._snapshots.end(); ++it)
@ -489,12 +489,16 @@ ConcreteHistory::edit_subrequest(SubRequest& subrequest,
void void
ConcreteHistory::step_front(position p) ConcreteHistory::set_front(position p)
{ {
_front = p; position old_front = _front;
if (p > _snapshots.size()) if (p > _snapshots.size() - 1)
_front = _snapshots.size(); _front = _snapshots.size() - 1;
notify_change(); else
_front = p;
if(old_front != _front)
notify_change();
} }
void void
@ -507,6 +511,7 @@ ConcreteHistory::reset(bool notify)
for_each(it, _snapshots.end(), deletor<ConcreteEnvironment>()); for_each(it, _snapshots.end(), deletor<ConcreteEnvironment>());
_snapshots.resize(1); // Truncate to keep only our "model" _snapshots.resize(1); // Truncate to keep only our "model"
_front = 0; _front = 0;
_sealed = false;
if (notify) if (notify)
notify_change(); notify_change();
@ -519,3 +524,18 @@ ConcreteHistory::notify_change()
for (it = _observers.begin(); it != _observers.end(); it++) for (it = _observers.begin(); it != _observers.end(); it++)
(*it)->update(*this); (*it)->update(*this);
} }
bool
ConcreteHistory::is_sealed() const
{
return _sealed;
}
bool
ConcreteHistory::seal()
{
bool t = _sealed;
_sealed = true;
return t;
}

View File

@ -109,8 +109,13 @@ namespace sgpem
resource_key_t resource_key, resource_key_t resource_key,
time_t duration); time_t duration);
// sets the front to position p
virtual void set_front(position p);
virtual void step_front(position p); bool is_sealed() const;
// (Returns if the History was sealed before this call)
bool seal();
virtual void reset(bool notify = true); virtual void reset(bool notify = true);
@ -123,7 +128,8 @@ namespace sgpem
private: private:
// Disable assignment, implement it only if needed // Disable assignment, implement it only if needed
ConcreteHistory& operator=(const ConcreteHistory& op2); ConcreteHistory& operator=(const ConcreteHistory& op2);
bool _sealed;
} }
; //~ class ConcreteHistory ; //~ class ConcreteHistory

View File

@ -64,7 +64,7 @@ ConcreteSimulation::jump_to(History::position p) throw(UserInterruptException, N
// pauses the simulation (see below) // pauses the simulation (see below)
break; break;
case state_stopped: case state_stopped:
_history.step_front(0); _history.set_front(0);
break; break;
default: default:
break; break;
@ -86,7 +86,7 @@ ConcreteSimulation::jump_to(History::position p) throw(UserInterruptException, N
yet_to_finish = step(); yet_to_finish = step();
increment++; increment++;
} }
get_history().step_front(p); get_history().set_front(std::min(p, _history.get_size()));
if (!yet_to_finish) if (!yet_to_finish)
stop(); stop();
@ -126,7 +126,7 @@ ConcreteSimulation::run() throw(UserInterruptException, NullPolicyException, Mal
switch (_state) switch (_state)
{ {
case state_stopped: case state_stopped:
_history.step_front(0); _history.set_front(0);
break; break;
default: default:
break; break;
@ -136,9 +136,10 @@ ConcreteSimulation::run() throw(UserInterruptException, NullPolicyException, Mal
//step forward //step forward
bool yet_to_finish = step(); bool yet_to_finish = step();
get_history().step_front(get_history().get_front() + 1); _history.set_front(_history.get_front() + 1);
if (yet_to_finish) if (yet_to_finish)
{ {
if(_mode == mode_step_by_step) if(_mode == mode_step_by_step)
pause(); pause();
else else
@ -173,10 +174,23 @@ ConcreteSimulation::step()
try try
{ {
//step forward // step forward
bool yet_to_finish = true; bool yet_to_finish = true;
if (get_history().get_front() == get_history().get_size() - 1) if (_history.get_front() == _history.get_size() - 1)
yet_to_finish = Scheduler::get_instance().step_forward(_history, *get_policy(), *get_resource_policy()); if(!_history.is_sealed())
yet_to_finish = Scheduler::get_instance().step_forward(_history, *get_policy(), *get_resource_policy());
else
yet_to_finish = false;
if (!yet_to_finish) _history.seal();
// since the simulation expects to be notified
// of simulation termination when reaching the last environment
// and the front will be updated just out of this method,
// we have to make this horrible thing
if (_history.get_front() == _history.get_size() - 2 && _history.is_sealed())
yet_to_finish = false;
return yet_to_finish; return yet_to_finish;
} }
catch (const CPUPolicyException& e) catch (const CPUPolicyException& e)

View File

@ -129,6 +129,7 @@ namespace sgpem
time_t duration) = 0; time_t duration) = 0;
virtual position get_front() const; virtual position get_front() const;
virtual bool is_sealed() const = 0;
virtual void attach(HistoryObserver& observer); virtual void attach(HistoryObserver& observer);
virtual void detach(const HistoryObserver& observer); virtual void detach(const HistoryObserver& observer);

View File

@ -27,7 +27,7 @@ using sgpem::ReadyQueue;
void void
ReadyQueue::swap(position a, position b) ReadyQueue::swap(position a, position b)
throw (std::out_of_range) throw (std::out_of_range)
{ {
if (a == b) return; if (a == b) return;
@ -51,7 +51,7 @@ ReadyQueue::size() const
sgpem::Thread& sgpem::Thread&
ReadyQueue::get_item_at(position index) ReadyQueue::get_item_at(position index)
throw (std::out_of_range) throw (std::out_of_range)
{ {
// Checks index access // Checks index access
return *_scheds.at(index); return *_scheds.at(index);
@ -59,7 +59,7 @@ throw (std::out_of_range)
const sgpem::Thread& const sgpem::Thread&
ReadyQueue::get_item_at(position index) const ReadyQueue::get_item_at(position index) const
throw (std::out_of_range) throw (std::out_of_range)
{ {
// Checks index access // Checks index access
return *_scheds.at(index); return *_scheds.at(index);
@ -71,6 +71,18 @@ ReadyQueue::append(Thread& thread)
_scheds.push_back(&thread); _scheds.push_back(&thread);
} }
void
ReadyQueue::bubble_to_front(position x)
throw(std::out_of_range)
{
while(x > 0)
{
swap(x - 1, x);
--x;
}
}
void void
ReadyQueue::erase_first() ReadyQueue::erase_first()
{ {

View File

@ -42,6 +42,10 @@ namespace sgpem
Thread& get_item_at(position index) throw (std::out_of_range); Thread& get_item_at(position index) throw (std::out_of_range);
const Thread& get_item_at(position index) const throw (std::out_of_range); const Thread& get_item_at(position index) const throw (std::out_of_range);
void append(Thread& schedulable); void append(Thread& schedulable);
/** \brief Bubble element x to the front of the queue
*/
void bubble_to_front(position x) throw(std::out_of_range);
void erase_first(); void erase_first();
private: private:

View File

@ -61,6 +61,7 @@ typedef Environment::SubRequestQueue SubRequestQueue;
// ------------------ Static helper functions -------------- // ------------------ Static helper functions --------------
inline bool is_running(const Thread* running_thread);
static void collect_threads(const std::vector<Process*>& procs, Threads& collected_threads); static void collect_threads(const std::vector<Process*>& procs, Threads& collected_threads);
static void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads); static void prepare_ready_queue(ConcreteEnvironment& snapshot, const Threads& all_threads);
static void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment); static void terminate_all_requests_of(DynamicThread& thread, ConcreteEnvironment& environment);
@ -75,6 +76,12 @@ static void determine_subr_allocable_status(const Resource& res, SubRequestQueue
// --------------------------------------------------------- // ---------------------------------------------------------
bool
is_running(const Thread* running_thread)
{
return running_thread != NULL && running_thread->get_state() == Schedulable::state_running;
}
// Collects all threads of an environment into a single vector // Collects all threads of an environment into a single vector
void void
@ -480,6 +487,7 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy&
current.set_state(Schedulable::state_terminated); current.set_state(Schedulable::state_terminated);
current.set_last_release(current_instant); current.set_last_release(current_instant);
terminate_all_requests_of(current, *new_snapshot); terminate_all_requests_of(current, *new_snapshot);
running_thread = NULL;
} }
// if we found the running thread, there isn't another one, // if we found the running thread, there isn't another one,
@ -536,29 +544,31 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy&
_ready_queue = &new_snapshot->get_sorted_queue(); _ready_queue = &new_snapshot->get_sorted_queue();
// Determine if the policy is pre_emptive, and what time slice it uses // Determine if the policy is pre_emptive, and what time slice it uses
bool preemptible_policy = cpu_policy.is_pre_emptive(); bool preemptive_policy = cpu_policy.is_pre_emptive();
int time_slice = cpu_policy.get_time_slice(); int time_slice = cpu_policy.get_time_slice();
// ?. See if old running_thread has to be put to ready state // ?. See if old running_thread has to be put to ready state
// This happens when the policy makes use of preemptability by // This happens when a time slice ends
// priority, or when a time slice ended if (is_running(running_thread) && time_slice > 0 &&
if (running_thread != NULL && running_thread->get_state() == Schedulable::state_running && // A process can be preempted every n-th time-slice, so we use the modulo operator:
(preemptible_policy || (time_slice > 0 && (current_instant - running_thread->get_last_acquisition()) % time_slice == 0)
// A process can be preempted every n-th time-slice, so we use the modulo operator
(current_instant - running_thread->get_last_acquisition()) % time_slice == 0) ))
{ {
running_thread->set_state(Schedulable::state_ready); running_thread->set_state(Schedulable::state_ready);
// We don't set the last_release parameter here. If necessary, running_thread->set_last_release(current_instant);
// we'll do that below, when selecting a new running thread,
// only if it's different from the previous one.
} }
prepare_ready_queue(*new_snapshot, all_threads);
// If the policy is preemptible, and we still have a running thread,
// add it to the queue.
if(preemptive_policy && is_running(running_thread))
_ready_queue->append(*running_thread);
// ?. Ask the policy to sort the queue. If we must select // ?. Ask the policy to sort the queue. If we must select
// a new thread and it can't run for some reason (it goes blocked, or // a new thread and it can't run for some reason (it goes blocked, or
// terminates), then we remove it from the built ReadyQueue and // terminates), then we remove it from the built ReadyQueue and
// check if the next one can run. // check if the next one can run (see while loop further below).
prepare_ready_queue(*new_snapshot, all_threads);
if(_ready_queue->size() > 0) cpu_policy.sort_queue(); if(_ready_queue->size() > 0) cpu_policy.sort_queue();
// If we don't have to select a new running thread, because the old one didn't // If we don't have to select a new running thread, because the old one didn't
@ -566,7 +576,11 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy&
// * if the current running thread doesn't block, we can perform the final cleanup, // * if the current running thread doesn't block, we can perform the final cleanup,
// since the queue is already sorted // since the queue is already sorted
// * else we've to select another running thread, so we continue down in the method // * else we've to select another running thread, so we continue down in the method
if(running_thread != NULL && running_thread->get_state() == Schedulable::state_running) if( // Non-preemptive policy, not ended time-slice:
(!preemptive_policy && is_running(running_thread)) ||
// Pre-emptive policy, running thread still the first of the queue.
// Note: if is_running(running_thread) == true, then _ready_queue->size() > 0
(preemptive_policy && is_running(running_thread) && &_ready_queue->get_item_at(0) == running_thread) )
{ {
raise_new_requests(*running_thread, *new_snapshot, resource_policy); raise_new_requests(*running_thread, *new_snapshot, resource_policy);
if(running_thread->get_state() != Schedulable::state_blocked) if(running_thread->get_state() != Schedulable::state_blocked)
@ -602,18 +616,6 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy&
continue; continue;
} }
// If the new running is different from the old one,
// remember to release our old pal, and to acquire our
// new runner.
// It'll sufficit that *one* candidate is different from
// the old running to trigger this, and it's rightly so.
if(&candidate != running_thread)
{
if(running_thread != NULL)
running_thread->set_last_release(current_instant);
candidate.set_last_acquisition(current_instant);
}
// Now we check if our candidate blocks on a new request // Now we check if our candidate blocks on a new request
raise_new_requests(candidate, *new_snapshot, resource_policy); raise_new_requests(candidate, *new_snapshot, resource_policy);
if(candidate.get_state() != Schedulable::state_blocked) if(candidate.get_state() != Schedulable::state_blocked)
@ -633,6 +635,19 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy, ResourcePolicy&
// Fix fields of running thread // Fix fields of running thread
DynamicThread& new_running = (DynamicThread&) _ready_queue->get_item_at(0); DynamicThread& new_running = (DynamicThread&) _ready_queue->get_item_at(0);
new_running.set_state(Schedulable::state_running); new_running.set_state(Schedulable::state_running);
// If the new running is different from the old one,
// remember to release our old pal, and to acquire our
// new runner.
if(&new_running != running_thread)
{
if(running_thread != NULL)
{
running_thread->set_state(Schedulable::state_ready);
running_thread->set_last_release(current_instant);
}
new_running.set_last_acquisition(current_instant);
}
} }
} }

View File

@ -777,6 +777,8 @@ TextSimulation::on_set(const Tokens& arguments)
if (policy == 0) if (policy == 0)
{ {
Simulation::get_instance().set_policy(*it); Simulation::get_instance().set_policy(*it);
cout << endl << (*it)->get_name() << " scheduling policy selected." << endl;
// FIXME: dedicate a function to set resource policy // FIXME: dedicate a function to set resource policy
// which includes the following default one // which includes the following default one
ResourcePolicyManager & rpm = *ResourcePoliciesGatekeeper::get_instance().get_registered().at(0); ResourcePolicyManager & rpm = *ResourcePoliciesGatekeeper::get_instance().get_registered().at(0);
@ -1001,7 +1003,7 @@ TextSimulation::on_show_cpu_policies(const Tokens& arguments)
for (CPUPolicyIt it = policies.begin(); it != policies.end(); ++it) for (CPUPolicyIt it = policies.begin(); it != policies.end(); ++it)
{ {
ostringstream oss; ostringstream oss;
oss << index << ". " << (*it)->get_name() << endl; oss << endl << index << ". " << (*it)->get_name() << endl;
oss << "\t" << (*it)->get_description() << endl; oss << "\t" << (*it)->get_description() << endl;
p_stdout(oss.str()); p_stdout(oss.str());