Browse Source

Re-factored for multiplexed configuration access and cycle stealing updates.

git-svn-id: https://svn.microneil.com/svn/SNF4CGP/trunk@25 59e8e3e7-56fa-483b-b4b4-fa6ab0af3dfc
master
madscientist 15 years ago
parent
commit
105cc56b2f
4 changed files with 140 additions and 15 deletions
  1. 91
    8
      SNF4CGP/JobPool.cpp
  2. 24
    6
      SNF4CGP/JobPool.hpp
  3. 15
    0
      SNF4CGP/ScannerPool.cpp
  4. 10
    1
      SNF4CGP/ScannerPool.hpp

+ 91
- 8
SNF4CGP/JobPool.cpp View File

void Job::emitCommentPrefix() { void Job::emitCommentPrefix() {
} }
void Job::emitComment(string Comment) {
void Job::emitComment(const string& Comment) {
ostringstream O; ostringstream O;
O << "* " << Comment << endl; O << "* " << Comment << endl;
OutputBuffer.append(O.str()); OutputBuffer.append(O.str());
return CGPString; return CGPString;
} }
void Job::emitADDHEADER(string Headers) {
void Job::emitADDHEADER(const string& Headers) {
ostringstream O; ostringstream O;
O << CurrentCommand.Number << " ADDHEADER " << formatAsCGPString(Headers) << endl; O << CurrentCommand.Number << " ADDHEADER " << formatAsCGPString(Headers) << endl;
OutputBuffer.append(O.str()); OutputBuffer.append(O.str());
} }
void Job::emitERROR(string Report) {
void Job::emitERROR(const string& Report) {
ostringstream O; ostringstream O;
O << CurrentCommand.Number << " ERROR " << formatAsCGPString(Report) << endl; O << CurrentCommand.Number << " ERROR " << formatAsCGPString(Report) << endl;
OutputBuffer.append(O.str()); OutputBuffer.append(O.str());
OutputBuffer.append(O.str()); OutputBuffer.append(O.str());
} }
void Job::emitREJECTED(string Report) {
void Job::emitREJECTED(const string& Report) {
ostringstream O;
O << CurrentCommand.Number << " REJECTED " << formatAsCGPString(Report) << endl;
} }
void Job::finalize() { void Job::finalize() {
} }
void Job::doFILE() { void Job::doFILE() {
doRead();
doScan();
doAction();
}
RuntimeCheck CheckMessageReaderIsValid("Job::Reader() Check(0 != Reader_)");
ifstream& Job::Reader() { // Safe access to Reader_.
CheckMessageReaderIsValid(0 != Reader_);
return (*Reader_);
}
LogicFault FaultMessageReaderOpenedTwice("Job::openReader() Fault(0 != Reader_)");
RuntimeCheck CheckopenReaderWasSuccessful("Job::openReader() Check(Reader().good())");
void Job::openReader(string Path) {
FaultMessageReaderOpenedTwice(0 != Reader_);
Reader_ = new ifstream(Path.c_str(), ios::binary);
CheckopenReaderWasSuccessful(Reader().good());
} }
void Job::readTopOfMessage(ifstream& Reader) {
void Job::closeReader() {
if(Reader_) {
Reader().close();
delete Reader_;
Reader_ = 0;
}
}
RuntimeCheck CheckJobWriterIsValid("Job::Writer() Check(0 != Writer_)");
ofstream& Job::Writer() { // Safe access to Writer_.
CheckJobWriterIsValid(0 != Writer_);
return (*Writer_);
}
LogicFault FaultMessageWriterOpenedTwice("Job::openWriter() Fault(0 != Writer_)");
RuntimeCheck CheckopenWriterWasSuccessful("Job::openWriter() Check(Writer().good())");
void Job::openWriter(string Path) {
FaultMessageWriterOpenedTwice(0 != Writer_);
Writer_ = new ofstream(Path.c_str(), ios::trunc | ios::binary);
CheckopenWriterWasSuccessful(Writer().good());
}
void Job::closeWriter() {
if(Writer_) {
Writer().close();
delete Writer_;
Writer_ = 0;
}
} }
void Job::doRead() { void Job::doRead() {
openReader(Job::CurrentCommand.Data);
ReadBuffer.assign(ReadBufferSize, 0);
Reader().read(reinterpret_cast<char*>(&ReadBuffer[0]), ReadBuffer.size());
}
string Job::ScanName() { // Scan name from FILE command
ostringstream ScanNameFormatter;
ScanNameFormatter << "[" << CurrentCommand.Number << "]" << CurrentCommand.Data;
return ScanNameFormatter.str();
} }
void Job::doScan() { void Job::doScan() {
ScopeScanner myScanner(Scanners);
ScanResultCode =
myScanner.Engine().scanMessage(
&ReadBuffer[0], Reader().gcount(), ScanName(), JobTimer.getElapsedTime()
);
HeadersToInject = myScanner.Engine().getXHDRs();
} }
void Job::doAction() { void Job::doAction() {
// Select action based on current configuration
// Do the appropriate action
} }
void Job::doBypass() { void Job::doBypass() {
Scanners(S), Scanners(S),
Output(O), Output(O),
ScanResultCode(0), ScanResultCode(0),
ReadLength(0) { // Minimize heap thrashing.
Reader_(0),
Writer_(0) { // Minimize heap thrashing.
OutputBuffer.reserve(StringReserveSize); OutputBuffer.reserve(StringReserveSize);
HeadersToInject.reserve(StringReserveSize); HeadersToInject.reserve(StringReserveSize);
ReadBuffer.reserve(ReadBufferSize); ReadBuffer.reserve(ReadBufferSize);
HeadersToInject.clear(); HeadersToInject.clear();
MessageMoveFilePath.clear(); MessageMoveFilePath.clear();
ReadBuffer.clear(); ReadBuffer.clear();
ReadLength = 0;
closeReader();
closeWriter();
JobTimer.clear();
} }
LogicFault FaultIfQuitGetsHere("Job::setCommand() Fault(Command::QUIT == C.Type)"); LogicFault FaultIfQuitGetsHere("Job::setCommand() Fault(Command::QUIT == C.Type)");
void Job::setCommand(Command& C) { // Assign a command for this job. void Job::setCommand(Command& C) { // Assign a command for this job.
JobTimer.start();
FaultIfQuitGetsHere(Command::QUIT == C.Type); FaultIfQuitGetsHere(Command::QUIT == C.Type);
CurrentCommand = C; CurrentCommand = C;
} }
void Job::doIt() { // Get the job done.
void Job::executeCommand() {
switch(CurrentCommand.Type) { switch(CurrentCommand.Type) {
case Command::WAKE: { doWakeUp(); break; } case Command::WAKE: { doWakeUp(); break; }
case Command::INTF: { doINTF(); break; } case Command::INTF: { doINTF(); break; }
case Command::FILE: { doFILE(); break; } case Command::FILE: { doFILE(); break; }
default: { doFAIL(); break; } default: { doFAIL(); break; }
} }
}
void Job::emitException(const string& What) {
}
void Job::emitUnknownException() {
}
void Job::doIt() { // Get the job done.
try { executeCommand(); }
catch(exception& e) { emitException(e.what()); }
catch(...) { emitUnknownException(); }
finalize(); finalize();
} }

+ 24
- 6
SNF4CGP/JobPool.hpp View File

#define IncludedJobPool #define IncludedJobPool
#include "../SNFMulti/SNFMulti.hpp" #include "../SNFMulti/SNFMulti.hpp"
#include "../CodeDweller/timing.hpp"
#include "../CodeDweller/threading.hpp" #include "../CodeDweller/threading.hpp"
#include "Command.hpp" #include "Command.hpp"
#include <fstream>
#include <string> #include <string>
#include <vector> #include <vector>
Command CurrentCommand; // Has a current comand. Command CurrentCommand; // Has a current comand.
string OutputBuffer; // Preserves an output buffer. string OutputBuffer; // Preserves an output buffer.
Timer JobTimer; // Track job processing time.
//// SNF Scan and move data and tools //// SNF Scan and move data and tools
int ScanResultCode; int ScanResultCode;
string HeadersToInject; string HeadersToInject;
string MessageMoveFilePath; string MessageMoveFilePath;
vector<unsigned char> ReadBuffer; // Preserves a file read buffer. vector<unsigned char> ReadBuffer; // Preserves a file read buffer.
unsigned int ReadLength; // How much data in the buffer.
void readTopOfMessage(ifstream& Reader);
ifstream* Reader_; // Open ifstream for current message.
ifstream& Reader(); // Safe access to Reader_.
void openReader(string Path);
void closeReader();
ofstream* Writer_; // Open ofstream for current message.
ofstream& Writer(); // Safe access to Writer_.
void openWriter(string Path);
void closeWriter();
void moveMessageToHoldPath(ifstream& Reader); void moveMessageToHoldPath(ifstream& Reader);
//// Utility methods //// Utility methods
void emitCommentPrefix(); void emitCommentPrefix();
void emitComment(string Comment);
void emitComment(const string& Comment);
void emitOK(); void emitOK();
void emitINTF(); void emitINTF();
void emitFAILURE(); void emitFAILURE();
void emitADDHEADER(string Headers);
void emitERROR(string Report);
void emitADDHEADER(const string& Headers);
void emitERROR(const string& Report);
void emitDISCARD(); void emitDISCARD();
void emitREJECTED(string Report);
void emitREJECTED(const string& Report);
void finalize(); void finalize();
string Job::ScanName(); // Scan name from FILE command
void executeCommand();
void emitException(const string& What);
void Job::emitUnknownException();
//// These methods embody how we get jobs done. //// These methods embody how we get jobs done.
void doWakeUp(); void doWakeUp();

+ 15
- 0
SNF4CGP/ScannerPool.cpp View File

ScannerPool::ScannerPool() : // Constructed simply. ScannerPool::ScannerPool() : // Constructed simply.
Rulebase_(new snf_RulebaseHandler()), Rulebase_(new snf_RulebaseHandler()),
ScannerConfiguration_(new ConfigurationManager(Rulebase())),
ScannerCount(0), ScannerCount(0),
Started(false) { Started(false) {
} }
PooledScanners.give(&S); PooledScanners.give(&S);
} }
RuntimeCheck CheckForValidConfigurationManager("ScannerPool::ScannerConfiguration() Check(0 != ScannerConfiguration_)");
ConfigurationManager& ScannerPool::ScannerConfiguration() { // Safe coniguration access & update logic.
CheckForValidConfigurationManager(0 != ScannerConfiguration_);
ScopeMutex CheckingConfiguration(ConfigurationMutex);
ConfigurationManager& TheConfiguration = (*ScannerConfiguration_);
if(TheConfiguration.isOutOfDate()) TheConfiguration.update();
return TheConfiguration;
}
ResultConfiguration ScannerPool::ConfigurationForResultCode(int Code) { // Multiplexed access to configuration.
return ScannerConfiguration().ConfigurationForResultCode(Code);
}
//// ScopeScanner ////////////////////////////////////////////////////////////// //// ScopeScanner //////////////////////////////////////////////////////////////
ScopeScanner::ScopeScanner(ScannerPool& P) : // Constructed with the pool. ScopeScanner::ScopeScanner(ScannerPool& P) : // Constructed with the pool.

