// snfXCImgr.cpp
// Copyright (C) 2007 - 2009 ARM Research Labs, LLC
// See www.armresearch.com for the copyright terms.
//
// See snfXCImgr.hpp for details.
#include "SNFMulti.hpp"
#include "snfXCImgr.hpp"
using namespace std;
// snfXCIServerCommandHandler Virtual Base Class Default Processor.
// Canned reply returned by the base class server command handler below.
// NOTE(review): as visible here the reply is only a newline; confirm against
// the original file that no markup was lost from this literal.
const string XCIServerCommandDefaultResponse =
"\n";
// Default server command processor: the request X is not examined and the
// canned default response is returned. A server using SNFMulti can provide a
// more useful processor in place of this one.
string snfXCIServerCommandHandler::processXCIRequest(snf_xci& X) {
    string DefaultReply = XCIServerCommandDefaultResponse;  // Copy the canned reply
    return DefaultReply;                                    // and hand it back.
}
// snfXCIJob encapsulates a single XCI transaction.
// Reset the job so it can be reused: zero the recorded setup time and empty
// both the response and the request buffers.
void snfXCIJob::clear() {
    SetupTime = 0;                                          // No setup time recorded yet.
    Response.clear();                                       // Forget the previous response
    Request.clear();                                        // and the previous request.
}
// snfXCIJobProcessor encapsulates the logic to respond to an XCI request.
// Construct a job processor for rulebase handler H. H is remembered as
// myHome, then a fresh engine handler is created and opened against H.
snfXCIJobProcessor::snfXCIJobProcessor(snf_RulebaseHandler* H)
  : myHome(H)
{
    myEngine = new snf_EngineHandler();                     // Make our engine handler, then
    myEngine->open(H);                                      // attach it to the home rulebase.
}
// Tear down the processor: if an engine handler exists it is closed, deleted
// and its pointer zeroed; the home pointer is zeroed in every case.
snfXCIJobProcessor::~snfXCIJobProcessor() {
    if(0 != myEngine) {
        myEngine->close();                                  // Close before destroying.
        delete myEngine;
        myEngine = 0;                                       // Leave no dangling pointer.
    }
    myHome = 0;                                             // We no longer refer to our home.
}
//// This collection of functions handles the processing of all XCI requests.
// A request is a scan job exactly when it names a file to scan.
bool snfXCIJobProcessor::isScanJob() {
    return (0 < myXCI.scanner_scan_file.length());
}
// A request is a GBUdb job when at least one of the GBUdb IP fields
// (test, set, bad, good or drop) is non-empty.
bool snfXCIJobProcessor::isGBUdbJob() {
    return (
        0 < myXCI.gbudb_test_ip.length() ||                 // An IP to test,
        0 < myXCI.gbudb_set_ip.length()  ||                 // an IP to set up,
        0 < myXCI.gbudb_bad_ip.length()  ||                 // an IP to mark bad,
        0 < myXCI.gbudb_good_ip.length() ||                 // an IP to mark good,
        0 < myXCI.gbudb_drop_ip.length()                    // or an IP to drop.
    );
}
// A request is a report job exactly when it carries a status class.
bool snfXCIJobProcessor::isReportJob() {
    return (0 < myXCI.report_request_status_class.length());
}
// A request is a server command job exactly when it carries a command string.
bool snfXCIJobProcessor::isCommandJob() {
    return (0 < myXCI.xci_server_command.length());
}
// Scans the message file named in the parsed XCI request (myXCI) with this
// processor's engine and stores the reply text in J.Response. J.SetupTime and
// an optional forced IP (scanner_scan_ip) are handed to the scan. Exceptions
// are caught here and turned into a response string rather than propagated.
// NOTE(review): the response-building section and the string literals in the
// catch handlers look damaged in this copy (unterminated literal, unmatched
// else, stray ")"). Verify against the original file before changing any code.
void snfXCIJobProcessor::processScan(snfXCIJob& J) { // Process a scan request.
try { // Safely perform our scan.
// Check for forced IP.
IP4Address ForcedIP = 0UL; // Don't expect a forced IP.
if(0 < myXCI.scanner_scan_ip.length()) { // If we have one then
ForcedIP = myXCI.scanner_scan_ip; // convert it from the string.
}
// Scan the message file.
int ScanResult = // Scan the file using our
myEngine->scanMessageFile( // engine. Use the file
myXCI.scanner_scan_file.c_str(), // path in the XCI request, and
J.SetupTime, // the recorded setup time. Use the
ForcedIP // forced IP if provided.
);
// Create a proper xci response.
ostringstream ResultString; // Use a stringstream to make it easier.
ResultString
<< " and
<< "/>" // emit the closing elements.
<< endl; // End of the line.
} else { // If optional data is requested:
ResultString << ">" << endl; // Complete the open tag.
if(true == myXCI.scanner_scan_xhdr) { // Optionally include XHDR data.
ResultString // If xheaders are requested...
<< "" << myEngine->getXHDRs() // Emit the xhdr element & contents.
<< "" << endl; // End the xhdr and end of line.
}
if(true == myXCI.scanner_scan_log) { // Optionally include XMLLog data.
ResultString // If the log data is requested...
<< "" << myEngine->getXMLLog() // Emit the log element & data.
<< "" << endl; // End the log data and end of line.
}
ResultString << ""; // Emit the closing elements.
}
J.Response = ResultString.str(); // Capture the formatted response.
}
// Decode the known exceptions
catch(snf_EngineHandler::AllocationError& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::BadMatrix& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::Busy& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::FileError& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::MaxEvals& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::Panic& e) {
J.Response = "\n");
}
catch(snf_EngineHandler::XHDRError& e) {
J.Response = "\n");
}
// Decode the unknown exceptions
catch(exception& e) {
J.Response = "\n");
}
catch(...) {
J.Response = "\n";
}
}
// Carries out the GBUdb operation described by the parsed request (myXCI) on
// the home rulebase's GBUdb and returns the response text. Exactly one of the
// test / set / bad / good / drop branches runs, chosen by the first non-empty
// IP field in that order. A "set" with neither counts nor a type returns
// XCIBadSetResponse. Afterwards the IP is run through myHome->performIPTest().
// NOTE(review): the response literal near the end looks incomplete in this
// copy (only an empty string is streamed); verify against the original file.
string snfXCIJobProcessor::processGBUdb() { // Process a GBUdb request.
GBUdb& myGBUdb = myHome->MyGBUdb; // Make a convenient GBUdb handle.
IP4Address IP; // We will work with an IP.
GBUdbRecord R; // We will get a record to return.
// Test an IP - return its current data.
if(0 < myXCI.gbudb_test_ip.length()) { // IF: Test an IP
IP = myXCI.gbudb_test_ip; // Convert the IP.
} else
// Set or update an IP's data.
if(0 < myXCI.gbudb_set_ip.length()) { // IF: Set an IP's data.
IP = myXCI.gbudb_set_ip; // Convert the IP.
if( // Check for a compound update:
0 <= myXCI.gbudb_set_bad_count || // If we are changing the bad
0 <= myXCI.gbudb_set_good_count // or good count then this is
) { // a compound update (read then write).
R = myGBUdb.getRecord(IP); // Get the record (or a safe blank).
if(0 <= myXCI.gbudb_set_bad_count) // If we have a bad count to set
R.Bad(myXCI.gbudb_set_bad_count); // then set the bad count.
if(0 <= myXCI.gbudb_set_good_count) // If we have a good count to set
R.Good(myXCI.gbudb_set_good_count); // then set the good count.
if(0 < myXCI.gbudb_set_type.length()) { // If type, set type...
switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the
case 'g': case 'G': { R.Flag(Good); break; } // first character of the name and
case 'b': case 'B': { R.Flag(Bad); break; } // set the appropriate flag. Any
case 'u': case 'U': { R.Flag(Ugly); break; } // other first character leaves
case 'i': case 'I': { R.Flag(Ignore); break; } // the flag as it was.
}
}
myGBUdb.setRecord(IP, R); // Save the data.
} else // This might be a simple flag change.
if(0 < myXCI.gbudb_set_type.length()) { // If type, set type...
switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the
case 'g': case 'G': { R = myGBUdb.setGood(IP); break; } // first character of the name and
case 'b': case 'B': { R = myGBUdb.setBad(IP); break; } // set the appropriate flag. Simple
case 'u': case 'U': { R = myGBUdb.setUgly(IP); break; } // flag changes are atomic so there is
case 'i': case 'I': { R = myGBUdb.setIgnore(IP); break; } // no need to "save" later.
}
} else { // Empty set command?
return XCIBadSetResponse; // That's bad. Use test!
}
} else
// Add a bad event to an IP's data.
if(0 < myXCI.gbudb_bad_ip.length()) { // IF: Add a bad mark for this IP
IP = myXCI.gbudb_bad_ip; // Convert the IP.
R = myGBUdb.addBad(IP); // Add a bad mark.
} else
// Add a good event to an IP's data.
if(0 < myXCI.gbudb_good_ip.length()) { // IF: Add a good mark for this IP
IP = myXCI.gbudb_good_ip; // Convert the IP.
R = myGBUdb.addGood(IP); // Add a good mark.
} else
// Drop an IP from the database.
if(0 < myXCI.gbudb_drop_ip.length()) { // IF: Drop an IP's data.
IP = myXCI.gbudb_drop_ip; // Convert the IP.
myGBUdb.dropRecord(IP); // Forget about it.
}
// Return the final state of the IP's data.
IPTestRecord IPState(IP); // Build a test record for the IP and
myHome->performIPTest(IPState); // let our home rulebase evaluate it.
ostringstream Response; // Use a stringstream for our output.
Response
<< "" // Finish it up.
<< endl;
return Response.str(); // Return the formatted response.
}
// Builds the reply for a status report request. The status class selects the
// report: a class starting with "hour" gets the hour report, one starting
// with "minute" gets the minute report, and anything else gets the second
// report. The report text is wrapped between a prefix and a suffix literal.
// NOTE(review): both wrapper literals are empty strings in this copy; confirm
// against the original file that no markup was lost from them.
string snfXCIJobProcessor::processStatusReport() {
    string SelectedReport;                                  // The report text we will wrap.
    if(0 == myXCI.report_request_status_class.find("hour")) {
        SelectedReport = myHome->MyLOGmgr.getStatusHourReport();
    }
    else if(0 == myXCI.report_request_status_class.find("minute")) {
        SelectedReport = myHome->MyLOGmgr.getStatusMinuteReport();
    }
    else {
        SelectedReport = myHome->MyLOGmgr.getStatusSecondReport();
    }

    string Response = "";                                   // Start with the prefix,
    Response.append(SelectedReport);                        // add the chosen report,
    Response.append("");                                    // then the suffix.
    return Response;
}
// Processes one job: parses J.Request into myXCI, dispatches on the kind of
// request (scan, GBUdb, status report, server command - tested in that order)
// and leaves the reply in J.Response. A request that fails to parse, or that
// matches none of the known kinds, gets XCIErrorResponse and is logged
// through myHome (codes -1 and -2 respectively).
void snfXCIJobProcessor::process(snfXCIJob& J) {
    myXCI.read(J.Request);                                  // Parse the request text.

    if(myXCI.bad()) {                                       // Malformed request:
        J.Response = XCIErrorResponse;                      // answer with the error reply
        myHome->logThisError("XCI",-1,"Bad Request");       // and log it.
        return;
    }
    if(isScanJob()) {                                       // Scan request:
        processScan(J);                                     // processScan fills J.Response.
        return;
    }
    if(isGBUdbJob()) {                                      // GBUdb request.
        J.Response = processGBUdb();
        return;
    }
    if(isReportJob()) {                                     // Status report request.
        J.Response = processStatusReport();
        return;
    }
    if(isCommandJob()) {                                    // Server command: pass it up to
        J.Response = myHome->processXCIServerCommandRequest(myXCI);   // our home and relay the result.
        return;
    }

    // Well formed, but not something we understand.
    J.Response = XCIErrorResponse;
    myHome->logThisError("XCI",-2,"Unrecognized Request");
}
// ChannelJob encapsulates a Client Job while in the queue and how long it has
// been in the system (since created).
// A default-constructed ChannelJob holds no client (the null job).
ChannelJob::ChannelJob()
  : myClient(0)
{
}

