- Request queues are now correctly managed by the ConcreteEnvironment

copy constructor and by the add_resource and remove_resource methods
  found in ConcreteHistory.
- Scheduler now adds the requests in the queue when appropriate, and
  removes them when exhausted.
- Still to implement the management of the state of requests depending
  on their position in the queue
- Still to implement the way threads block depending on the state of
  their requests
- step_forward now reuses some bunch of code taken from the prototype


git-svn-id: svn://svn.gna.org/svn/sgpemv2/trunk@824 3ecf2c5c-341e-0410-92b4-d18e462d057c
This commit is contained in:
matrevis 2006-08-05 17:09:45 +00:00
parent 132db18b8c
commit efe7dedd61
5 changed files with 360 additions and 105 deletions

View File

@ -21,6 +21,7 @@
#include "concrete_environment.hh"
#include "dynamic_process.hh"
#include "dynamic_resource.hh"
#include "dynamic_sub_request.hh"
#include "sub_request.hh"
#include "thread.hh"
@ -41,16 +42,13 @@ ConcreteEnvironment::ConcreteEnvironment()
ConcreteEnvironment::ConcreteEnvironment(const ConcreteEnvironment& ce) :
Environment(ce), _resources(ce._resources), _processes(), _sched_queue()
Environment(ce), _resources(ce._resources), _processes(), _sched_queue(), _sreq_queues(ce._sreq_queues)
{
// The ReadyQueue won't be copied. Pointers to objects contained into
// the ready queue _will_ have changed in the new one. The ready queue
// needs to be reset: it is Scheduler that builds it again from time to time.
// Update resource pointers in a way you won't like :-)
// (for Marco -> optimization is the root of all evil! We have to
// copy DynamicResource too; this make things simpler (and
// future code modifications to DynamicResource easier))
{
for(Resources::iterator it = _resources.begin(); it != _resources.end(); it++)
it->second = new DynamicResource(dynamic_cast<const DynamicResource&>(*it->second));
@ -64,6 +62,62 @@ ConcreteEnvironment::ConcreteEnvironment(const ConcreteEnvironment& ce) :
for(Processes::const_iterator orig = ce_proc.begin(); orig != ce_proc.end(); orig++)
*dest++ = new DynamicProcess(dynamic_cast<const DynamicProcess&>(**orig));
}
// Update the subrequest queues.
// for each subrequest
typedef Processes::const_iterator it1_t;
typedef std::vector<Thread*> v2_t;
typedef v2_t::const_iterator it2_t;
typedef std::vector<Request*> v3_t;
typedef v3_t::const_iterator it3_t;
typedef std::vector<SubRequest*> v4_t;
typedef v4_t::const_iterator it4_t;
typedef SubRequestQueue::iterator it5_t;
for(it1_t it1 = _processes.begin(); it1 != _processes.end(); it1++)
{
const v2_t& threads = (*it1)->get_threads();
for(it2_t it2 = threads.begin(); it2 != threads.end(); it2++)
{
const v3_t& reqs = (*it2)->get_requests();
for(it3_t it3 = reqs.begin(); it3 != reqs.end(); it3++)
{
// an optimization here: there is no reason in iterating through
// future or exausted requests. (Do you know why?)
const v4_t& subr = (*it3)->get_subrequests();
for(it4_t it4 = subr.begin(); it4 != subr.end(); it4++)
{
SubRequest::state curr_state = (*it4)->get_state();
if(curr_state != Request::state_future && curr_state != Request::state_exhausted)
{
// the subrequest is the following queue:
SubRequestQueue & queue = get_request_queue((*it4)->get_resource_key());
// we must replace the old pointer:
bool found = false;
for(it5_t it5 = queue.begin(); !found && it5 != queue.end(); it5++)
{
DynamicSubRequest& _old = dynamic_cast<DynamicSubRequest&>(**it5);
DynamicSubRequest& _new = dynamic_cast<DynamicSubRequest&>(**it4);
if (&_old.get_core() == &_new.get_core())
{
found = true;
*it5 = *it4;
}
}
}
}
}
}
}
}
@ -115,6 +169,13 @@ ConcreteEnvironment::get_request_queue(resource_key_t resource_key)
return _sreq_queues[resource_key];
}
ConcreteEnvironment::SubRequestQueues&
ConcreteEnvironment::get_subrequest_queues()
{
return _sreq_queues;
}
const ReadyQueue&
ConcreteEnvironment::get_sorted_queue() const
@ -153,40 +214,3 @@ ConcreteEnvironment::~ConcreteEnvironment()
// -------------------------------- TO BE FIXED ----------------
// Prepare subrequest list for each resource:
// Requests request_queue;
// typedef Processes::const_iterator it1_t;
// typedef std::vector<Thread*> v2_t;
// typedef v2_t::const_iterator it2_t;
// typedef std::vector<Request*> v3_t;
// typedef v3_t::const_iterator it3_t;
// typedef std::vector<SubRequest*> v4_t;
// typedef v4_t::const_iterator it4_t;
// // Cyclomatic complexity will go nuts here. Feel the love. _ALL_ of it.
// for(it1_t it1 = _processes.begin(); it1 != _processes.end(); it1++)
// {
// const v2_t& threads = (*it1)->get_threads();
// for(it2_t it2 = threads.begin(); it2 != threads.end(); it2++)
// {
// const v3_t& reqs = (*it2)->get_requests();
// for(it3_t it3 = reqs.begin(); it3 != reqs.end(); it3++)
// {
// const v4_t& subr = (*it3)->get_subrequests();
// for(it4_t it4 = subr.begin(); it4 != subr.end(); it4++)
// {
// if((*it4)->get_resource_key() == resource_key)
// {
// request_queue.push_back(*it3);
// break;
// }
// }
// }
// }
// }

