Browse Source

Added guts to JobPool

Added safety to ScannerPool and WorkerPool


git-svn-id: https://svn.microneil.com/svn/SNF4CGP/trunk@22 59e8e3e7-56fa-483b-b4b4-fa6ab0af3dfc
master
madscientist 15 years ago
parent
commit
27aafa90e0
3 changed files with 58 additions and 8 deletions
  1. 54
    6
      SNF4CGP/JobPool.cpp
  2. 2
    1
      SNF4CGP/ScannerPool.cpp
  3. 2
    1
      SNF4CGP/WorkerPool.cpp

+ 54
- 6
SNF4CGP/JobPool.cpp View File

void Job::emitDISCARD() { void Job::emitDISCARD() {
} }
void job::finalize() {
void Job::finalize() {
} }
void Job::doWakeUp() { void Job::doWakeUp() {
ScanResultCode(0), ScanResultCode(0),
ReadLength(0) { // Minimize heap thrashing. ReadLength(0) { // Minimize heap thrashing.
OutputBuffer.reserve(StringReserveSize); OutputBuffer.reserve(StringReserveSize);
HeadersToInject.researve(StringReserveSize);
HeadersToInject.reserve(StringReserveSize);
ReadBuffer.reserve(ReadBufferSize); ReadBuffer.reserve(ReadBufferSize);
} }
ReadLength = 0; ReadLength = 0;
} }
LogicFault FaultIfQuitGetsHere("Job::setCommand() Fault(Command::QUIT == C.Type)");
void Job::setCommand(Command& C) { // Assign a command for this job. void Job::setCommand(Command& C) { // Assign a command for this job.
FaultIfQuitGetsHere(Command::QUIT == C.Type);
CurrentCommand = C; CurrentCommand = C;
} }
JobPool::JobPool() : // Simple constructor. JobPool::JobPool() : // Simple constructor.
Output_(0), Output_(0),
Scanners_(0),
Scanners_(new ScannerPool()),
AllocatedJobs(0), AllocatedJobs(0),
Started(false) { Started(false) {
} }
JobPool::~JobPool() { // Clean up on the way out. JobPool::~JobPool() { // Clean up on the way out.
try { stop(); }
try {
stop();
Output_ = 0;
if(Scanners_) delete Scanners_;
Scanners_ = 0;
}
catch(...) {} catch(...) {}
} }
RuntimeCheck CheckForValidOutputPool("JobPool::Output() Check(0 != Output_)");
OutputProcessor& JobPool::Output() {
CheckForValidOutputPool(0 != Output_);
return (*Output_);
}
RuntimeCheck CheckForValidScannerPool("JobPool::Scanners() Check(0 != Scanners_)");
ScannerPool& JobPool::Scanners() {
CheckForValidScannerPool(0 != Scanners_);
return (*Scanners_);
}
// Watch out -- Initializing the JobPool also means starting up SNFMulti and // Watch out -- Initializing the JobPool also means starting up SNFMulti and
// the ScannerPool. // the ScannerPool.
void JobPool::init(string Configuration, OutputProcessor& O) { // Initialize the JobPool. void JobPool::init(string Configuration, OutputProcessor& O) { // Initialize the JobPool.
Output_ = &O;
Scanners().init(Configuration);
ScopeMutex Busy(AllocationMutex);
Job* FirstJob = makeJob();
Jobs.give(FirstJob);
Started = true;
} }
Job* JobPool::makeJob() { // Create and count a Job. Job* JobPool::makeJob() { // Create and count a Job.
Job* NewJob = new Job(Scanners(), Output());
++AllocatedJobs;
return NewJob;
} }
LogicCheck CheckGrabJobsOnlyWhenStarted("JobPool::grab() Check(Started)");
RuntimeCheck CheckForValidGrabbedJob("JobPool::grab() Check(0 != GrabbedJob)");
Job& JobPool::grab() { // Grab a job (prefer from the pool). Job& JobPool::grab() { // Grab a job (prefer from the pool).
ScopeMutex Busy(AllocationMutex);
CheckGrabJobsOnlyWhenStarted(Started);
Job* GrabbedJob = 0;
if(0 < Jobs.size()) GrabbedJob = Jobs.take(); // Prefer jobs from the pool.
else GrabbedJob = makeJob(); // Make new ones if needed.
CheckForValidGrabbedJob(0 != GrabbedJob);
return (*GrabbedJob);
} }
void JobPool::drop(Job& J) { // Drop a job into the pool. void JobPool::drop(Job& J) { // Drop a job into the pool.
Jobs.give(&J);
} }
void JobPool::killJobFromPool() { // Kill and count a Job from the pool. void JobPool::killJobFromPool() { // Kill and count a Job from the pool.
Job* JobToKill = 0;
JobToKill = Jobs.take();
delete JobToKill;
--AllocatedJobs;
} }
// Watch out -- Stopping the JobPool also means shutting down SNFMulti and // Watch out -- Stopping the JobPool also means shutting down SNFMulti and
// the ScannerPool. // the ScannerPool.
void JobPool::stop() { // Shut down the JobPool. void JobPool::stop() { // Shut down the JobPool.
// Kill off all of the allocated Jobs.
// Shut down and destroy the ScannerPool.
ScopeMutex Busy(AllocationMutex);
while(0 < AllocatedJobs) killJobFromPool();
Scanners().stop();
Started = false;
} }

+ 2
- 1
SNF4CGP/ScannerPool.cpp View File

void ScannerPool::init(const string Configuration) { // Get ready to provide scanners. void ScannerPool::init(const string Configuration) { // Get ready to provide scanners.
Rulebase().open(Configuration.c_str(), "", ""); // Light up the rulebase manager. Rulebase().open(Configuration.c_str(), "", ""); // Light up the rulebase manager.
Scanner* FirstScanner = new Scanner(Rulebase()); // Create our first scanner.
ScopeMutex Busy(AllocationControl);
Scanner* FirstScanner = makeScanner(); // Create our first scanner.
PooledScanners.give(FirstScanner); // Drop it into the pool. PooledScanners.give(FirstScanner); // Drop it into the pool.
Started = true; Started = true;
} }

+ 2
- 1
SNF4CGP/WorkerPool.cpp View File

} }
void WorkerPool::init() { // Initialize the worker pool. void WorkerPool::init() { // Initialize the worker pool.
ScopeMutex Busy(AllocationMutex);
Worker* FirstWorker = makeWorker(); // Make the first worker and Worker* FirstWorker = makeWorker(); // Make the first worker and
drop(*FirstWorker); // drop it into the pool.
RecycledWorkers.give(FirstWorker); // drop it into the pool.
Started = true; // We are now initialized. Started = true; // We are now initialized.
} }

Loading…
Cancel
Save