// A ChannelJob built for client C just captures the pointer. The Lifetime
// timer is not touched here (presumably it starts when it is constructed).
ChannelJob::ChannelJob(TCPClient* C)
  : myClient(C)
{
}
// Reports how long this job has existed, as measured by its Lifetime timer.
msclock ChannelJob::Age() {
    msclock Elapsed = Lifetime.getElapsedTime();            // Ask the timer
    return Elapsed;                                         // and report what it says.
}
// Returns the client pointer this job holds (0 for the null job).
TCPClient* ChannelJob::Client() {
    TCPClient* Held = myClient;
    return Held;
}
// snfXCITCPChannel encapsulates the logic to queue and handle TCPClients for
// the XCI interface. The queued TCPClients each represent a single request.
// Each request is handled in turn by reading the request into an snfXCIJob,
// handing that snfXCIJob to an snfXCIJobProcessor, transmitting the result
// back to the TCPClient, closing the connection, and recycling the snfXCIJob
// object for the next round.
// snfXCITCPChannel shuts down when given a NULL TCPClient; This allows any
// jobs in queue to be handled before the thread stops. To shut down a channel
// { C->submit(NULL); C->join(); delete C; C = NULL;}
// Adds job J to this channel's queue. The push and the LatestSize update are
// done while holding QueueMutex; QueueGateway.produce() is then called once
// for the new item.
void snfXCITCPChannel::give(ChannelJob& J) {
    ScopeMutex QueueGuard(QueueMutex);                      // Hold the queue mutex for this scope.
    JobQueue.push(J);                                       // Queue a copy of the job.
    LatestSize = JobQueue.size();                           // Publish the new queue length.
    QueueGateway.produce();                                 // Account for the new item.
}
// Removes and returns the job at the front of the queue.
// QueueGateway.consume() is called first, before QueueMutex is taken; the
// queue itself is only touched while holding the mutex.
ChannelJob snfXCITCPChannel::take() {
    QueueGateway.consume();                                 // Wait at the gateway for an item.
    ScopeMutex QueueGuard(QueueMutex);                      // Then lock the queue data.
    ChannelJob Next = JobQueue.front();                     // Copy the oldest job,
    JobQueue.pop();                                         // remove it from the queue,
    LatestSize = JobQueue.size();                           // and publish the new length.
    return Next;
}
// Tuning constants shared by the XCI channels and the XCI manager.
const int RWTimeLimit = 30000;                              // Read/write time limit in ms (30 seconds).
// Terminator of an snf_xci request: the closing tag of the snf element.
// readRequest() keeps reading until this text appears in the request, so it
// must not be empty - find() with an empty needle matches at offset 0 on any
// string, which would make the read loop exit before reading anything.
const string endSNF = "</snf>";
const int RWPollMin = 15;                                   // Minimum time between polls (ms).
const int RWPollMax = 75;                                   // Maximum time between polls (ms).
const int MaxQueueLength = 32;                              // Most jobs waiting in any one channel queue.
const int MaxTCPQueueLength = 4 * MaxQueueLength;           // Most connections waiting (all four channels).
// Reads the request text from Client into Job.Request. Reading continues
// until the request contains the endSNF terminator or RWTimeLimit expires.
// Each round receives up to one '\n'-delimited line into LineBuffer (always
// leaving the last byte zero). Rounds that receive nothing pause on a
// throttle; rounds that receive data reset it.
void snfXCITCPChannel::readRequest(TCPClient* Client) {
    Timeout Deadline(RWTimeLimit);                          // Overall time limit for the read.
    PollTimer Backoff(RWPollMin, RWPollMax);                // Delay used between empty polls.

    for(;;) {
        if(Deadline.isExpired()) break;                     // Out of time: give up.
        if(string::npos != Job.Request.find(endSNF,0)) break;   // Terminator seen: request complete.

        memset(LineBuffer, 0, sizeof(LineBuffer));          // Start from a zeroed buffer so the
        const int Received = Client->delimited_receive(     // data is always null terminated.
            LineBuffer, sizeof(LineBuffer)-1, '\n');

        if(0 >= Received) {                                 // Nothing this time:
            Backoff.pause();                                // wait before polling again.
            continue;
        }
        Job.Request.append(LineBuffer);                     // Keep what we received and
        Backoff.reset();                                    // reset the throttle.
    }
}
// Writes Job.Response to Client. Transmission is attempted repeatedly from
// the first unsent byte until the whole response has been sent or
// RWTimeLimit expires. Attempts that send nothing pause on a throttle;
// attempts that send data reset it.
void snfXCITCPChannel::writeResponse(TCPClient* Client) {
    Timeout Deadline(RWTimeLimit);                          // Overall time limit for the write.
    PollTimer Backoff(RWPollMin, RWPollMax);                // Delay used between empty sends.

    const int Total = Job.Response.length();                // Bytes we need to send.
    int Sent = 0;                                           // Bytes sent so far.

    while(Sent < Total && false == Deadline.isExpired()) {
        const int SentNow = Client->transmit(               // Offer everything that is left,
            &Job.Response[Sent], Job.Response.length()-Sent);   // starting where we left off.
        if(0 < SentNow) {
            Sent += SentNow;                                // Account for the progress
            Backoff.reset();                                // and reset the throttle.
        } else {
            Backoff.pause();                                // No progress: wait before retrying.
        }
    }
}
// XCI mode flags. snfXCITCPChannel::myTask() records which phase of a job is
// running so that its catch handler can log the matching error.
const int XCI_Reading = 0; // Reading the request from the client.
const int XCI_Processing = 1; // Processing the job.
const int XCI_Writing = 2; // Writing the response to the client.
// Thread main loop for a channel. Repeatedly takes a ChannelJob from the
// queue; a job with a null client is the shutdown signal and ends the loop.
// For any other job the request is read, processed and the response written,
// with the thread state updated at each step. Any exception thrown along the
// way is caught and logged through myHome according to the phase that was
// running (-5 read, -6 process, -7 write). After every job the client is
// closed and deleted and the shared Job buffer is cleared.
void snfXCITCPChannel::myTask() {
    bool WeAreAlive = true;                                 // It's not over 'til it's over.
    while(WeAreAlive) {
        CurrentThreadState(XCI_Wait);                       // Mark our state.
        ChannelJob J = take();                              // Pull the next job from the queue.
        if(0 == J.Client()) {                               // A null client means shut down:
            CurrentThreadState(XCI_Shutdown);               // mark our state,
            WeAreAlive = false;                             // turn off the alive flag,
            break;                                          // and leave the loop.
        } else {                                            // Otherwise we have work to do.
            // Start in the reading phase. The catch handler below switches on
            // this value, so it must be defined even if something throws
            // before the first explicit assignment.
            int XCIMode = XCI_Reading;
            try {
                CurrentThreadState(XCI_Read);
                XCIMode = XCI_Reading;                      // Now we are reading.
                readRequest(J.Client());                    // Read the client's request.
                CurrentThreadState(XCI_Process);
                XCIMode = XCI_Processing;                   // Now we are processing.
                Job.SetupTime = J.Age();                    // Capture the read and queue time.
                Processor.process(Job);                     // Let our processor build the response.
                CurrentThreadState(XCI_Write);
                XCIMode = XCI_Writing;                      // Now we are writing.
                writeResponse(J.Client());                  // Send the response back.
            }
            // Log any exception according to the phase that failed. The job
            // is abandoned but the channel keeps running.
            catch(...) {
                switch(XCIMode) {
                    case XCI_Reading: {
                        myHome->logThisError("XCI",-5,"SocketReadError");
                        break;
                    }
                    case XCI_Processing: {
                        myHome->logThisError("XCI",-6,"ProcessError");
                        break;
                    }
                    case XCI_Writing: {
                        myHome->logThisError("XCI",-7,"SocketWriteError");
                        break;
                    }
                }
            }
        }
        // At the end of every job we clean up no matter what.
        if(0 != J.Client()) {                               // If we have a client
            CurrentThreadState(XCI_Close);
            J.Client()->close();                            // close it and
            delete J.Client();                              // delete it.
        }
        CurrentThreadState(XCI_Clear);
        Job.clear();                                        // Recycle the job buffers.
    }                                                       // Go again.
}
// Definitions of snfXCITCPChannel's static thread type and thread state
// objects. The channel constructor passes Type to Thread, and myTask() passes
// the states to CurrentThreadState() as it moves through each job.
const ThreadType snfXCITCPChannel::Type("snfXCITCPChannel"); // The thread's type.
//// XCI Thread States
const ThreadState snfXCITCPChannel::XCI_Wait("Waiting For Take()"); // Set before take().
const ThreadState snfXCITCPChannel::XCI_Read("Reading Request"); // Set before readRequest().
const ThreadState snfXCITCPChannel::XCI_Process("Processing Job"); // Set before Processor.process().
const ThreadState snfXCITCPChannel::XCI_Write("Writing Results"); // Set before writeResponse().
const ThreadState snfXCITCPChannel::XCI_Close("Closing Connection"); // Set before closing the client.
const ThreadState snfXCITCPChannel::XCI_Clear("Clearing Workspace"); // Set before Job.clear().
const ThreadState snfXCITCPChannel::XCI_Shutdown("Shutting Down"); // Set when the null job arrives.
// Creates a channel named N that works for rulebase handler H. The job
// processor is bound to the same rulebase, the reported queue size starts at
// zero, and the channel's thread is started before the constructor returns.
snfXCITCPChannel::snfXCITCPChannel(snf_RulebaseHandler* H, string N)
  : Thread(snfXCITCPChannel::Type, N),                      // Thread type and name.
    myHome(H),                                              // Remember our home rulebase.
    Processor(H),                                           // The processor uses it too.
    LatestSize(0)                                           // Nothing queued yet.
{
    run();                                                  // Start the thread.
}
// Shuts the channel down. An empty (null client) job is queued behind any
// pending work; myTask() ends when it reaches that job, so join() returns
// once the earlier jobs have been handled. The home pointer is then zeroed.
snfXCITCPChannel::~snfXCITCPChannel() {
    ChannelJob StopSignal;                                  // Default job: no client.
    give(StopSignal);                                       // Queue it behind pending jobs.
    join();                                                 // Wait for the thread to finish.
    myHome = 0;                                             // We no longer refer to our home.
}
// Reports the queue length as last recorded by give() or take().
int snfXCITCPChannel::Size() {
    int Recorded = LatestSize;
    return Recorded;
}
// Submits client C to this channel: the client is wrapped in a ChannelJob and
// a copy of that job is placed on the queue.
void snfXCITCPChannel::submit(TCPClient* C) {
    ChannelJob Wrapped(C);                                  // Wrap the client in a job
    give(Wrapped);                                          // and queue it.
}
// snfXCImgr encapsulates a service engine that takes XCI requests via TCP,
// performs the required actions, and returns an XCI response. It also checks
// to see if the configuration for the XCI interface has changed.
void snfXCImgr::checkCFG() { // Checks the configuration.
CurrentThreadState(XCI_CheckConfig); // Update our status.
int NEW_XCI_Port; // Prepare for a change in port.
// Quickly as we can, grab a config packet, capture the XCI parts, and
// then let it go.
if(myHome->isReady()) { // If we know our home then
snfCFGPacket MyCFGPacket(myHome); // Grab a configuration packet.
if(MyCFGPacket.bad()) { // If it's not valid then
return; // wait (skip this) till next time.
} else { // If we've got a good config then
CFG_XCI_ON = MyCFGPacket.Config()->XCI_OnOff; // Is XCI turned on?
NEW_XCI_Port = MyCFGPacket.Config()->XCI_Port; // What port we listen to?
} // If our rulebase manager was
} else return; // not ready (skip this) for now.
if(CFG_XCI_ON) { // If the XCI is configured up:
if(NEW_XCI_Port != CFG_XCI_PORT) { // Check for a port change. If the
CFG_XCI_PORT = NEW_XCI_Port; // port changed then check for a live
if(Listener) { // listener. For a live port change
shutdown_Listener(); // shut down the current listener and
myHome->logThisInfo("XCI", 0, "ListenerDown:PortChanged"); // log the activity.
startup_Listener(); // Restart the listener with the new
myHome->logThisInfo("XCI", 0, "ListenerUp:PortChanged"); // port and log the event.
}
}
startup_XCI(); // Make sure the XCI is up.
} else { // If the XCI is configured down
shutdown_XCI(); // then make sure it is down.
}
}
// Returns whichever of the two channels reports the smaller queue size.
// B is returned when the sizes are equal.
snfXCITCPChannel* LowestQueue(snfXCITCPChannel* A, snfXCITCPChannel* B) {
    if(A->Size() < B->Size()) return A;                     // A is strictly shorter.
    return B;                                               // Otherwise B.
}
// Picks the channel with the lowest reported queue size among C0..C3 by
// comparing the winner of (C0, C1) with the winner of (C2, C3).
snfXCITCPChannel* snfXCImgr::BestAvailableChannel() {
    snfXCITCPChannel* BestOfFirstPair = LowestQueue(C0, C1);
    snfXCITCPChannel* BestOfSecondPair = LowestQueue(C2, C3);
    return LowestQueue(BestOfFirstPair, BestOfSecondPair);
}
// Starts the TCP listener on CFG_XCI_PORT unless one already exists. The new
// listener gets MaxTCPQueueLength as its MaxPending value, is opened, and is
// then made non-blocking.
void snfXCImgr::startup_Listener() {
    if(0 != Listener) return;                               // Already have one: nothing to do.

    Listener = new TCPListener(CFG_XCI_PORT);               // Create the listener,
    Listener->MaxPending = MaxTCPQueueLength;               // set its pending limit,
    Listener->open();                                       // open it,
    Listener->makeNonBlocking();                            // and make it non-blocking.
}
// Shuts the TCP listener down if there is one: it is closed, deleted, and the
// Listener pointer is zeroed.
void snfXCImgr::shutdown_Listener() {
    if(0 == Listener) return;                               // No listener: nothing to do.

    Listener->close();                                      // Close it,
    delete Listener;                                        // destroy it,
    Listener = 0;                                           // and forget it.
}
// Brings the XCI up if it is not up already. XCI_UP is checked before
// ChannelMutex is taken. With the mutex held, and only if we have a home and
// XCI is configured on, the four channels C0..C3 are created, the listener is
// started, the startup is logged, and XCI_UP is set.
void snfXCImgr::startup_XCI() {
    if(XCI_UP) return;                                      // Already up: nothing to do.
    ScopeMutex IGotIt(ChannelMutex);                        // Serialize state changes.

    if(0 == myHome) return;                                 // We need to know our home.
    if(!CFG_XCI_ON) return;                                 // Only start when configured on.

    C0 = new snfXCITCPChannel(myHome, "C0");                // Launch the four channels.
    C1 = new snfXCITCPChannel(myHome, "C1");
    C2 = new snfXCITCPChannel(myHome, "C2");
    C3 = new snfXCITCPChannel(myHome, "C3");
    startup_Listener();                                     // Start listening for clients.
    myHome->logThisInfo("XCI", 0, "Startup");               // Log the startup.
    XCI_UP = true;                                          // We're up!
}
// Takes the XCI down if it is up. XCI_UP is checked before ChannelMutex is
// taken. With the mutex held the listener is shut down, each channel is
// deleted and its pointer zeroed, the shutdown is logged, and XCI_UP is
// cleared.
void snfXCImgr::shutdown_XCI() {
    if(!XCI_UP) return;                                     // Already down: nothing to do.
    ScopeMutex IGotIt(ChannelMutex);                        // Serialize state changes.

    shutdown_Listener();                                    // Stop accepting clients first.
    delete C0; C0 = 0;                                      // Deleting a null pointer does
    delete C1; C1 = 0;                                      // nothing, so channels that were
    delete C2; C2 = 0;                                      // never created are simply
    delete C3; C3 = 0;                                      // left as null.
    myHome->logThisInfo("XCI", 0, "Shutdown");              // Log the shutdown.
    XCI_UP = false;                                         // We're down!
}
// Returns the polling loop count gathered since the last call and resets the
// counter to zero.
int snfXCImgr::pollLoopCount() {
    const int Snapshot = diagLoopCount;                     // Remember the current count,
    diagLoopCount = 0;                                      // start counting afresh,
    return Snapshot;                                        // and report what we had.
}
// Returns the client count gathered since the last call and resets the
// counter to zero.
int snfXCImgr::pollClientCount() {
    const int Snapshot = diagClientCount;                   // Remember the current count,
    diagClientCount = 0;                                    // start counting afresh,
    return Snapshot;                                        // and report what we had.
}
// Definitions of snfXCImgr's static thread state objects. checkCFG() and
// myTask() pass these to CurrentThreadState() as the manager changes activity.
const ThreadState snfXCImgr::XCI_InitialConfig("Initial Config"); // Getting initial configuration.
const ThreadState snfXCImgr::XCI_InitialStartup("Initial Startup"); // Performing first startup.
const ThreadState snfXCImgr::XCI_CheckConfig("Checking Config"); // Checking configuration.
const ThreadState snfXCImgr::XCI_PollingListener("Polling Listener"); // Polling Listener for jobs.
const ThreadState snfXCImgr::XCI_SubmittingJob("Submitting Job"); // Submitting a new job.
const ThreadState snfXCImgr::XCI_ListenerDown("Listener Down!"); // Listener is down.
const ThreadState snfXCImgr::XCI_Stopping("Exited Polling Loop"); // XCImgr has left its main polling loop.
void snfXCImgr::myTask() { // Main thread task.
PollTimer PollingThrottle(RWPollMin, RWPollMax); // Set up a dynamic delay.
Timeout WaitForCFG(1000); // CFG Check every second or so.
// Wait for our initial configuration.
CurrentThreadState(XCI_InitialConfig); // Update our status.
Sleeper WaitATic(1000); // One second sleeper.
while(false == CFG_XCI_ON) { // Before we've been turned on
if(TimeToStop) return; // loop unless it's time to stop.
checkCFG(); WaitForCFG.restart(); // Check our configuration
WaitATic(); // every second or so.
}
// Once our configuration is good and we are turned on we get here.
try { // Safely accept/process requests.
CurrentThreadState(XCI_InitialStartup); // Update our status.
startup_XCI(); // We're on, so turn on!
while(false == TimeToStop) { // While it is not time to stop:
// Occasionally we check to see what our configuration says. If
// the XCI is configured up, or down, or if the port changes then
// the checkCFG() function handles the changes. After that all we
// need to do here is check for a listener -- if we're up we will
// have one and if not then we won't. Without a listener we will
// slow down and keep checking for a configuration change.
if(WaitForCFG.isExpired()) { checkCFG(); WaitForCFG.restart(); } // Check the CFG periodically.
// Get a new client if we have room in the queue
// and the listener is live.
int JobsThisRound = 0; // Keep track of each batch.
if(Listener) { // Check for a good listener.
CurrentThreadState(XCI_PollingListener); // Update our status.
TCPClient* NewClient; // This will be our client.
do { // Fast as we can - grab the work:
++diagLoopCount; // Count Polling Loops.
NewClient = 0; // Clear our client pointer.
snfXCITCPChannel* Channel = BestAvailableChannel(); // Pick a channel to use then
if(MaxQueueLength > Channel->Size()) { // If we have room in the queue
NewClient = Listener->acceptClient(); // get a new client.
if(NewClient) { // If we got one:
CurrentThreadState(XCI_SubmittingJob); // Update our status.
++diagClientCount; // Count Clients.
NewClient->makeNonBlocking(); // Make the client non-blocking.
Channel->submit(NewClient); // Submit the new client.
}
}
} while( // Keep getting work in this tight
(0 != NewClient)&& // loop until we run out of work
(MaxTCPQueueLength > diagClientCount) // or we've pulled a full queue.
);
} else {
CurrentThreadState(XCI_ListenerDown); // Update our status.
} // Throttle our loop to keep it real:
if(0 == JobsThisRound) PollingThrottle.pause(); // If we got nothing then slow down.
else PollingThrottle.reset(); // If we got some, keep getting it!
} // When we're done with the big loop:
CurrentThreadState(XCI_Stopping); // Update our status.
shutdown_XCI(); // Shutdown if we're not already.
} // End of the active section.
catch(exception& e) { // If we get a knowable exception
myHome->logThisError("XCI", -9, e.what()); // then we report it in detail,
try { shutdown_XCI(); } catch(...) {} // shutdown if we're not already,
WaitATic(); // wait a tic and try again.
}
catch(...) { // If we have an unhandled exception
myHome->logThisError("XCI", -10, "Panic!"); // Panic and reset. Notify the log.
try { shutdown_XCI(); } catch(...) {} // Shutdown if we're not already.
WaitATic(); // Pause to let things settle.
} // Let's try this again.
}
// Thread type passed to Thread by the snfXCImgr constructor, and the port the
// manager's CFG_XCI_PORT member is initialized with.
const ThreadType snfXCImgr::Type("snfXCIManager"); // The thread's type.
const int XCI_Default_Port = 9001; // Listener Default port = 9001.
// Constructs an idle manager: XCI off and down, default port, no home, no
// channels, no listener, diagnostic counters at zero, and TimeToStop set.
// Nothing runs until linkHome() is called.
snfXCImgr::snfXCImgr()
  : Thread(snfXCImgr::Type, "XCI Manager"),                 // Thread type and name.
    CFG_XCI_ON(false),                                      // Not configured on yet.
    CFG_XCI_PORT(XCI_Default_Port),                         // Default port until configured.
    myHome(0),                                              // No home rulebase yet.
    XCI_UP(false),                                          // Not running.
    C0(0), C1(0), C2(0), C3(0),                             // No channels.
    Listener(0),                                            // No listener.
    diagLoopCount(0), diagClientCount(0),                   // Diagnostics start at zero.
    TimeToStop(true)                                        // Stopped until linked to a home.
{
}
// Destroying the manager stops it first; stop() does nothing if the manager
// is already stopped.
snfXCImgr::~snfXCImgr() {
    stop();
}
// Links the manager to its home rulebase and starts its thread. The call is
// ignored when Home is null or when a home has already been linked.
void snfXCImgr::linkHome(snf_RulebaseHandler* Home) {
    if(0 == Home || 0 != myHome) return;                    // Nothing to link, or already linked.

    myHome = Home;                                          // Capture our home,
    myHome->use();                                          // update its use count,
    TimeToStop = false;                                     // clear the time to stop bit,
    run();                                                  // and start our thread.
}
// Returns the sum of the reported queue sizes of all existing channels.
// Channels that do not exist (null pointers) count as zero. ChannelMutex is
// held while the channels are examined.
int snfXCImgr::TotalQueue() {
    ScopeMutex IGotIt(ChannelMutex);                        // Serialize state control for safety.
    int Waiting = 0;                                        // Running total.
    if(0 != C0) Waiting += C0->Size();
    if(0 != C1) Waiting += C1->Size();
    if(0 != C2) Waiting += C2->Size();
    if(0 != C3) Waiting += C3->Size();
    return Waiting;
}
// Stops the manager. If it is already stopped nothing happens. Otherwise
// TimeToStop is raised, the main thread is joined, the XCI is shut down, and
// the home rulebase is released and forgotten.
void snfXCImgr::stop() {
    if(TimeToStop) return;                                  // Already stopped.

    TimeToStop = true;                                      // Tell the main thread to finish,
    join();                                                 // wait for it,
    shutdown_XCI();                                         // then shut down the XCI.
    myHome->unuse();                                        // Let go of the rulebase manager
    myHome = 0;                                             // and null it out for safety.
}