|
|
|
|
|
|
|
|
// snfXCImgr.cpp |
|
|
// snfXCImgr.cpp |
|
|
// Copyright (C) 2007 - 2009 ARM Research Labs, LLC |
|
|
|
|
|
|
|
|
// Copyright (C) 2007 - 2020 ARM Research Labs, LLC |
|
|
// See www.armresearch.com for the copyright terms. |
|
|
// See www.armresearch.com for the copyright terms. |
|
|
// |
|
|
// |
|
|
// See snfXCImgr.hpp for details. |
|
|
// See snfXCImgr.hpp for details. |
|
|
|
|
|
|
|
|
#include "SNFMulti.hpp" |
|
|
#include "SNFMulti.hpp" |
|
|
#include "snfXCImgr.hpp" |
|
|
#include "snfXCImgr.hpp" |
|
|
|
|
|
|
|
|
using namespace std; |
|
|
|
|
|
|
|
|
namespace cd = codedweller; |
|
|
|
|
|
|
|
|
// snfXCIServerCommandHandler Virtual Base Class Default Processor. |
|
|
// snfXCIServerCommandHandler Virtual Base Class Default Processor. |
|
|
|
|
|
|
|
|
const string XCIServerCommandDefaultResponse = |
|
|
|
|
|
|
|
|
const std::string XCIServerCommandDefaultResponse = |
|
|
"<snf><xci><server><response message=\'Not Implemented\' code=\'-1\'/></server></xci></snf>\n"; |
|
|
"<snf><xci><server><response message=\'Not Implemented\' code=\'-1\'/></server></xci></snf>\n"; |
|
|
|
|
|
|
|
|
string snfXCIServerCommandHandler::processXCIRequest(snf_xci& X) { // A Server using SNFMulti |
|
|
|
|
|
|
|
|
std::string snfXCIServerCommandHandler::processXCIRequest(snf_xci& X) { // A Server using SNFMulti |
|
|
return XCIServerCommandDefaultResponse; // can provide a useful processor. |
|
|
return XCIServerCommandDefaultResponse; // can provide a useful processor. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Check for forced IP. |
|
|
// Check for forced IP. |
|
|
|
|
|
|
|
|
IP4Address ForcedIP = 0UL; // Don't expect a forced IP. |
|
|
|
|
|
|
|
|
cd::IP4Address ForcedIP = 0UL; // Don't expect a forced IP. |
|
|
if(0 < myXCI.scanner_scan_ip.length()) { // If we have one then |
|
|
if(0 < myXCI.scanner_scan_ip.length()) { // If we have one then |
|
|
ForcedIP = myXCI.scanner_scan_ip; // convert it from the string. |
|
|
ForcedIP = myXCI.scanner_scan_ip; // convert it from the string. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create a proper xci resposne. |
|
|
// Create a proper xci resposne. |
|
|
|
|
|
|
|
|
ostringstream ResultString; // Use a stringstream to make it easier. |
|
|
|
|
|
|
|
|
std::ostringstream ResultString; // Use a stringstream to make it easier. |
|
|
ResultString |
|
|
ResultString |
|
|
<< "<snf><xci><scanner><result code=\'" // Emit the preamble. |
|
|
<< "<snf><xci><scanner><result code=\'" // Emit the preamble. |
|
|
<< ScanResult << "\'"; // Emit the scan result. |
|
|
<< ScanResult << "\'"; // Emit the scan result. |
|
|
|
|
|
|
|
|
) { // If no optional data was requested |
|
|
) { // If no optional data was requested |
|
|
ResultString // then close the <request/> and |
|
|
ResultString // then close the <request/> and |
|
|
<< "/></scanner></xci></snf>\n" // emit the closing elements. |
|
|
<< "/></scanner></xci></snf>\n" // emit the closing elements. |
|
|
<< endl; // End of the line. |
|
|
|
|
|
|
|
|
<< std::endl; // End of the line. |
|
|
|
|
|
|
|
|
} else { // If optional data is requested: |
|
|
} else { // If optional data is requested: |
|
|
ResultString << ">" << endl; // Complete the <result> open tag. |
|
|
|
|
|
|
|
|
ResultString << ">" << std::endl; // Complete the <result> open tag. |
|
|
|
|
|
|
|
|
if(true == myXCI.scanner_scan_xhdr) { // Optionally include XHDR data. |
|
|
if(true == myXCI.scanner_scan_xhdr) { // Optionally include XHDR data. |
|
|
ResultString // If xheaders are requested... |
|
|
ResultString // If xheaders are requested... |
|
|
<< "<xhdr>" << myEngine->getXHDRs() // Emit the xhdr element & contents. |
|
|
<< "<xhdr>" << myEngine->getXHDRs() // Emit the xhdr element & contents. |
|
|
<< "</xhdr>" << endl; // End the xhdr and end of line. |
|
|
|
|
|
|
|
|
<< "</xhdr>" << std::endl; // End the xhdr and end of line. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if(true == myXCI.scanner_scan_log) { // Optionally include XMLLog data. |
|
|
if(true == myXCI.scanner_scan_log) { // Optionally include XMLLog data. |
|
|
ResultString // If the log data is requested... |
|
|
ResultString // If the log data is requested... |
|
|
<< "<log>" << myEngine->getXMLLog() // Emit the log element & data. |
|
|
<< "<log>" << myEngine->getXMLLog() // Emit the log element & data. |
|
|
<< "</log>" << endl; // End the log data and end of line. |
|
|
|
|
|
|
|
|
<< "</log>" << std::endl; // End the log data and end of line. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
ResultString << "</result></scanner></xci></snf>\n"; // Emit the closing elements. |
|
|
ResultString << "</result></scanner></xci></snf>\n"; // Emit the closing elements. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Decode the known exceptions |
|
|
// Decode the known exceptions |
|
|
|
|
|
|
|
|
catch(snf_EngineHandler::AllocationError& e) { |
|
|
|
|
|
|
|
|
catch(const snf_EngineHandler::AllocationError& e) { |
|
|
J.Response = "<snf><xci><error message=\'AllocationError "; |
|
|
J.Response = "<snf><xci><error message=\'AllocationError "; |
|
|
J.Response.append(e.what()); |
|
|
J.Response.append(e.what()); |
|
|
J.Response.append("\'/></xci></snf>\n"); |
|
|
J.Response.append("\'/></xci></snf>\n"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Decode the unknown exceptions |
|
|
// Decode the unknown exceptions |
|
|
|
|
|
|
|
|
catch(exception& e) { |
|
|
|
|
|
|
|
|
catch(const std::exception& e) { |
|
|
J.Response = "<snf><xci><error message=\'Exception! "; |
|
|
J.Response = "<snf><xci><error message=\'Exception! "; |
|
|
J.Response.append(e.what()); |
|
|
J.Response.append(e.what()); |
|
|
J.Response.append("\'/></xci></snf>\n"); |
|
|
J.Response.append("\'/></xci></snf>\n"); |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
string snfXCIJobProcessor::processGBUdb() { // Process a GBUdb request. |
|
|
|
|
|
|
|
|
std::string snfXCIJobProcessor::processGBUdb() { // Process a GBUdb request. |
|
|
GBUdb& myGBUdb = myHome->MyGBUdb; // Make a convenient GBUdb handle. |
|
|
GBUdb& myGBUdb = myHome->MyGBUdb; // Make a convenient GBUdb handle. |
|
|
IP4Address IP; // We will work with an IP. |
|
|
|
|
|
|
|
|
cd::IP4Address IP; // We will work with an IP. |
|
|
GBUdbRecord R; // We will get a record to return. |
|
|
GBUdbRecord R; // We will get a record to return. |
|
|
|
|
|
|
|
|
// Test an IP - return it's current data. |
|
|
// Test an IP - return it's current data. |
|
|
|
|
|
|
|
|
IPTestRecord IPState(IP); |
|
|
IPTestRecord IPState(IP); |
|
|
myHome->performIPTest(IPState); |
|
|
myHome->performIPTest(IPState); |
|
|
|
|
|
|
|
|
ostringstream Response; // Use a stringstream for our output. |
|
|
|
|
|
|
|
|
std::ostringstream Response; // Use a stringstream for our output. |
|
|
Response |
|
|
Response |
|
|
<< "<snf><xci><gbudb><result " // Get the response started. |
|
|
<< "<snf><xci><gbudb><result " // Get the response started. |
|
|
<< "ip=\'" << (string) IP // Emit the ip. |
|
|
|
|
|
|
|
|
<< "ip=\'" << (std::string) IP // Emit the ip. |
|
|
<< "\' type=\'" // Emit the type. |
|
|
<< "\' type=\'" // Emit the type. |
|
|
<< ((Good == IPState.G.Flag()) ? "good" : |
|
|
<< ((Good == IPState.G.Flag()) ? "good" : |
|
|
((Bad == IPState.G.Flag()) ? "bad" : |
|
|
((Bad == IPState.G.Flag()) ? "bad" : |
|
|
|
|
|
|
|
|
<< "\' code=\'" << IPState.Code |
|
|
<< "\' code=\'" << IPState.Code |
|
|
<< "\'" |
|
|
<< "\'" |
|
|
<< "/></gbudb></xci></snf>" // Finish it up. |
|
|
<< "/></gbudb></xci></snf>" // Finish it up. |
|
|
<< endl; |
|
|
|
|
|
|
|
|
<< std::endl; |
|
|
|
|
|
|
|
|
return Response.str(); // Return the formatted response. |
|
|
return Response.str(); // Return the formatted response. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
string snfXCIJobProcessor::processStatusReport() { // Process a report request. |
|
|
|
|
|
string ReportToSend; // Keep this in scope. |
|
|
|
|
|
|
|
|
std::string snfXCIJobProcessor::processStatusReport() { // Process a report request. |
|
|
|
|
|
std::string ReportToSend; // Keep this in scope. |
|
|
|
|
|
|
|
|
if(0 == myXCI.report_request_status_class.find("hour")) { // Please send the hour report. |
|
|
if(0 == myXCI.report_request_status_class.find("hour")) { // Please send the hour report. |
|
|
ReportToSend = myHome->MyLOGmgr.getStatusHourReport(); |
|
|
ReportToSend = myHome->MyLOGmgr.getStatusHourReport(); |
|
|
|
|
|
|
|
|
ReportToSend = myHome->MyLOGmgr.getStatusSecondReport(); |
|
|
ReportToSend = myHome->MyLOGmgr.getStatusSecondReport(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
string Response = "<snf><xci><report><response>"; // Construct the response using the |
|
|
|
|
|
|
|
|
std::string Response = "<snf><xci><report><response>"; // Construct the response using the |
|
|
Response.append(ReportToSend); // snf/xci template and the selected |
|
|
Response.append(ReportToSend); // snf/xci template and the selected |
|
|
Response.append("</response></report></xci></snf>"); // status report text. |
|
|
Response.append("</response></report></xci></snf>"); // status report text. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ChannelJob::ChannelJob() : myClient(0) {} // Empty is the null client. |
|
|
ChannelJob::ChannelJob() : myClient(0) {} // Empty is the null client. |
|
|
|
|
|
|
|
|
ChannelJob::ChannelJob(TCPClient* C) : // We are created like this. |
|
|
|
|
|
|
|
|
ChannelJob::ChannelJob(cd::TCPClient* C) : // We are created like this. |
|
|
myClient(C) { // We capture the client and |
|
|
myClient(C) { // We capture the client and |
|
|
} // our timer starts automaticially. |
|
|
} // our timer starts automaticially. |
|
|
|
|
|
|
|
|
msclock ChannelJob::Age() { // How old is this job? |
|
|
|
|
|
|
|
|
cd::msclock ChannelJob::Age() { // How old is this job? |
|
|
return Lifetime.getElapsedTime(); // Return the elapsed time in ms. |
|
|
return Lifetime.getElapsedTime(); // Return the elapsed time in ms. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
TCPClient* ChannelJob::Client() { // What client does it hold? |
|
|
|
|
|
|
|
|
cd::TCPClient* ChannelJob::Client() { // What client does it hold? |
|
|
return myClient; // Return the Client pointer. |
|
|
return myClient; // Return the Client pointer. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// { C->submit(NULL); C->join(); delete C; C = NULL;} |
|
|
// { C->submit(NULL); C->join(); delete C; C = NULL;} |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::give(ChannelJob& J) { // Give a job to the queue. |
|
|
void snfXCITCPChannel::give(ChannelJob& J) { // Give a job to the queue. |
|
|
ScopeMutex OneAtATimePlease(QueueMutex); // Protected with a mutex... |
|
|
|
|
|
|
|
|
cd::ScopeMutex OneAtATimePlease(QueueMutex); // Protected with a mutex... |
|
|
JobQueue.push(J); // Push the job in. |
|
|
JobQueue.push(J); // Push the job in. |
|
|
LatestSize = JobQueue.size(); // Set the blinking light. |
|
|
LatestSize = JobQueue.size(); // Set the blinking light. |
|
|
QueueGateway.produce(); // Add the item to our gateway. |
|
|
QueueGateway.produce(); // Add the item to our gateway. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ChannelJob snfXCITCPChannel::take() { // Take a job from the queue. |
|
|
ChannelJob snfXCITCPChannel::take() { // Take a job from the queue. |
|
|
QueueGateway.consume(); // Hold on until there is work. |
|
|
QueueGateway.consume(); // Hold on until there is work. |
|
|
ScopeMutex OneAtATimePlease(QueueMutex); // Queue Data Protected with a mutex. |
|
|
|
|
|
|
|
|
cd::ScopeMutex OneAtATimePlease(QueueMutex); // Queue Data Protected with a mutex. |
|
|
ChannelJob J = JobQueue.front(); // Grab the next job in the queue. |
|
|
ChannelJob J = JobQueue.front(); // Grab the next job in the queue. |
|
|
JobQueue.pop(); // Pop that job out of the queue. |
|
|
JobQueue.pop(); // Pop that job out of the queue. |
|
|
LatestSize = JobQueue.size(); // Set the blinking light. |
|
|
LatestSize = JobQueue.size(); // Set the blinking light. |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const int RWTimeLimit = 30000; // RWTimeLimit in ms. 30 seconds. |
|
|
const int RWTimeLimit = 30000; // RWTimeLimit in ms. 30 seconds. |
|
|
const string endSNF = "</snf>"; // snf_xci snf element terminator. |
|
|
|
|
|
|
|
|
const std::string endSNF = "</snf>"; // snf_xci snf element terminator. |
|
|
const int RWPollMin = 15; // Minimum time between polls. |
|
|
const int RWPollMin = 15; // Minimum time between polls. |
|
|
const int RWPollMax = 75; // Maximum time between polls. |
|
|
const int RWPollMax = 75; // Maximum time between polls. |
|
|
const int MaxQueueLength = 32; // Most waiting in any queue. |
|
|
const int MaxQueueLength = 32; // Most waiting in any queue. |
|
|
const int MaxTCPQueueLength = 4 * MaxQueueLength; // Most connections waiting. |
|
|
const int MaxTCPQueueLength = 4 * MaxQueueLength; // Most connections waiting. |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::readRequest(TCPClient* Client) { // Read Job.Request from Client. |
|
|
|
|
|
Timeout ReadTimeLimit(RWTimeLimit); // We have time limits. |
|
|
|
|
|
PollTimer ReadThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::readRequest(cd::TCPClient* Client) { // Read Job.Request from Client. |
|
|
|
|
|
cd::Timeout ReadTimeLimit(RWTimeLimit); // We have time limits. |
|
|
|
|
|
cd::PollTimer ReadThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. |
|
|
while( |
|
|
while( |
|
|
false == ReadTimeLimit.isExpired() && // Read stuff until we're out of time |
|
|
false == ReadTimeLimit.isExpired() && // Read stuff until we're out of time |
|
|
string::npos == Job.Request.find(endSNF,0) // or we have a complete request. |
|
|
|
|
|
|
|
|
std::string::npos == Job.Request.find(endSNF,0) // or we have a complete request. |
|
|
) { |
|
|
) { |
|
|
memset(LineBuffer, 0, sizeof(LineBuffer)); // Clear the buffer. |
|
|
memset(LineBuffer, 0, sizeof(LineBuffer)); // Clear the buffer. |
|
|
int bytes = Client->delimited_receive( // Read up to all but one byte |
|
|
int bytes = Client->delimited_receive( // Read up to all but one byte |
|
|
|
|
|
|
|
|
} // When we're done we will return. |
|
|
} // When we're done we will return. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::writeResponse(TCPClient* Client) { // Write Job.Request from Client. |
|
|
|
|
|
Timeout WriteTimeLimit(RWTimeLimit); // We have a time limit. |
|
|
|
|
|
PollTimer WriteThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::writeResponse(cd::TCPClient* Client) { // Write Job.Request from Client. |
|
|
|
|
|
cd::Timeout WriteTimeLimit(RWTimeLimit); // We have a time limit. |
|
|
|
|
|
cd::PollTimer WriteThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay. |
|
|
for( // For all the bytes in the response: |
|
|
for( // For all the bytes in the response: |
|
|
int Length = Job.Response.length(), BytesThisTime = 0, Bytes = 0; // Bytes to send, this time and sent. |
|
|
int Length = Job.Response.length(), BytesThisTime = 0, Bytes = 0; // Bytes to send, this time and sent. |
|
|
Bytes < Length && // Keep going if we've got more to |
|
|
Bytes < Length && // Keep going if we've got more to |
|
|
|
|
|
|
|
|
} // Go again. |
|
|
} // Go again. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const ThreadType snfXCITCPChannel::Type("snfXCITCPChannel"); // The thread's type. |
|
|
|
|
|
|
|
|
const cd::ThreadType snfXCITCPChannel::Type("snfXCITCPChannel"); // The thread's type. |
|
|
|
|
|
|
|
|
//// XCI Thread States |
|
|
//// XCI Thread States |
|
|
|
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Wait("Waiting For Take()"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Read("Reading Request"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Process("Processing Job"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Write("Writing Results"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Close("Closing Connection"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Clear("Clearing Workspace"); |
|
|
|
|
|
const ThreadState snfXCITCPChannel::XCI_Shutdown("Shutting Down"); |
|
|
|
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Wait("Waiting For Take()"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Read("Reading Request"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Process("Processing Job"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Write("Writing Results"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Close("Closing Connection"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Clear("Clearing Workspace"); |
|
|
|
|
|
const cd::ThreadState snfXCITCPChannel::XCI_Shutdown("Shutting Down"); |
|
|
|
|
|
|
|
|
snfXCITCPChannel::snfXCITCPChannel(snf_RulebaseHandler* H, string N) : // Create these with a home rulebase. |
|
|
|
|
|
|
|
|
snfXCITCPChannel::snfXCITCPChannel(snf_RulebaseHandler* H, std::string N) : // Create these with a home rulebase. |
|
|
Thread(snfXCITCPChannel::Type, N), // XCI TCP Channel Type & name. |
|
|
Thread(snfXCITCPChannel::Type, N), // XCI TCP Channel Type & name. |
|
|
myHome(H), // We know our home. |
|
|
myHome(H), // We know our home. |
|
|
Processor(H), // Our processor has a rulebase. |
|
|
Processor(H), // Our processor has a rulebase. |
|
|
|
|
|
|
|
|
return LatestSize; // Flash the blinking light. |
|
|
return LatestSize; // Flash the blinking light. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::submit(TCPClient* C) { // This is how we submit jobs. |
|
|
|
|
|
|
|
|
void snfXCITCPChannel::submit(cd::TCPClient* C) { // This is how we submit jobs. |
|
|
ChannelJob J(C); // Create a Job for this client. |
|
|
ChannelJob J(C); // Create a Job for this client. |
|
|
give(J); // Give it (copy) to the queue. |
|
|
give(J); // Give it (copy) to the queue. |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
snfXCITCPChannel* snfXCImgr::BestAvailableChannel() { // Selects XCI channel w/ lowest queue. |
|
|
snfXCITCPChannel* snfXCImgr::BestAvailableChannel() { // Selects XCI channel w/ lowest queue. |
|
|
return LowestQueue( // Pick the lowest of the lowest.
|
|
|
|
|
|
|
|
|
return LowestQueue( // Pick the lowest of the lowest. |
|
|
LowestQueue( |
|
|
LowestQueue( |
|
|
LowestQueue(C0, C1), |
|
|
LowestQueue(C0, C1), |
|
|
LowestQueue(C2, C3)
|
|
|
|
|
|
),
|
|
|
|
|
|
LowestQueue(
|
|
|
|
|
|
|
|
|
LowestQueue(C2, C3) |
|
|
|
|
|
), |
|
|
|
|
|
LowestQueue( |
|
|
LowestQueue(C4, C5), |
|
|
LowestQueue(C4, C5), |
|
|
LowestQueue(C6, C7)
|
|
|
|
|
|
|
|
|
LowestQueue(C6, C7) |
|
|
) |
|
|
) |
|
|
); |
|
|
); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void snfXCImgr::startup_Listener() { // Listener startup function. |
|
|
void snfXCImgr::startup_Listener() { // Listener startup function. |
|
|
if(0 == Listener) { // If we need a new listener: |
|
|
if(0 == Listener) { // If we need a new listener: |
|
|
Listener = new TCPListener(CFG_XCI_PORT); // Create a new listener. |
|
|
|
|
|
|
|
|
Listener = new cd::TCPListener(CFG_XCI_PORT); // Create a new listener. |
|
|
Listener->MaxPending = MaxTCPQueueLength; // We may get a lot of hits ;-) |
|
|
Listener->MaxPending = MaxTCPQueueLength; // We may get a lot of hits ;-) |
|
|
Listener->open(); // Open it for business. |
|
|
Listener->open(); // Open it for business. |
|
|
Listener->makeNonBlocking(); // Make it non-blocking. |
|
|
Listener->makeNonBlocking(); // Make it non-blocking. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void snfXCImgr::startup_XCI() { // XCI startup function. |
|
|
void snfXCImgr::startup_XCI() { // XCI startup function. |
|
|
if(true == XCI_UP) return; // If we're already up we're done. |
|
|
if(true == XCI_UP) return; // If we're already up we're done. |
|
|
ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
|
|
|
|
|
|
cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
if(myHome) { // We need to know our home. |
|
|
if(myHome) { // We need to know our home. |
|
|
if(CFG_XCI_ON) { // If XCI is configured on, startup! |
|
|
if(CFG_XCI_ON) { // If XCI is configured on, startup! |
|
|
C0 = new snfXCITCPChannel(myHome, "C0"); // Launch our 8 processing channels. |
|
|
C0 = new snfXCITCPChannel(myHome, "C0"); // Launch our 8 processing channels. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void snfXCImgr::shutdown_XCI() { // XCI shutdown function. |
|
|
void snfXCImgr::shutdown_XCI() { // XCI shutdown function. |
|
|
if(false == XCI_UP) return; // If we're already down we're done. |
|
|
if(false == XCI_UP) return; // If we're already down we're done. |
|
|
ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
|
|
|
|
|
|
cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
shutdown_Listener(); // If up, take down & 0 the Listener. |
|
|
shutdown_Listener(); // If up, take down & 0 the Listener. |
|
|
if(C0) { delete C0; C0 = 0; } // If up, take C0 down and NULL it. |
|
|
if(C0) { delete C0; C0 = 0; } // If up, take C0 down and NULL it. |
|
|
if(C1) { delete C1; C1 = 0; } // If up, take C1 down and NULL it. |
|
|
if(C1) { delete C1; C1 = 0; } // If up, take C1 down and NULL it. |
|
|
|
|
|
|
|
|
return x; |
|
|
return x; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const ThreadState snfXCImgr::XCI_InitialConfig("Initial Config"); // Getting initial configuration. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_InitialStartup("Initial Startup"); // Performing first startup. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_CheckConfig("Checking Config"); // Checking configuration. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_PollingListener("Polling Listener"); // Polling Listener for jobs. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_SubmittingJob("Submitting Job"); // Submitting a new job. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_ListenerDown("Listener Down!"); // Listener is down. |
|
|
|
|
|
const ThreadState snfXCImgr::XCI_Stopping("Exited Polling Loop"); // XCImgr Exiting Big Loop |
|
|
|
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_InitialConfig("Initial Config"); // Getting initial configuration. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_InitialStartup("Initial Startup"); // Performing first startup. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_CheckConfig("Checking Config"); // Checking configuration. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_PollingListener("Polling Listener"); // Polling Listener for jobs. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_SubmittingJob("Submitting Job"); // Submitting a new job. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_ListenerDown("Listener Down!"); // Listener is down. |
|
|
|
|
|
const cd::ThreadState snfXCImgr::XCI_Stopping("Exited Polling Loop"); // XCImgr Exiting Big Loop |
|
|
|
|
|
|
|
|
void snfXCImgr::myTask() { // Main thread task. |
|
|
void snfXCImgr::myTask() { // Main thread task. |
|
|
PollTimer PollingThrottle(RWPollMin, RWPollMax); // Set up a dynamic delay. |
|
|
|
|
|
Timeout WaitForCFG(1000); // CFG Check every second or so. |
|
|
|
|
|
|
|
|
cd::PollTimer PollingThrottle(RWPollMin, RWPollMax); // Set up a dynamic delay. |
|
|
|
|
|
cd::Timeout WaitForCFG(1000); // CFG Check every second or so. |
|
|
|
|
|
|
|
|
// Wait for our initial configuration. |
|
|
// Wait for our initial configuration. |
|
|
CurrentThreadState(XCI_InitialConfig); // Update our status. |
|
|
CurrentThreadState(XCI_InitialConfig); // Update our status. |
|
|
|
|
|
|
|
|
Sleeper WaitATic(1000); // One second sleeper. |
|
|
|
|
|
|
|
|
cd::Sleeper WaitATic(1000); // One second sleeper. |
|
|
while(false == CFG_XCI_ON) { // Before we've been turned on |
|
|
while(false == CFG_XCI_ON) { // Before we've been turned on |
|
|
if(TimeToStop) return; // loop unless it's time to stop. |
|
|
if(TimeToStop) return; // loop unless it's time to stop. |
|
|
checkCFG(); WaitForCFG.restart(); // Check our configuration |
|
|
checkCFG(); WaitForCFG.restart(); // Check our configuration |
|
|
|
|
|
|
|
|
int JobsThisRound = 0; // Keep track of each batch. |
|
|
int JobsThisRound = 0; // Keep track of each batch. |
|
|
if(Listener) { // Check for a good listener. |
|
|
if(Listener) { // Check for a good listener. |
|
|
CurrentThreadState(XCI_PollingListener); // Update our status. |
|
|
CurrentThreadState(XCI_PollingListener); // Update our status. |
|
|
TCPClient* NewClient; // This will be our client. |
|
|
|
|
|
|
|
|
cd::TCPClient* NewClient; // This will be our client. |
|
|
do { // Fast as we can - grab the work: |
|
|
do { // Fast as we can - grab the work: |
|
|
++diagLoopCount; // Count Polling Loops. |
|
|
++diagLoopCount; // Count Polling Loops. |
|
|
NewClient = 0; // Clear our client pointer. |
|
|
NewClient = 0; // Clear our client pointer. |
|
|
|
|
|
|
|
|
shutdown_XCI(); // Shutdown if we're not already. |
|
|
shutdown_XCI(); // Shutdown if we're not already. |
|
|
} // End of the active section. |
|
|
} // End of the active section. |
|
|
|
|
|
|
|
|
catch(exception& e) { // If we get a knowable exception |
|
|
|
|
|
|
|
|
catch(const std::exception& e) { // If we get a knowable exception |
|
|
myHome->logThisError("XCI", -9, e.what()); // then we report it in detail, |
|
|
myHome->logThisError("XCI", -9, e.what()); // then we report it in detail, |
|
|
try { shutdown_XCI(); } catch(...) {} // shutdown if we're not already, |
|
|
try { shutdown_XCI(); } catch(...) {} // shutdown if we're not already, |
|
|
WaitATic(); // wait a tic and try again. |
|
|
WaitATic(); // wait a tic and try again. |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const ThreadType snfXCImgr::Type("snfXCIManager"); // The thread's type. |
|
|
|
|
|
|
|
|
const cd::ThreadType snfXCImgr::Type("snfXCIManager"); // The thread's type. |
|
|
const int XCI_Default_Port = 9001; // Listener Default port = 9001. |
|
|
const int XCI_Default_Port = 9001; // Listener Default port = 9001. |
|
|
|
|
|
|
|
|
snfXCImgr::snfXCImgr() : // Construct with no home. |
|
|
snfXCImgr::snfXCImgr() : // Construct with no home. |
|
|
Thread(snfXCImgr::Type, "XCI Manager"), // XCI Manager type and Name. |
|
|
|
|
|
|
|
|
cd::Thread(snfXCImgr::Type, "XCI Manager"), // XCI Manager type and Name. |
|
|
CFG_XCI_ON(false), // Everything starts off, |
|
|
CFG_XCI_ON(false), // Everything starts off, |
|
|
CFG_XCI_PORT(XCI_Default_Port), // default, and |
|
|
CFG_XCI_PORT(XCI_Default_Port), // default, and |
|
|
myHome(0), // nulled. |
|
|
myHome(0), // nulled. |
|
|
C0(0), C1(0), C2(0), C3(0), |
|
|
C0(0), C1(0), C2(0), C3(0), |
|
|
Listener(0),
|
|
|
|
|
|
XCI_UP(false),
|
|
|
|
|
|
|
|
|
Listener(0), |
|
|
|
|
|
XCI_UP(false), |
|
|
diagLoopCount(0), diagClientCount(0), |
|
|
diagLoopCount(0), diagClientCount(0), |
|
|
TimeToStop(true) { // We don't run until linkHome(). |
|
|
TimeToStop(true) { // We don't run until linkHome(). |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
int snfXCImgr::TotalQueue() { // Return the total work queue size. |
|
|
int snfXCImgr::TotalQueue() { // Return the total work queue size. |
|
|
ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
|
|
|
|
|
|
cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety. |
|
|
return ( |
|
|
return ( |
|
|
((0 == C0) ? 0 : C0->Size()) + |
|
|
((0 == C0) ? 0 : C0->Size()) + |
|
|
((0 == C1) ? 0 : C1->Size()) + |
|
|
((0 == C1) ? 0 : C1->Size()) + |