+ 10
- 1
SNF4CGP/ScannerPool.hpp View File

#ifndef IncludedScannerPool #ifndef IncludedScannerPool
#define IncludedScannerPool #define IncludedScannerPool
#include "ConfigurationEngine.hpp"
#include "../CodeDweller/threading.hpp" #include "../CodeDweller/threading.hpp"
#include <string> #include <string>
// Constructing a scanner is essentially constructing an snf_EngineHandler in a wrapper. // Constructing a scanner is essentially constructing an snf_EngineHandler in a wrapper.
class snf_RulebaseHandler; // These classes exists.
class snf_RulebaseHandler;
class snf_EngineHandler; class snf_EngineHandler;
class Scanner { // SNF Scanner instance for the pool class Scanner { // SNF Scanner instance for the pool
private: private:
snf_RulebaseHandler* Rulebase_; // Allocates an SNF rulebase. snf_RulebaseHandler* Rulebase_; // Allocates an SNF rulebase.
snf_RulebaseHandler& Rulebase(); // Safe access to the rulebase. snf_RulebaseHandler& Rulebase(); // Safe access to the rulebase.
Mutex ConfigurationMutex;
ConfigurationManager* ScannerConfiguration_; // Active configuration parser.
ConfigurationManager& ScannerConfiguration(); // Safe access & update logic.
unsigned int ScannerCount; // Allocates and counts scanners. unsigned int ScannerCount; // Allocates and counts scanners.
ProductionQueue<Scanner*> PooledScanners; // Pool of ready-to-go scanners. ProductionQueue<Scanner*> PooledScanners; // Pool of ready-to-go scanners.
void stop(); // Shutdown and clean up. void stop(); // Shutdown and clean up.
Scanner& grab(); // Provides a Scanner from the pool. Scanner& grab(); // Provides a Scanner from the pool.
void drop(Scanner& S); // Returns a Scanner to the pool. void drop(Scanner& S); // Returns a Scanner to the pool.
ResultConfiguration ConfigurationForResultCode(int Code); // Multiplexed access to configuration.
}; };
// Since Scanners are used for as short a time as possible it is usefule to have them // Since Scanners are used for as short a time as possible it is usefule to have them

Loading…
Cancel
Save