View File

@ -128,7 +128,19 @@ namespace sgpem
SubRequestQueue&
get_request_queue(resource_key_t resource_key);
typedef std::map<resource_key_t, SubRequestQueue> SubRequestQueues;
/// \brief Returns the set of request queues.
/// Returns a reference to the map from resources to subreuqest queues.
/// It is needed by history to delete the queue associated to a deleted
/// resource.
SubRequestQueues&
get_subrequest_queues();
/// \brief Returns a snapshot of the current scheduler's ready queue.
/// Returns a ReadyQueue object representing the queue
@ -158,7 +170,6 @@ namespace sgpem
private:
typedef std::map<resource_key_t, SubRequestQueue> SubRequestQueues;
/// \brief The container of all Resource objecs.
/// Actually contains only DynamicResource objects.

View File

@ -149,6 +149,14 @@ ConcreteHistory::remove(resource_key_t resource_key)
delete found->second;
resources.erase(found);
// Delete the queue associated with the resource.
ConcreteEnvironment::SubRequestQueues& srq = initial.get_subrequest_queues();
ConcreteEnvironment::SubRequestQueues::iterator qfound = srq.find(resource_key);
// There is always one!
assert(qfound != srq.end());
srq.erase(qfound);
// Now search and erase subrequest that had a ref to the
// removed resource
@ -252,6 +260,9 @@ ConcreteHistory::remove(Request& request)
void
ConcreteHistory::remove(SubRequest& subrequest)
{
// this function makes one relevant assumption:
// the initial environment does contain empty request queues only.
DynamicSubRequest& dyn_sub = dynamic_cast<DynamicSubRequest&>(subrequest);
DynamicRequest& dyn_req = dyn_sub.get_request();
DynamicThread& dyn_thr = dyn_req.get_thread();
@ -288,7 +299,7 @@ ConcreteHistory::add_resource(const Glib::ustring& name,
reset(false);
typedef ConcreteEnvironment::Resources Resources;
typedef ConcreteEnvironment::SubRequestQueue SubRequestQueue;
// And preemptable and availability?? FIXME!
StaticResource* core = new StaticResource(name, places);
@ -304,6 +315,10 @@ ConcreteHistory::add_resource(const Glib::ustring& name,
// Found a hole in the map, fill it like little Hans,
// its finger and the spilling dam.
Resources::iterator temp = resources.insert(pair<resource_key_t,Resource*>(index, resource)).first;
// The same for request queues.
SubRequestQueue emptysrq;
_snapshots.front()->get_subrequest_queues().insert(pair<resource_key_t,SubRequestQueue>(index, emptysrq));
notify_change();

View File

@ -80,49 +80,224 @@ static void prepare_ready_queue(ConcreteEnvironment& snapshot,
// For the current thread, see if there are requests that are exhausted
static void
update_requests_for_old_running_thread(DynamicThread& running_thread)
{
Requests& reqs = running_thread.get_dynamic_requests();
bool running_terminated = running_thread.get_state() == Schedulable::state_terminated;
for(Requests::iterator r_it = reqs.begin(); r_it != reqs.end(); r_it++)
{
DynamicRequest& rq = **r_it;
if(rq.get_state() == Request::state_allocated)
{
/* decrease remaining time for each allocated subrequest */
SubRequests& subreqs = rq.get_dynamic_subrequests();
for(SubRequests::iterator s_it = subreqs.begin(); s_it != subreqs.end(); s_it++)
{
DynamicSubRequest& subr = **s_it;
if(subr.get_state() == Request::state_allocated)
subr.decrease_remaining_time();
if(subr.get_remaining_time() == 0)
{
subr.set_state(Request::state_exhausted);
// ___BIG___ FIXME FIXME FIXME
// see extendRequest, case 0 and 1
// static void
// update_requests_for_old_running_thread(DynamicThread& running_thread)
// {
// }
// FIXME : if exhausted, it should be taken away from the queue of the
// requested resource
}
}
}
// If the running thread terminated uncoditionally put them in exhausted state
if(running_terminated)
/// \brief Manages a single SubRequest object, depending on its state.
/// Zero step: any -> terminated. Added to cope with malformed threads.
/// First step: allocated -> terminated.
/// Second step: non-allocable -> allocable.
/// Third step: allocable -> allocated, or future -> allocated.
///
/// The name and the structure of this method are ugly. They are inherited
/// from the whole visitor's structure, anyway we could simply switch on of
/// the state the SubRequest obejct, and we could (should?) factor out the
/// operations which check if the request is allocable or not, depending on
/// the queue position. Anyway, this factoring may lead to code dispersion.
/// I think it is better to hold the computational core in one single place.
void
extendSubRequest(DynamicSubRequest* sp, auto_ptr<ConcreteEnvironment> & env, int walk, int front)
{
DynamicSubRequest& s = *sp;
switch (walk)
{
/// Terminates directly the subrequest
case 0:
{
s.set_state(Request::state_exhausted);
/// Remove the subrequest (pointer) from the queue.
bool found = false;
typedef Environment::SubRequestQueue SubRequestQueue;
SubRequestQueue& queue = env->get_request_queue(s.get_resource_key());
SubRequestQueue::iterator it = queue.begin();
for (; !found && it != queue.end(); it++)
if ((*it) == sp)
{
found = true;
queue.erase(it);
}
break;
}
/// Updates the state of an ALLOCATED subrequest, decreasing appropriate
/// counters, and checks if it become TERMINATED. In the latter case the
/// function finds the position of the subrequest (pointer) in the
/// requested resource's queue and removes it.
case 1:
{
if (s.get_state() != Request::state_allocated)
break;
/// Decrease remaining time, since the resource has been used.
s.decrease_remaining_time();
/// Check for termination.
if (s.get_remaining_time() == 0)
{
SubRequests& subreqs = rq.get_dynamic_subrequests();
for(SubRequests::iterator s_it = subreqs.begin(); s_it != subreqs.end(); s_it++)
(*s_it)->set_state(Request::state_exhausted);
continue; // go to next request
s.set_state(Request::state_exhausted);
/// Remove the subrequest (pointer) from the queue.
bool found = false;
typedef Environment::SubRequestQueue SubRequestQueue;
SubRequestQueue& queue = env->get_request_queue(s.get_resource_key());
SubRequestQueue::iterator it = queue.begin();
for (; !found && it != queue.end(); it++)
{
if ((*it) == sp)
{
found = true;
queue.erase(it);
}
}
}
} //~ for(over requests)
}
break;
}
/*
/// Updates the state of a NON-ALLOCABLE subrequest, in case it become
/// ALLOCABLE, which may happen only when a resource has been released.
/// We could jump this check if no resource were released.
/// It finds the position of the subrequest (pointer) in the requested
/// resource's queue. If the position is within the places of the
/// resource, the subrequest is ALLOCABLE.
case 2:
{
if (s.get_state() != Request::state_allocated)
break;
/// The subrequest is in the queue for sure. Let's find it!
/// I really need an integer for this operation.
int position = 0;
while (position <= s.resource_ptr->queue.size())
{
if (s.resource_ptr->queue[position]->has_same_id(s))
/// Found!
break;
/// This statement is not executed if we find it.
position++;
}
/// Watch out: in a resource with 2 places, 0 and 1 are valid queue
/// positions, 2 is right one place out.
if (position < s.resource_ptr->places)
/// If so, set it ALLOCABLE.
s.set_state("ALLOCABLE");
break;
}
/// Updates the state of a FUTURE subrequest when the time has come
/// for it to be raised, setting it as allocable it if it is the case,
/// or blocking it. Enqueues the subrequest (pointer) at the end of the
/// requested resource's queue.
/// The resource policy should be called to manage the queue.
/// If the position is within the places of the resource, the subrequest
/// is ALLOCABLE, otherwise it is NON-ALLOCABLE.
case 3:
{
if (s.get_state() != "FUTURE")
break;
/// Enqueue the subrequest at the back of the queue.
s.resource_ptr->queue.push_back(&s);
/// TODO: right here, right now we should call the resource policy to
/// update the queue. Updates the state of the subrequest depending
/// on the position in the queue, as explained before.
s.set_state(s.resource_ptr->queue.size() > s.resource_ptr->places ?
"NON-ALLOCABLE" : "ALLOCABLE");
// Oh I miss ML so much.
break;
}
/// This is ugly, but hey, none's perfect.
/// Updates the state of a ALLOCABLE subrequest allocating it.
case 4:
{
if (s.get_state() == "ALLOCABLE")
s.set_state("ALLOCATED");
break;
}
*/
}
}
/// \brief Manages a single Request object, depending on its state.
/// Updates the state of a request, depending on its state, recursively
/// updating the contained subrequests. The state of the request is then
/// a function of the states of the subrequests.
///
/// Zero step: any -> terminated. Added to cope with malformed threads.
/// First step: allocated -> terminated.
/// Second step: non-allocable -> allocable.
/// Third step: allocable -> allocated, or future -> allocated.
///
/// The following function may be reduced to a pair of lines.
///
/// Longer, safer and more efficient version (and hopefully much easier
/// to understand!)
void
extendRequest(DynamicRequest* rp, auto_ptr<ConcreteEnvironment> & env, int walk, int front)
{
DynamicRequest& r = *rp;
switch (walk)
{
case 0:
{
typedef vector<DynamicSubRequest*> SubRequests;
SubRequests list = r.get_dynamic_subrequests();
for (SubRequests::iterator it = list.begin(); it != list.end(); it++)
extendSubRequest(*it, env, walk, front);
break;
}
/// Updates the state of an ALLOCATED request.
case 1:
{
if (r.get_state() != Request::state_allocated)
break;
typedef vector<DynamicSubRequest*> SubRequests;
SubRequests list = r.get_dynamic_subrequests();
for (SubRequests::iterator it = list.begin(); it != list.end(); it++)
extendSubRequest(*it, env, walk, front);
break;
}
/*
/// Updates the state of a NON-ALLOCABLE request.
case 2:
{
if (r.get_state() != "NON-ALLOCABLE")
break;
for (int j = 0; j < r.subrequests.size(); j++)
r.subrequests[j].accept(*this);
break;
}
/// Updates the state of an ALLOCABLE or FUTURE request.
case 3:
{
/// This is the only interesting case. If the current instant, measured
/// over the containing process execution time, is equal to the instant
/// in which the request has to be raised, the subrequests are
/// recursively updated for the first time ever, and their status
/// changes from FUTURE to something else.
if (r.get_state() == "FUTURE" && r.at == front)
for (int j = 0; j < r.subrequests.size(); j++)
r.subrequests[j].accept(*this);
if (r.get_state() == "ALLOCABLE")
{
walk = 4; // this is an ugly patch please forgive me
for (int j = 0; j < r.subrequests.size(); j++)
r.subrequests[j].accept(*this);
walk = 3;
}
break;
}
*/
}
}
// ---------------------------------------------------------
@ -229,7 +404,13 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInter
// 4a. Look for exhausted requests for the running thread
if(running_thread != NULL)
update_requests_for_old_running_thread(*running_thread);
{
bool running_terminated = running_thread->get_state() == Schedulable::state_terminated;
Requests& reqs = running_thread->get_dynamic_requests();
for(Requests::iterator r_it = reqs.begin(); r_it != reqs.end(); r_it++)
extendRequest(*r_it, new_snapshot, running_terminated ? 0 : 1, running_thread->get_elapsed_time());
}
// ---------- FIXME ----------------
@ -292,8 +473,9 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInter
for(SubRequests::iterator s_it = subreqs.begin(); s_it != subreqs.end(); s_it++)
{
DynamicSubRequest& subr = **s_it;
// FIXME: allocation is always granted, by now. We'll need queues to
// implement it correctly
Environment::SubRequestQueue& queue = new_snapshot->get_request_queue(subr.get_resource_key());
queue.push_back(*s_it);
// FIXME this code has to control the position in the queue
if(subr.get_state() == Request::state_future)
subr.set_state(Request::state_allocated);
}
@ -330,10 +512,11 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInter
for(SubRequests::iterator s_it = subreqs.begin(); s_it != subreqs.end(); s_it++)
{
DynamicSubRequest& subr = **s_it;
// FIXME: allocation is always granted, by now. We'll need queues to
// implement it correctly
if(subr.get_state() == Request::state_future)
subr.set_state(Request::state_allocated);
Environment::SubRequestQueue& queue = new_snapshot->get_request_queue(subr.get_resource_key());
queue.push_back(*s_it);
// FIXME this code has to control the position in the queue
if(subr.get_state() == Request::state_future)
subr.set_state(Request::state_allocated);
}
}
}
@ -405,3 +588,17 @@ Scheduler::step_forward(History& history, CPUPolicy& cpu_policy) throw(UserInter
}

View File

@ -1455,21 +1455,29 @@ TextSimulation::update(const History& changed_history)
p_stdout(oss.str());
oss.str(string());
// FIXME this code causes a segfault because an invalid reference is
// returned from get_request_queue()
// const Environment::SubRequestQueue& req_queue =
// env.get_request_queue(it->first);
//
// p_stdout(_("\t\t\tqueue: { "));
//
// for(unsigned int i = 0; i < req_queue.size(); ++i)
// {
// oss << req_queue[i]->get_request().get_thread().get_name() << " ~ ";
// p_stdout(oss.str());
// oss.str(string());
// }
//
// p_stdout("}\n");
const Environment::SubRequestQueue& req_queue =
env.get_request_queue(it->first);
p_stdout(_("\t\t\tqueue: { "));
for(unsigned int i = 0; i < req_queue.size(); ++i)
{
if (i == r.get_places())
oss << " || ";
else
if (i != 0)
oss << " ~ ";
if (req_queue[i]->get_state() == Request::state_allocated)
oss << "[" << req_queue[i]->get_request().get_thread().get_name() << "]";
else
oss << req_queue[i]->get_request().get_thread().get_name();
p_stdout(oss.str());
oss.str(string());
}
p_stdout(" }\n");
}
p_stdout(_("PROCESSES:\n"));