Thread states for WorkerPool git-svn-id: https://svn.microneil.com/svn/SNF4CGP/trunk@14 59e8e3e7-56fa-483b-b4b4-fa6ab0af3dfcmaster
@@ -62,7 +62,7 @@ class Job { | |||
~Job(); // Cleanup when destructing. | |||
void setCommand(Command C); // Assign a command for this job. | |||
void doIt(); // Get the job done. | |||
string& Output(); // Read the job's output. | |||
string& Responses(); // Read the job's report. | |||
void clear(); // Cleanup for the next command. | |||
}; | |||
@@ -73,7 +73,7 @@ class JobPool { | |||
ScannerPool* Scanners_; // Scanner Pool. | |||
ScannerPool& Scanners(); // Safe access to the Scanner Pool. | |||
Mutex myMutex; // Syncrhonization point. | |||
Mutex AllocationMutex; // Syncrhonization point. | |||
ProductionQueue<Job*> Jobs; // Recycled jobs holder. | |||
unsigned int AllocatedJobs; // Count of jobs to clean up. | |||
@@ -9,18 +9,17 @@ | |||
using namespace std; | |||
const ThreadType OutputProcessor::Type("Output Processor"); | |||
const ThreadState OutputProcessor::Uninitialized("Uninitialized"); | |||
const ThreadState OutputProcessor::StartingUp("Starting"); | |||
const ThreadState OutputProcessor::WaitingForJobs("Waiting"); | |||
const ThreadState OutputProcessor::PostingResponses("Posting"); | |||
const ThreadState OutputProcessor::ShuttingDown("Stopping"); | |||
OutputProcessor::OutputProcessor() : // Constructor sets up the basics. | |||
Thread(OutputProcessor::Type, "Output"), // Name the thread. | |||
RecycledJobs(0), // No job pool yet. | |||
isTimeToStop(false) {} // Not time to stop. | |||
isTimeToStop(false) { // Not time to stop. | |||
} | |||
OutputProcessor::~OutputProcessor() { // Destructor to clean things up. | |||
try { stop(); } catch (...) {} // Call stop() just in case. | |||
} | |||
RuntimeCheck CheckRecyclingNotNull("OutputProcessor::RecycledJobs() Check(0 != RecycledJobs_)"); | |||
@@ -31,13 +30,33 @@ JobPool& OutputProcessor::RecycledJobs() { | |||
} | |||
void OutputProcessor::init(JobPool& J) { // Start processing. | |||
RecycledJobs_ = &J; | |||
run(); | |||
} | |||
void OutputProcessor::outputJob(Job* J) { // Take this job and ... you know. | |||
void OutputProcessor::outputJob(Job& J) { // Take this job and ... you know. | |||
CompletedJobs.give(&J); | |||
} | |||
void OutputProcessor::stop() { // Finish off the queue and quit. | |||
if(false == isTimeToStop) { // Do this only once. | |||
isTimeToStop = true; // Set the stop flag. | |||
CompletedJobs.give(0); // Add the last job to kill the loop. | |||
} | |||
} | |||
void OutputProcessor::handleJob(Job& J) { // Process a job. | |||
CurrentThreadState(PostingResponses); | |||
cout << J.Responses(); | |||
J.clear(); | |||
RecycledJobs.drop(J); | |||
} | |||
void OutputProcessor::myTask() { // This is how we do it. | |||
Job* J = 0; // Pull Job pointers from the queue. | |||
CurrentThreadState(WaitingForJobs); | |||
while(J = CompletedJobs.take()) { // Process every job until we get | |||
handleJob(*J); // a 0 pointer. Then we're done. | |||
CurrentThreadState(WaitingForJobs); | |||
} | |||
} |
@@ -25,6 +25,8 @@ class OutputProcessor : private Thread { | |||
bool isTimeToStop; // Flag for when we need to stop. | |||
void myTask(); // Task for the thread. (my loop). | |||
void handleJob(Job& J); // Process a job. | |||
public: | |||
OutputProcessor(); // Constructed with the JobPool. | |||
@@ -35,12 +37,8 @@ class OutputProcessor : private Thread { | |||
void stop(); // Finish off the queue and quit. | |||
const static ThreadType Type; | |||
const static ThreadState Uninitialized; | |||
const static ThreadState StartingUp; | |||
const static ThreadState WaitingForJobs; | |||
const static ThreadState PostingResponses; | |||
const static ThreadState ShuttingDown; | |||
}; | |||
#endif |
@@ -1,3 +1,16 @@ | |||
// SNF4CGP/WorkerPool.cpp | |||
// Copyright (C) 2009 ARM Research Labs, LLC. | |||
// See www.armresearch.com for more information. | |||
#include "WorkerPool.hpp" | |||
#include "../SNF4CGP/CodeDweller/faults.hpp" | |||
using namespace std; | |||
const ThreadType OutputProcessor::Type("Output Processor"); | |||
const static Worker::ThreadType Type("Worker"); | |||
const static Worker::ThreadState Waiting("Waiting"); | |||
const static Worker::ThreadState Working("Working"); | |||
@@ -34,11 +34,15 @@ class Worker : private Thread { | |||
void doJob(Job& J); // Give this worker a job to do. | |||
stop(); // Stop the worker thread. | |||
const static ThreadType Type; | |||
const static ThreadState Waiting; | |||
const static ThreadState Working; | |||
}; | |||
class WorkerPool { // Worker pools look like this... | |||
private: | |||
Mutex myMutex; // Worker pool allocation control. | |||
Mutex AllocationMutex; // Worker pool allocation control. | |||
ProductionQueue<Worker*> RecycledWorkers; // Where do recyceled workers go. | |||
unsigned int AllocatedWorkers; // Count of workers to clean up. | |||
bool Started; // True if we're ready to go. |