Browse Source

Fleshed out WorkerPool.cpp

git-svn-id: https://svn.microneil.com/svn/SNF4CGP/trunk@15 59e8e3e7-56fa-483b-b4b4-fa6ab0af3dfc
master
madscientist 15 years ago
parent
commit
fba5c00979
2 changed files with 137 additions and 5 deletions
  1. 129
    4
      SNF4CGP/WorkerPool.cpp
  2. 8
    1
      SNF4CGP/WorkerPool.hpp

+ 129
- 4
SNF4CGP/WorkerPool.cpp View File

@@ -8,9 +8,134 @@
using namespace std;
const ThreadType OutputProcessor::Type("Output Processor");
//// Worker ////////////////////////////////////////////////////////////////////
const static Worker::ThreadType Type("Worker");
const static Worker::ThreadState Waiting("Waiting");
const static Worker::ThreadState Working("Working");
const ThreadType Worker::Type("Worker");
const ThreadState Worker::Waiting("Waiting");
const ThreadState Worker::Working("Working");
Worker::Worker(WorkerPool& W) : // Initialize and start the thread.
Workers(W),
TimeToStop(false),
myJob_(0) {
}
virtual Worker::~Worker() { // Cleanup on the way down.
try { stop(); }
catch(...) {}
}
bool Worker::isJob() { // Job pointer check.
return(0 != myJob_);
}
RuntimeCheck CheckForValidJobPointer("Worker::myJob() Check(0 != myJob_)");
Job& myJob() { // Safe access to myJob.
CheckForValidJobPointer(0 != myJob_);
return(*myJob_);
}
LogicCheck CheckForOneJobAtATime("Worker::doJob() Check(false == isJob())");
void Worker::doJob(Job& J) { // Give this worker a job to do.
ScopeMutex CallTheBall(Busy);
CheckForOneJobAtATime(false == isJob());
myJob_ = &J;
StoppingPoint.produce();
}
LogicCheck CheckForNoJobsAtStop("Worker::stop() Check(false == isJob())");
void Worker::stop() { // Stop the worker thread.
ScopeMutext CallTheBall(Busy); // Don't stop while busy or vv.
if(false == TimeToStop) { // Do this only once.
CheckForNoJobsAtStop(false == isJob());
TimeToStop = true; // It is time to stop.
StoppingPoint.produce(); // Get busy stopping.
join(); // Wait for the end.
}
}
bool Worker::doWork() { // Get the job done.
ScopeMutex CallTheBall(Busy); // We're busy - don't touch.
if(isJob()) { // Only do a job if we have one.
myJob().doIt();
return true;
}
return false;
}
void Worker::myTask() { // Task for the thread. (my loop).
for(;;) {
CurrentThreadState(Waiting);
StoppingPoint.consume(); // Wait for a job.
CurrentThreadState(Working);
if(doWork()) { // After we do a job
Workers.drop(*this); // drop ourselves back in the pool
continue; // and look for another job.
} // If we didn't have a real job
else break; // then it's time to stop.
}
}
//// WorkerPool ////////////////////////////////////////////////////////////////
/*
class WorkerPool { // Worker pools look like this...
private:
Mutex AllocationMutex; // Worker pool allocation control.
ProductionQueue<Worker*> RecycledWorkers; // Where do recyceled workers go.
unsigned int AllocatedWorkers; // Count of workers to clean up.
bool Started; // True if we're ready to go.
public:
WorkerPool(); // Set startup defaults.
~WorkerPool(); // Cleanup if needed.
void init(); // Start making workers.
Worker& grab(); // Give me a worker to use.
void drop(Worker& W); // Take this worker back.
void stop(); // Destroy all workers and stop.
};
*/
WorkerPool::WorkerPool() :
AllocatedWorkers(0),
Started(false) {
}
WorkerPool::~WorkerPool() {
try { stop(); }
catch(...) {}
}
void WorkerPool::init() { // Initialize the worker pool.
Worker& FirstWorker = makeWorker(); // Make the first worker and
drop(FirstWorker); // drop it into the pool.
Started = true; // We are now initialized.
}
Worker* WorkerPool::makeWorker() { // Make and count workers.
Worker* NewWorker = 0;
NewWorker = new Worker(*this); // Allocate the worker.
++AllocatedWorkers; // If successful count it.
return Worker; // Return a reference.
}
RuntimeCheck CheckForValidGrabbedWorker("WorkerPool:grab() Check(0 != GrabbedWorker)");
Worker& WorkerPool::grab() { // Grab a worker from the pool.
ScopeMutex Busy(AllocationMutex);
Worker* GrabbedWorker = 0;
if(0 < RecycledWorkers.size()) GrabbedWorker = RecycledWorkers.take(); // Prefer to use recycled workers.
else GrabbedWorker = makeWorker(); // Make new ones if needed.
CheckForValidGrabbedWorker(0 != GrabbedWorker);
return (*GrabbedWorker);
}
void WorkerPool::drop(Worker& W) {
}
void WorkerPool::stop() {
}

+ 8
- 1
SNF4CGP/WorkerPool.hpp View File

@@ -25,14 +25,19 @@ class Worker : private Thread {
private:
WorkerPool& Workers; // Every worker knows where it lives.
ProductionGateway StoppingPoint; // Thread stops here and waits.
Mutex Busy; // Busy state protection.
bool TimeToStop; // stop() has been called.
void doWork(); // Get a job done.
void myTask(); // Task for the thread. (my loop).
Job* myJob_; // The job to do (or 0 to stop).
bool isJob(); // Job pointer check.
Job& myJob(); // Safe access to myJob.
public:
Worker(WorkerPool& W); // Initialize and start the thread.
virtual ~Worker(); // Cleanup on the way down.
void doJob(Job& J); // Give this worker a job to do.
stop(); // Stop the worker thread.
void stop(); // Stop the worker thread.
const static ThreadType Type;
const static ThreadState Waiting;
@@ -47,6 +52,8 @@ class WorkerPool {
unsigned int AllocatedWorkers; // Count of workers to clean up.
bool Started; // True if we're ready to go.
Worker* makeWorker(); // Make and count workers.
public:
WorkerPool(); // Set startup defaults.
~WorkerPool(); // Cleanup if needed.

Loading…
Cancel
Save