Browse Source

Corrected shutdown deadlock in Workers.

Completed WAKE process.
It's Alive! Starts, WAKEs, QUITs.
Now it needs a brain (how to do jobs).


git-svn-id: https://svn.microneil.com/svn/SNF4CGP/trunk@23 59e8e3e7-56fa-483b-b4b4-fa6ab0af3dfc
master
madscientist 15 years ago
parent
commit
d38973aa63

+ 12
- 6
SNF4CGP/ExecutiveProcess.cpp View File

@@ -55,16 +55,22 @@ void ExecutiveProcess::doStartup() {
initializeWorkerPool();
}
void ExecutiveProcess::doProcessing() { // Command processing happens here.
void ExecutiveProcess::dispatchWakeupCommand() {
Command C;
C.Type = Command::WAKE; // Create the WAKE command to announce
C.Data = VersionInfo; // that we are alive (and who we are)
dispatchCommand(C); // and dispatch a job with it.
// Then process all incoming commands:
for(;0 == QuitJobNumber;) { // Loop until we have a QUIT job.
C = Input.getCommand(); // Get a command from the input.
if(Command::QUIT == C.Type) QuitJobNumber = C.Number; // QUIT command means it's time to stop.
else dispatchCommand(C); // Dispatch a job for all other commands.
}
void ExecutiveProcess::doProcessing() { // Command processing happens here.
dispatchWakeupCommand(); // First job is to announce ourselves.
for(;;) { // Then process all jobs but QUIT.
Command C = Input.getCommand();
if(Command::QUIT != C.Type) dispatchCommand(C);
else {
QuitJobNumber = C.Number;
return;
}
}
}

+ 1
- 0
SNF4CGP/ExecutiveProcess.hpp View File

@@ -39,6 +39,7 @@ class ExecutiveProcess {
void shutdownOutput();
void dispatchCommand(Command& C); // Job + Worker + Command; go!
void dispatchWakeupCommand();
public:

+ 44
- 5
SNF4CGP/InputProcessor.cpp View File

@@ -3,15 +3,54 @@
// See www.armresearch.com for more information.
#include "InputProcessor.hpp"
#include "Command.hpp"
#include <iostream>
#include <sstream>
#include <string>
using namespace std;
//// InputProcessor ////////////////////////////////////////////////////////////
Command& parseCommandNumber(istringstream& S, Command& C) {
S >> C.Number;
S.ignore();
return C;
}
const string INTFCommand = "INTF";
const string QUITCommand = "QUIT";
const string FILECommand = "FILE";
Command& parseCommandType(istringstream& S, Command& C) {
string TypeString;
S >> TypeString;
S.ignore();
switch(TypeString[0]) {
case 'I' : if(INTFCommand == TypeString) C.Type = Command::INTF; break;
case 'F' : if(FILECommand == TypeString) C.Type = Command::FILE; break;
case 'Q' : if(QUITCommand == TypeString) C.Type = Command::QUIT; break;
default : break;
}
return C;
}
Command& parseCommandData(istringstream& S, Command& C) {
getline(S,C.Data);
return C;
}
Command InputProcessor::getCommand() { // How to get/parse a command
// get a line from cin
// parse the number
// parse the type
// parse the remaining data
// return the command
string InputLine;
getline(cin, InputLine);
istringstream InputLineStream(InputLine);
Command NewCommand;
parseCommandNumber(InputLineStream, NewCommand); // First thing is always a number.
parseCommandType(InputLineStream, NewCommand); // Next is always a command name.
parseCommandData(InputLineStream, NewCommand); // The rest is data for the command.
return NewCommand;
}

+ 7
- 0
SNF4CGP/JobPool.cpp View File

@@ -5,6 +5,7 @@
#include "JobPool.hpp"
#include "Command.hpp"
#include "ScannerPool.hpp"
#include "OutputProcessor.hpp"
#include "../SNFMulti/SNFMulti.hpp"
@@ -27,6 +28,9 @@ void Job::emitCommentPrefix() {
}
void Job::emitComment(string Comment) {
ostringstream O;
O << "* " << Comment << endl;
OutputBuffer.append(O.str());
}
void Job::emitResponsePrevix() {
@@ -51,9 +55,12 @@ void Job::emitDISCARD() {
}
void Job::finalize() {
Output.outputJob(*this);
}
void Job::doWakeUp() {
emitComment("SNF4CGP Waking Up");
emitComment(CurrentCommand.Data);
}
void Job::doINTF() {

+ 15
- 6
SNF4CGP/WorkerPool.cpp View File

@@ -19,6 +19,7 @@ Worker::Worker(WorkerPool& W) :
Workers(W),
TimeToStop(false),
myJob_(0) {
run();
}
Worker::~Worker() { // Cleanup on the way down.
@@ -52,14 +53,22 @@ void Worker::doJob(Job& J) {
LogicCheck CheckForNoJobsAtStop("Worker::stop() Check(false == isJob())");
void Worker::stop() { // Stop the worker thread.
ScopeMutex CallTheBall(Busy); // Don't stop while busy or vv.
if(false == TimeToStop) { // Do this only once.
bool Worker::trySetTimeToStop() { // Statefully set time to stop.
bool SuccessFlag = false;
ScopeMutex CallTheBall(Busy); // Don't stop while busy.
if(false == TimeToStop) { // Do this only if not done yet.
CheckForNoJobsAtStop(false == isJob());
TimeToStop = true; // It is time to stop.
StoppingPoint.produce(); // Get busy stopping.
join(); // Wait for the end.
TimeToStop = true;
SuccessFlag = true;
}
return SuccessFlag;
}
void Worker::stop() { // Stop the worker thread.
if(trySetTimeToStop()) { // First try - set time to stop.
StoppingPoint.produce(); // Release the hounds.
join(); // Wait for the end.
} // Later tries do nothing.
}
bool Worker::doWork() { // Get the job done.

+ 2
- 1
SNF4CGP/WorkerPool.hpp View File

@@ -25,7 +25,8 @@ class Worker : private Thread {
WorkerPool& Workers; // Every worker knows where it lives.
ProductionGateway StoppingPoint; // Thread stops here and waits.
Mutex Busy; // Busy state protection.
bool TimeToStop; // stop() has been called.
bool TimeToStop; // True if stop() has been called.
bool trySetTimeToStop(); // Statefully set TimeToSTop.
bool doWork(); // Get a job done.
void myTask(); // Task for the thread. (my loop).
Job* myJob_; // The job to do (or 0 to stop).

+ 0
- 2
SNF4CGP/main.cpp View File

@@ -38,8 +38,6 @@ void go(string ConfigurationPath) {
catch(...) { // Display unusual exceptions.
cout << "* SNF4CGP Unusual Excetption!" << endl;
}
cout << SNF4CGP_VERSION_INFO << endl;
cout << ConfigurationPath << endl;
}
int main(int argc, char* argv[]) { // Main entry point.

Loading…
Cancel
Save