// snfXCImgr.cpp // Copyright (C) 2007 - 2009 ARM Research Labs, LLC // See www.armresearch.com for the copyright terms. // // See snfXCImgr.hpp for details. #include "SNFMulti/SNFMulti.hpp" #include "SNFMulti/snfXCImgr.hpp" using namespace std; using namespace CodeDweller; namespace SNFMulti { // snfXCIServerCommandHandler Virtual Base Class Default Processor. const string XCIServerCommandDefaultResponse = "\n"; string snfXCIServerCommandHandler::processXCIRequest(snf_xci& X) { // A Server using SNFMulti return XCIServerCommandDefaultResponse; // can provide a useful processor. } // snfXCIJob encapsulates a single XCI transaction. void snfXCIJob::clear() { // Clear the buffers. Request.clear(); // Clear the request and Response.clear(); // response buffers. SetupTime = 0; // No setup time yet. } // snfXCIJobProcessor encapsulates the logic to respond to an XCI request. snfXCIJobProcessor::snfXCIJobProcessor(snf_RulebaseHandler* H) : // Setup scanner. myHome(H) { // Establish myHome from H. myEngine = new snf_EngineHandler(); // Create an engine handler and myEngine->open(H); // tie it in to our home rulebase. } snfXCIJobProcessor::~snfXCIJobProcessor() { // Tear down scanner. if(myEngine) { // Checking first that we have one, myEngine->close(); // close the engine and then delete myEngine; // delete it. Set the pointer to myEngine = 0; // NULL to enforce the point. } myHome = 0; // NULL out our home too. } //// This collection of functions handle the processing of all XCI requests. bool snfXCIJobProcessor::isScanJob() { // True if myXCI is a scan job. if(0 < myXCI.scanner_scan_file.length()) return true; // If we have a scan file: true! return false; // otherwise false. } bool snfXCIJobProcessor::isGBUdbJob() { // True if myXCI is a GBUdb job. if( // GBUdb jobs have either 0 < myXCI.gbudb_test_ip.length() || // an IP to test or 0 < myXCI.gbudb_set_ip.length() || // an IP to setup or 0 < myXCI.gbudb_bad_ip.length() || // a bad IP to flag or 0 < myXCI.gbudb_good_ip.length() || // a good IP to flag or 0 < myXCI.gbudb_drop_ip.length() ) return true; // If we have one of these: true! return false; // otherwise false. } bool snfXCIJobProcessor::isReportJob() { // True if myXCI is a Report job. if(0 < myXCI.report_request_status_class.length()) return true; // If we have a report status class return false; // it's a report otherwise it's not. } bool snfXCIJobProcessor::isCommandJob() { // True if myXCI is a Command job. if(0 < myXCI.xci_server_command.length()) return true; // If we have a command string: true! return false; // otherwise false. } void snfXCIJobProcessor::processScan(snfXCIJob& J) { // Process a scan request. try { // Safely perform our scan. // Check for forced IP. IP4Address ForcedIP = 0UL; // Don't expect a forced IP. if(0 < myXCI.scanner_scan_ip.length()) { // If we have one then ForcedIP = myXCI.scanner_scan_ip; // convert it from the string. } // Scan the message file. int ScanResult = // Scan the file using our myEngine->scanMessageFile( // engine. Use the file myXCI.scanner_scan_file.c_str(), // path in the XCI request, and J.SetupTime, // the recorded setup time. Use the ForcedIP // forced IP if provided. ); // Create a proper xci resposne. ostringstream ResultString; // Use a stringstream to make it easier. ResultString << " and << "/>\n" // emit the closing elements. << endl; // End of the line. } else { // If optional data is requested: ResultString << ">" << endl; // Complete the open tag. if(true == myXCI.scanner_scan_xhdr) { // Optionally include XHDR data. ResultString // If xheaders are requested... << "" << myEngine->getXHDRs() // Emit the xhdr element & contents. << "" << endl; // End the xhdr and end of line. } if(true == myXCI.scanner_scan_log) { // Optionally include XMLLog data. ResultString // If the log data is requested... << "" << myEngine->getXMLLog() // Emit the log element & data. << "" << endl; // End the log data and end of line. } ResultString << "\n"; // Emit the closing elements. } J.Response = ResultString.str(); // Capture the formatted response. } // Decode the known exceptions catch(snf_EngineHandler::AllocationError& e) { J.Response = "\n"); } catch(snf_EngineHandler::BadMatrix& e) { J.Response = "\n"); } catch(snf_EngineHandler::Busy& e) { J.Response = "\n"); } catch(snf_EngineHandler::FileError& e) { J.Response = "\n"); } catch(snf_EngineHandler::MaxEvals& e) { J.Response = "\n"); } catch(snf_EngineHandler::Panic& e) { J.Response = "\n"); } catch(snf_EngineHandler::XHDRError& e) { J.Response = "\n"); } // Decode the unknown exceptions catch(exception& e) { J.Response = "\n"); } catch(...) { J.Response = "\n"; } } string snfXCIJobProcessor::processGBUdb() { // Process a GBUdb request. GBUdb& myGBUdb = myHome->MyGBUdb; // Make a convenient GBUdb handle. IP4Address IP; // We will work with an IP. GBUdbRecord R; // We will get a record to return. // Test an IP - return it's current data. if(0 < myXCI.gbudb_test_ip.length()) { // IF: Test an IP IP = myXCI.gbudb_test_ip; // Convert the IP. } else // Set or update an IP's data. if(0 < myXCI.gbudb_set_ip.length()) { // IF: Set an IP's data. IP = myXCI.gbudb_set_ip; // Convert the IP. if( // Check for a compound update: 0 <= myXCI.gbudb_set_bad_count || // If we are changing the bad 0 <= myXCI.gbudb_set_good_count // or good count then this is ) { // a compound update (read then write). R = myGBUdb.getRecord(IP); // Get the record (or a safe blank). if(0 <= myXCI.gbudb_set_bad_count) // If we have a bad count to set R.Bad(myXCI.gbudb_set_bad_count); // then set the bad count. if(0 <= myXCI.gbudb_set_good_count) // If we have a good count to set R.Good(myXCI.gbudb_set_good_count); // then set the good count. if(0 < myXCI.gbudb_set_type.length()) { // If type, set type... switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the case 'g': case 'G': { R.Flag(Good); break; } // first character of the name and case 'b': case 'B': { R.Flag(Bad); break; } // set the appropriate flag. case 'u': case 'U': { R.Flag(Ugly); break; } case 'i': case 'I': { R.Flag(Ignore); break; } } } myGBUdb.setRecord(IP, R); // Save the data. } else // This might be a simple flag change. if(0 < myXCI.gbudb_set_type.length()) { // If type, set type... switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the case 'g': case 'G': { R = myGBUdb.setGood(IP); break; } // first character of the name and case 'b': case 'B': { R = myGBUdb.setBad(IP); break; } // set the appropriate flag. Simple case 'u': case 'U': { R = myGBUdb.setUgly(IP); break; } // flag changes are atomic so there is case 'i': case 'I': { R = myGBUdb.setIgnore(IP); break; } // no need to "save" later. } } else { // Empty set command? return XCIBadSetResponse; // That's bad. Use test! } } else // Add a bad event to an IPs data. if(0 < myXCI.gbudb_bad_ip.length()) { // IF: Add a bad mark for this IP IP = myXCI.gbudb_bad_ip; // Convert the IP. R = myGBUdb.addBad(IP); // Add a bad mark. } else // Add a good event to an IPs data. if(0 < myXCI.gbudb_good_ip.length()) { // IF: Add a good mark for this IP IP = myXCI.gbudb_good_ip; // Convert the IP. R = myGBUdb.addGood(IP); // Add a bad mark. } else // Drop an IP from the database. if(0 < myXCI.gbudb_drop_ip.length()) { // IF: Drop an IP's data. IP = myXCI.gbudb_drop_ip; // Convert the IP. myGBUdb.dropRecord(IP); // Forget about it. } // Return the final state of the IP's data. IPTestRecord IPState(IP); myHome->performIPTest(IPState); ostringstream Response; // Use a stringstream for our output. Response << "" // Finish it up. << endl; return Response.str(); // Return the formatted response. } string snfXCIJobProcessor::processStatusReport() { // Process a report request. string ReportToSend; // Keep this in scope. if(0 == myXCI.report_request_status_class.find("hour")) { // Please send the hour report. ReportToSend = myHome->MyLOGmgr.getStatusHourReport(); } else if(0 == myXCI.report_request_status_class.find("minute")) { // Please send the minute report. ReportToSend = myHome->MyLOGmgr.getStatusMinuteReport(); } else { // Please send the second report. ReportToSend = myHome->MyLOGmgr.getStatusSecondReport(); } string Response = ""; // Construct the response using the Response.append(ReportToSend); // snf/xci template and the selected Response.append(""); // status report text. return Response; // Return the response. } void snfXCIJobProcessor::process(snfXCIJob& J) { // Process a Job. // Parse the XCI request and check for an error. myXCI.read(J.Request); // Parse the request. if(myXCI.bad()) { // If it's bad then J.Response = XCIErrorResponse; // respond with an error. myHome->logThisError("XCI",-1,"Bad Request"); // Log the error. return; // Done. } else // Process scan requests. if(isScanJob()) { // If this is a Scan request processScan(J); // respond with the result. return; // Done. } else // Process gbudb requests. if(isGBUdbJob()) { // If this is a GBUdb request J.Response = processGBUdb(); // respond with the result. return; // Done. } else // Process report requests. if(isReportJob()) { // If this is a Status report request J.Response = processStatusReport(); // respond with the desired report. return; // Done. } else // Process server commands. if(isCommandJob()) { // If this is a server command J.Response = myHome->processXCIServerCommandRequest(myXCI); // pass it up and return the return; // result. Done. } else // If we get to this point we don't understand the well formed request. J.Response = XCIErrorResponse; // Don't understand? myHome->logThisError("XCI",-2,"Unrecognized Request"); // Log the error. Respond with return; // the standard error response. } // ChannelJob encapsulates a Client Job while in the queue and how long it has // been in the system (since created). ChannelJob::ChannelJob() : myClient(0) {} // Empty is the null client. ChannelJob::ChannelJob(TCPClient* C) : // We are created like this. myClient(C) { // We capture the client and } // our timer starts automaticially. msclock ChannelJob::Age() { // How old is this job? return Lifetime.getElapsedTime(); // Return the elapsed time in ms. } TCPClient* ChannelJob::Client() { // What client does it hold? return myClient; // Return the Client pointer. } // snfXCITCPChannel encapsulates the logic to queue and handle TCPClients for // the XCI interface. The queued TCPClients each represent a single request. // Each request is handled in turn by reading the request into an snfXCIJob, // handing that snfXCIJob to an snfXCIJobProcessor, transmitting the result // back to the TCPClient, closing the connection, and recycling the snfXCIJob // object for the next round. // snfXCITCPChannel shuts down when given a NULL TCPClient; This allows any // jobs in queue to be handled before the thread stops. To shut down a channel // { C->submit(NULL); C->join(); delete C; C = NULL;} void snfXCITCPChannel::give(ChannelJob& J) { // Give a job to the queue. ScopeMutex OneAtATimePlease(QueueMutex); // Protected with a mutex... JobQueue.push(J); // Push the job in. LatestSize = JobQueue.size(); // Set the blinking light. QueueGateway.produce(); // Add the item to our gateway. } ChannelJob snfXCITCPChannel::take() { // Take a job from the queue. QueueGateway.consume(); // Hold on until there is work. ScopeMutex OneAtATimePlease(QueueMutex); // Queue Data Protected with a mutex. ChannelJob J = JobQueue.front(); // Grab the next job in the queue. JobQueue.pop(); // Pop that job out of the queue. LatestSize = JobQueue.size(); // Set the blinking light. return J; // Return the Job. } const int RWTimeLimit = 30000; // RWTimeLimit in ms. 30 seconds. const string endSNF = ""; // snf_xci snf element terminator. const int RWPollMin = 15; // Minimum time between polls. const int RWPollMax = 75; // Maximum time between polls. const int MaxQueueLength = 32; // Most waiting in any queue. const int MaxTCPQueueLength = 4 * MaxQueueLength; // Most connections waiting. void snfXCITCPChannel::readRequest(TCPClient* Client) { // Read Job.Request from Client. Timeout ReadTimeLimit(RWTimeLimit); // We have time limits. PollTimer ReadThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. while( false == ReadTimeLimit.isExpired() && // Read stuff until we're out of time string::npos == Job.Request.find(endSNF,0) // or we have a complete request. ) { memset(LineBuffer, 0, sizeof(LineBuffer)); // Clear the buffer. int bytes = Client->delimited_receive( // Read up to all but one byte LineBuffer, sizeof(LineBuffer)-1, '\n'); // of the buffer up to the first \n. if(0 < bytes) { // If we got some bytes Job.Request.append(LineBuffer); // Append the data we got and ReadThrottle.reset(); // reset the throttle. } else { // If we didn't get any bytes then ReadThrottle.pause(); // wait a little bit more each round. } } // When we're done we will return. } void snfXCITCPChannel::writeResponse(TCPClient* Client) { // Write Job.Request from Client. Timeout WriteTimeLimit(RWTimeLimit); // We have a time limit. PollTimer WriteThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. for( // For all the bytes in the response: int Length = Job.Response.length(), BytesThisTime = 0, Bytes = 0; // Bytes to send, this time and sent. Bytes < Length && // Keep going if we've got more to false == WriteTimeLimit.isExpired(); // send and we still have time. ) { BytesThisTime = Client->transmit( // Transmit some bytes. &Job.Response[Bytes], Job.Response.length()-Bytes); // from where we are, what is left. if(0 < BytesThisTime) { // If we sent bytes Bytes += BytesThisTime; // then keep track of how many WriteThrottle.reset(); // and reset our throttle to min. } else { // If we didn't then pause a bit WriteThrottle.pause(); // and let our delay grow. } } } const int XCI_Reading = 0; // XCI Mode Flags. const int XCI_Processing = 1; const int XCI_Writing = 2; void snfXCITCPChannel::myTask() { // Thread's main loop. bool WeAreAlive = true; // It's not over 'til it's over. while(WeAreAlive) { // While we are alive: CurrentThreadState(XCI_Wait); // Mark our state. ChannelJob J = take(); // Pull a Client Job from the queue. if(0 == J.Client()) { // If the job is empty we're done. CurrentThreadState(XCI_Shutdown); // Mark our state. WeAreAlive = false; // Turn off the alive flag and break; // break out of the loop. } else { // When we have a job to do: int XCIMode = XCI_Reading; try { CurrentThreadState(XCI_Read); XCIMode = XCI_Reading; // Now we are reading. readRequest(J.Client()); // Read the client job. CurrentThreadState(XCI_Process); XCIMode = XCI_Processing; // Now we are processing. Job.SetupTime = J.Age(); // Capture the read and queue time. Processor.process(Job); // Pass the XCIJob to our processor. CurrentThreadState(XCI_Write); XCIMode = XCI_Writing; // Now we are writing. writeResponse(J.Client()); // Write the response. } // Log any exceptions that were thrown. catch(...) { switch(XCIMode) { case XCI_Reading: { myHome->logThisError("XCI",-5,"SocketReadError"); break; } case XCI_Processing: { myHome->logThisError("XCI",-6,"ProcessError"); break; } case XCI_Writing: { myHome->logThisError("XCI",-7,"SocketWriteError"); break; } } } } // At the end of every job we clean up no matter what. if(0 != J.Client()) { // If we have a client CurrentThreadState(XCI_Close); J.Client()->close(); // Close the client. delete J.Client(); // Delete the client. } CurrentThreadState(XCI_Clear); Job.clear(); // Clear the job buffer. } // Go again. } const ThreadType snfXCITCPChannel::Type("snfXCITCPChannel"); // The thread's type. //// XCI Thread States const ThreadState snfXCITCPChannel::XCI_Wait("Waiting For Take()"); const ThreadState snfXCITCPChannel::XCI_Read("Reading Request"); const ThreadState snfXCITCPChannel::XCI_Process("Processing Job"); const ThreadState snfXCITCPChannel::XCI_Write("Writing Results"); const ThreadState snfXCITCPChannel::XCI_Close("Closing Connection"); const ThreadState snfXCITCPChannel::XCI_Clear("Clearing Workspace"); const ThreadState snfXCITCPChannel::XCI_Shutdown("Shutting Down"); snfXCITCPChannel::snfXCITCPChannel(snf_RulebaseHandler* H, string N) : // Create these with a home rulebase. Thread(snfXCITCPChannel::Type, N), // XCI TCP Channel Type & name. myHome(H), // We know our home. Processor(H), // Our processor has a rulebase. LatestSize(0) { // Our job queue size is zero. run(); // We start our thread. } snfXCITCPChannel::~snfXCITCPChannel() { // Destroy them very carefully. ChannelJob EndJob; // On the way down feed ourselves give(EndJob); // an empty job - that will end our join(); // thread once other jobs are done. myHome = 0; // Once joined our home is gone. } // We're done. int snfXCITCPChannel::Size() { // Keep track of how full they are. return LatestSize; // Flash the blinking light. } void snfXCITCPChannel::submit(TCPClient* C) { // This is how we submit jobs. ChannelJob J(C); // Create a Job for this client. give(J); // Give it (copy) to the queue. } // snfXCImgr encapsulates a service engine that takes XCI requests via TCP, // performs the required actions, and returns an XCI response. It also checks // to see if the configuration for the XCI interface has changed. void snfXCImgr::checkCFG() { // Checks the configuration. CurrentThreadState(XCI_CheckConfig); // Update our status. int NEW_XCI_Port; // Prepare for a change in port. // Quickly as we can, grab a config packet, capture the XCI parts, and // then let it go. if(myHome->isReady()) { // If we know our home then snfCFGPacket MyCFGPacket(myHome); // Grab a configuration packet. if(MyCFGPacket.bad()) { // If it's not valid then return; // wait (skip this) till next time. } else { // If we've got a good config then CFG_XCI_ON = MyCFGPacket.Config()->XCI_OnOff; // Is XCI turned on? NEW_XCI_Port = MyCFGPacket.Config()->XCI_Port; // What port we listen to? } // If our rulebase manager was } else return; // not ready (skip this) for now. if(CFG_XCI_ON) { // If the XCI is configured up: if(NEW_XCI_Port != CFG_XCI_PORT) { // Check for a port change. If the CFG_XCI_PORT = NEW_XCI_Port; // port changed then check for a live if(Listener) { // listener. For a live port change shutdown_Listener(); // shut down the current listener and myHome->logThisInfo("XCI", 0, "ListenerDown:PortChanged"); // log the activity. startup_Listener(); // Restart the listener with the new myHome->logThisInfo("XCI", 0, "ListenerUp:PortChanged"); // port and log the event. } } startup_XCI(); // Make sure the XCI is up. } else { // If the XCI is configured down shutdown_XCI(); // then make sure it is down. } } snfXCITCPChannel* LowestQueue(snfXCITCPChannel* A, snfXCITCPChannel* B) { // Pick the lowest queue of two. return ((A->Size() < B->Size()) ? A : B); // Pick one and return it. } snfXCITCPChannel* snfXCImgr::BestAvailableChannel() { // Selects XCI channel w/ lowest queue. return LowestQueue( // Pick the lowest of the lowest. LowestQueue( LowestQueue(C0, C1), LowestQueue(C2, C3) ), LowestQueue( LowestQueue(C4, C5), LowestQueue(C6, C7) ) ); } void snfXCImgr::startup_Listener() { // Listener startup function. if(0 == Listener) { // If we need a new listener: Listener = new TCPListener(CFG_XCI_PORT); // Create a new listener. Listener->MaxPending = MaxTCPQueueLength; // We may get a lot of hits ;-) Listener->open(); // Open it for business. Listener->makeNonBlocking(); // Make it non-blocking. } } void snfXCImgr::shutdown_Listener() { // Listener shutdown function. if(Listener) { // Only act if there is a listener: Listener->close(); // The listener gets closed, delete Listener; // then deleted, then the Listener = 0; // Listener pointer is zeroed. } } void snfXCImgr::startup_XCI() { // XCI startup function. if(true == XCI_UP) return; // If we're already up we're done. ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. if(myHome) { // We need to know our home. if(CFG_XCI_ON) { // If XCI is configured on, startup! C0 = new snfXCITCPChannel(myHome, "C0"); // Launch our 8 processing channels. C1 = new snfXCITCPChannel(myHome, "C1"); C2 = new snfXCITCPChannel(myHome, "C2"); C3 = new snfXCITCPChannel(myHome, "C3"); C4 = new snfXCITCPChannel(myHome, "C4"); C5 = new snfXCITCPChannel(myHome, "C5"); C6 = new snfXCITCPChannel(myHome, "C6"); C7 = new snfXCITCPChannel(myHome, "C7"); startup_Listener(); // Start up our listener. myHome->logThisInfo("XCI", 0, "Startup"); // Log the startup. XCI_UP = true; // Set the flag. We're up! } } } void snfXCImgr::shutdown_XCI() { // XCI shutdown function. if(false == XCI_UP) return; // If we're already down we're done. ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. shutdown_Listener(); // If up, take down & 0 the Listener. if(C0) { delete C0; C0 = 0; } // If up, take C0 down and NULL it. if(C1) { delete C1; C1 = 0; } // If up, take C1 down and NULL it. if(C2) { delete C2; C2 = 0; } // If up, take C2 down and NULL it. if(C3) { delete C3; C3 = 0; } // If up, take C3 down and NULL it. if(C4) { delete C4; C4 = 0; } // If up, take C4 down and NULL it. if(C5) { delete C5; C5 = 0; } // If up, take C5 down and NULL it. if(C6) { delete C6; C6 = 0; } // If up, take C6 down and NULL it. if(C7) { delete C7; C7 = 0; } // If up, take C7 down and NULL it. myHome->logThisInfo("XCI", 0, "Shutdown"); // Log the shutdown. XCI_UP = false; // Set the flag. We're down! } int snfXCImgr::pollLoopCount() { // Retrieve & reset Loop Count. int x = diagLoopCount; diagLoopCount = 0; return x; } int snfXCImgr::pollClientCount() { // Retrieve & reset Client Count. int x = diagClientCount; diagClientCount = 0; return x; } const ThreadState snfXCImgr::XCI_InitialConfig("Initial Config"); // Getting initial configuration. const ThreadState snfXCImgr::XCI_InitialStartup("Initial Startup"); // Performing first startup. const ThreadState snfXCImgr::XCI_CheckConfig("Checking Config"); // Checking configuration. const ThreadState snfXCImgr::XCI_PollingListener("Polling Listener"); // Polling Listener for jobs. const ThreadState snfXCImgr::XCI_SubmittingJob("Submitting Job"); // Submitting a new job. const ThreadState snfXCImgr::XCI_ListenerDown("Listener Down!"); // Listener is down. const ThreadState snfXCImgr::XCI_Stopping("Exited Polling Loop"); // XCImgr Exiting Big Loop void snfXCImgr::myTask() { // Main thread task. PollTimer PollingThrottle(RWPollMin, RWPollMax); // Set up a dynamic delay. Timeout WaitForCFG(1000); // CFG Check every second or so. // Wait for our initial configuration. CurrentThreadState(XCI_InitialConfig); // Update our status. Sleeper WaitATic(1000); // One second sleeper. while(false == CFG_XCI_ON) { // Before we've been turned on if(TimeToStop) return; // loop unless it's time to stop. checkCFG(); WaitForCFG.restart(); // Check our configuration WaitATic(); // every second or so. } // Once our configuration is good and we are turned on we get here. try { // Safely accept/process requests. CurrentThreadState(XCI_InitialStartup); // Update our status. startup_XCI(); // We're on, so turn on! while(false == TimeToStop) { // While it is not time to stop: // Occasionally we check to see what our configuration says. If // the XCI is configured up, or down, or if the port changes then // the checkCFG() function handles the changes. After that all we // need to do here is check for a listener -- if we're up we will // have one and if not then we won't. Without a listener we will // slow down and keep checking for a configuration change. if(WaitForCFG.isExpired()) { checkCFG(); WaitForCFG.restart(); } // Check the CFG periodically. // Get a new client if we have room in the queue // and the listener is live. int JobsThisRound = 0; // Keep track of each batch. if(Listener) { // Check for a good listener. CurrentThreadState(XCI_PollingListener); // Update our status. TCPClient* NewClient; // This will be our client. do { // Fast as we can - grab the work: ++diagLoopCount; // Count Polling Loops. NewClient = 0; // Clear our client pointer. snfXCITCPChannel* Channel = BestAvailableChannel(); // Pick a channel to use then if(MaxQueueLength > Channel->Size()) { // If we have room in the queue NewClient = Listener->acceptClient(); // get a new client. if(NewClient) { // If we got one: CurrentThreadState(XCI_SubmittingJob); // Update our status. ++diagClientCount; // Count Clients. NewClient->makeNonBlocking(); // Make the client non-blocking. Channel->submit(NewClient); // Submit the new client. } } } while( // Keep getting work in this tight (0 != NewClient)&& // loop until we run out of work (MaxTCPQueueLength > diagClientCount) // or we've pulled a full queue. ); } else { CurrentThreadState(XCI_ListenerDown); // Update our status. } // Throttle our loop to keep it real: if(0 == JobsThisRound) PollingThrottle.pause(); // If we got nothing then slow down. else PollingThrottle.reset(); // If we got some, keep getting it! } // When we're done with the big loop: CurrentThreadState(XCI_Stopping); // Update our status. shutdown_XCI(); // Shutdown if we're not already. } // End of the active section. catch(exception& e) { // If we get a knowable exception myHome->logThisError("XCI", -9, e.what()); // then we report it in detail, try { shutdown_XCI(); } catch(...) {} // shutdown if we're not already, WaitATic(); // wait a tic and try again. } catch(...) { // If we have an unhandled exception myHome->logThisError("XCI", -10, "Panic!"); // Panic and reset. Notify the log. try { shutdown_XCI(); } catch(...) {} // Shutdown if we're not already. WaitATic(); // Pause to let things settle. } // Let's try this again. } const ThreadType snfXCImgr::Type("snfXCIManager"); // The thread's type. const int XCI_Default_Port = 9001; // Listener Default port = 9001. snfXCImgr::snfXCImgr() : // Construct with no home. Thread(snfXCImgr::Type, "XCI Manager"), // XCI Manager type and Name. CFG_XCI_ON(false), // Everything starts off, CFG_XCI_PORT(XCI_Default_Port), // default, and myHome(0), // nulled. C0(0), C1(0), C2(0), C3(0), Listener(0), XCI_UP(false), diagLoopCount(0), diagClientCount(0), TimeToStop(true) { // We don't run until linkHome(). } snfXCImgr::~snfXCImgr() { // Stop when we are destroyed. stop(); // Like I said, stop(). } void snfXCImgr::linkHome(snf_RulebaseHandler* Home) { // Link to Home and set up shop. if(0 != Home && 0 == myHome) { // If we are getting our home myHome = Home; // then capture it, myHome->use(); // Update it's use count. TimeToStop = false; // clear the time to stop bit, run(); // run our thread. } } int snfXCImgr::TotalQueue() { // Return the total work queue size. ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. return ( ((0 == C0) ? 0 : C0->Size()) + ((0 == C1) ? 0 : C1->Size()) + ((0 == C2) ? 0 : C2->Size()) + ((0 == C3) ? 0 : C3->Size()) ); } void snfXCImgr::stop() { // Called to shut down. if(false == TimeToStop) { // If we are not stopped then TimeToStop = true; // it is time to stop. join(); // Wait for our main thread first, shutdown_XCI(); // then shut down the XCI. myHome->unuse(); // Let go of the rulebase manager. myHome = 0; // Null it out for safety. } } }