Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. // snfXCImgr.cpp
  2. // Copyright (C) 2007 - 2020 ARM Research Labs, LLC
  3. // See www.armresearch.com for the copyright terms.
  4. //
  5. // See snfXCImgr.hpp for details.
  6. #include "SNFMulti.hpp"
  7. #include "snfXCImgr.hpp"
  8. namespace cd = codedweller;
  9. // snfXCIServerCommandHandler Virtual Base Class Default Processor.
  10. const std::string XCIServerCommandDefaultResponse =
  11. "<snf><xci><server><response message=\'Not Implemented\' code=\'-1\'/></server></xci></snf>\n";
  12. std::string snfXCIServerCommandHandler::processXCIRequest(snf_xci& X) { // A Server using SNFMulti
  13. return XCIServerCommandDefaultResponse; // can provide a useful processor.
  14. }
  15. // snfXCIJob encapsulates a single XCI transaction.
  16. void snfXCIJob::clear() { // Clear the buffers.
  17. Request.clear(); // Clear the request and
  18. Response.clear(); // response buffers.
  19. SetupTime = 0; // No setup time yet.
  20. }
  21. // snfXCIJobProcessor encapsulates the logic to respond to an XCI request.
  22. snfXCIJobProcessor::snfXCIJobProcessor(snf_RulebaseHandler* H) : // Setup scanner.
  23. myHome(H) { // Establish myHome from H.
  24. myEngine = new snf_EngineHandler(); // Create an engine handler and
  25. myEngine->open(H); // tie it in to our home rulebase.
  26. }
  27. snfXCIJobProcessor::~snfXCIJobProcessor() { // Tear down scanner.
  28. if(myEngine) { // Checking first that we have one,
  29. myEngine->close(); // close the engine and then
  30. delete myEngine; // delete it. Set the pointer to
  31. myEngine = 0; // NULL to enforce the point.
  32. }
  33. myHome = 0; // NULL out our home too.
  34. }
  35. //// This collection of functions handle the processing of all XCI requests.
  36. bool snfXCIJobProcessor::isScanJob() { // True if myXCI is a scan job.
  37. if(0 < myXCI.scanner_scan_file.length()) return true; // If we have a scan file: true!
  38. return false; // otherwise false.
  39. }
  40. bool snfXCIJobProcessor::isGBUdbJob() { // True if myXCI is a GBUdb job.
  41. if( // GBUdb jobs have either
  42. 0 < myXCI.gbudb_test_ip.length() || // an IP to test or
  43. 0 < myXCI.gbudb_set_ip.length() || // an IP to setup or
  44. 0 < myXCI.gbudb_bad_ip.length() || // a bad IP to flag or
  45. 0 < myXCI.gbudb_good_ip.length() || // a good IP to flag or
  46. 0 < myXCI.gbudb_drop_ip.length()
  47. ) return true; // If we have one of these: true!
  48. return false; // otherwise false.
  49. }
  50. bool snfXCIJobProcessor::isReportJob() { // True if myXCI is a Report job.
  51. if(0 < myXCI.report_request_status_class.length()) return true; // If we have a report status class
  52. return false; // it's a report otherwise it's not.
  53. }
  54. bool snfXCIJobProcessor::isCommandJob() { // True if myXCI is a Command job.
  55. if(0 < myXCI.xci_server_command.length()) return true; // If we have a command string: true!
  56. return false; // otherwise false.
  57. }
  58. void snfXCIJobProcessor::processScan(snfXCIJob& J) { // Process a scan request.
  59. try { // Safely perform our scan.
  60. // Check for forced IP.
  61. cd::IP4Address ForcedIP = 0UL; // Don't expect a forced IP.
  62. if(0 < myXCI.scanner_scan_ip.length()) { // If we have one then
  63. ForcedIP = myXCI.scanner_scan_ip; // convert it from the string.
  64. }
  65. // Scan the message file.
  66. int ScanResult = // Scan the file using our
  67. myEngine->scanMessageFile( // engine. Use the file
  68. myXCI.scanner_scan_file.c_str(), // path in the XCI request, and
  69. J.SetupTime, // the recorded setup time. Use the
  70. ForcedIP // forced IP if provided.
  71. );
  72. // Create a proper xci resposne.
  73. std::ostringstream ResultString; // Use a stringstream to make it easier.
  74. ResultString
  75. << "<snf><xci><scanner><result code=\'" // Emit the preamble.
  76. << ScanResult << "\'"; // Emit the scan result.
  77. if( // Check for optional data requests.
  78. false == myXCI.scanner_scan_xhdr &&
  79. false == myXCI.scanner_scan_log
  80. ) { // If no optional data was requested
  81. ResultString // then close the <request/> and
  82. << "/></scanner></xci></snf>\n" // emit the closing elements.
  83. << std::endl; // End of the line.
  84. } else { // If optional data is requested:
  85. ResultString << ">" << std::endl; // Complete the <result> open tag.
  86. if(true == myXCI.scanner_scan_xhdr) { // Optionally include XHDR data.
  87. ResultString // If xheaders are requested...
  88. << "<xhdr>" << myEngine->getXHDRs() // Emit the xhdr element & contents.
  89. << "</xhdr>" << std::endl; // End the xhdr and end of line.
  90. }
  91. if(true == myXCI.scanner_scan_log) { // Optionally include XMLLog data.
  92. ResultString // If the log data is requested...
  93. << "<log>" << myEngine->getXMLLog() // Emit the log element & data.
  94. << "</log>" << std::endl; // End the log data and end of line.
  95. }
  96. ResultString << "</result></scanner></xci></snf>\n"; // Emit the closing elements.
  97. }
  98. J.Response = ResultString.str(); // Capture the formatted response.
  99. }
  100. // Decode the known exceptions
  101. catch(const snf_EngineHandler::AllocationError& e) {
  102. J.Response = "<snf><xci><error message=\'AllocationError ";
  103. J.Response.append(e.what());
  104. J.Response.append("\'/></xci></snf>\n");
  105. }
  106. catch(snf_EngineHandler::BadMatrix& e) {
  107. J.Response = "<snf><xci><error message=\'BadMatrix ";
  108. J.Response.append(e.what());
  109. J.Response.append("\'/></xci></snf>\n");
  110. }
  111. catch(snf_EngineHandler::Busy& e) {
  112. J.Response = "<snf><xci><error message=\'Busy ";
  113. J.Response.append(e.what());
  114. J.Response.append("\'/></xci></snf>\n");
  115. }
  116. catch(snf_EngineHandler::FileError& e) {
  117. J.Response = "<snf><xci><error message=\'FileError ";
  118. J.Response.append(e.what());
  119. J.Response.append("\'/></xci></snf>\n");
  120. }
  121. catch(snf_EngineHandler::MaxEvals& e) {
  122. J.Response = "<snf><xci><error message=\'MaxEvals ";
  123. J.Response.append(e.what());
  124. J.Response.append("\'/></xci></snf>\n");
  125. }
  126. catch(snf_EngineHandler::Panic& e) {
  127. J.Response = "<snf><xci><error message=\'Panic ";
  128. J.Response.append(e.what());
  129. J.Response.append("\'/></xci></snf>\n");
  130. }
  131. catch(snf_EngineHandler::XHDRError& e) {
  132. J.Response = "<snf><xci><error message=\'XHDRError ";
  133. J.Response.append(e.what());
  134. J.Response.append("\'/></xci></snf>\n");
  135. }
  136. // Decode the unknown exceptions
  137. catch(const std::exception& e) {
  138. J.Response = "<snf><xci><error message=\'Exception! ";
  139. J.Response.append(e.what());
  140. J.Response.append("\'/></xci></snf>\n");
  141. }
  142. catch(...) {
  143. J.Response = "<snf><xci><error message=\'... Thrown!\'/></xci></snf>\n";
  144. }
  145. }
  146. std::string snfXCIJobProcessor::processGBUdb() { // Process a GBUdb request.
  147. GBUdb& myGBUdb = myHome->MyGBUdb; // Make a convenient GBUdb handle.
  148. cd::IP4Address IP; // We will work with an IP.
  149. GBUdbRecord R; // We will get a record to return.
  150. // Test an IP - return it's current data.
  151. if(0 < myXCI.gbudb_test_ip.length()) { // IF: Test an IP
  152. IP = myXCI.gbudb_test_ip; // Convert the IP.
  153. } else
  154. // Set or update an IP's data.
  155. if(0 < myXCI.gbudb_set_ip.length()) { // IF: Set an IP's data.
  156. IP = myXCI.gbudb_set_ip; // Convert the IP.
  157. if( // Check for a compound update:
  158. 0 <= myXCI.gbudb_set_bad_count || // If we are changing the bad
  159. 0 <= myXCI.gbudb_set_good_count // or good count then this is
  160. ) { // a compound update (read then write).
  161. R = myGBUdb.getRecord(IP); // Get the record (or a safe blank).
  162. if(0 <= myXCI.gbudb_set_bad_count) // If we have a bad count to set
  163. R.Bad(myXCI.gbudb_set_bad_count); // then set the bad count.
  164. if(0 <= myXCI.gbudb_set_good_count) // If we have a good count to set
  165. R.Good(myXCI.gbudb_set_good_count); // then set the good count.
  166. if(0 < myXCI.gbudb_set_type.length()) { // If type, set type...
  167. switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the
  168. case 'g': case 'G': { R.Flag(Good); break; } // first character of the name and
  169. case 'b': case 'B': { R.Flag(Bad); break; } // set the appropriate flag.
  170. case 'u': case 'U': { R.Flag(Ugly); break; }
  171. case 'i': case 'I': { R.Flag(Ignore); break; }
  172. }
  173. }
  174. myGBUdb.setRecord(IP, R); // Save the data.
  175. } else // This might be a simple flag change.
  176. if(0 < myXCI.gbudb_set_type.length()) { // If type, set type...
  177. switch(myXCI.gbudb_set_type.at(0)) { // Determine the type based on the
  178. case 'g': case 'G': { R = myGBUdb.setGood(IP); break; } // first character of the name and
  179. case 'b': case 'B': { R = myGBUdb.setBad(IP); break; } // set the appropriate flag. Simple
  180. case 'u': case 'U': { R = myGBUdb.setUgly(IP); break; } // flag changes are atomic so there is
  181. case 'i': case 'I': { R = myGBUdb.setIgnore(IP); break; } // no need to "save" later.
  182. }
  183. } else { // Empty set command?
  184. return XCIBadSetResponse; // That's bad. Use test!
  185. }
  186. } else
  187. // Add a bad event to an IPs data.
  188. if(0 < myXCI.gbudb_bad_ip.length()) { // IF: Add a bad mark for this IP
  189. IP = myXCI.gbudb_bad_ip; // Convert the IP.
  190. R = myGBUdb.addBad(IP); // Add a bad mark.
  191. } else
  192. // Add a good event to an IPs data.
  193. if(0 < myXCI.gbudb_good_ip.length()) { // IF: Add a good mark for this IP
  194. IP = myXCI.gbudb_good_ip; // Convert the IP.
  195. R = myGBUdb.addGood(IP); // Add a bad mark.
  196. } else
  197. // Drop an IP from the database.
  198. if(0 < myXCI.gbudb_drop_ip.length()) { // IF: Drop an IP's data.
  199. IP = myXCI.gbudb_drop_ip; // Convert the IP.
  200. myGBUdb.dropRecord(IP); // Forget about it.
  201. }
  202. // Return the final state of the IP's data.
  203. IPTestRecord IPState(IP);
  204. myHome->performIPTest(IPState);
  205. std::ostringstream Response; // Use a stringstream for our output.
  206. Response
  207. << "<snf><xci><gbudb><result " // Get the response started.
  208. << "ip=\'" << (std::string) IP // Emit the ip.
  209. << "\' type=\'" // Emit the type.
  210. << ((Good == IPState.G.Flag()) ? "good" :
  211. ((Bad == IPState.G.Flag()) ? "bad" :
  212. ((Ugly == IPState.G.Flag()) ? "ugly" :
  213. ((Ignore == IPState.G.Flag()) ? "ignore" : "error"))))
  214. << "\' p=\'" << IPState.G.Probability() // Emit the probability.
  215. << "\' c=\'" << IPState.G.Confidence() // Emit the confidence.
  216. << "\' b=\'" << IPState.G.Bad() // Emit the bad count.
  217. << "\' g=\'" << IPState.G.Good() // Emit the good count.
  218. << "\' range=\'"
  219. << ((snfIPRange::Unknown == IPState.R) ? "unknown" :
  220. ((snfIPRange::White == IPState.R) ? "white" :
  221. ((snfIPRange::Normal == IPState.R) ? "normal" :
  222. ((snfIPRange::New == IPState.R) ? "new" :
  223. ((snfIPRange::Caution == IPState.R) ? "caution" :
  224. ((snfIPRange::Black == IPState.R) ? "black" :
  225. ((snfIPRange::Truncate == IPState.R) ? "truncate" : "error")))))))
  226. << "\' code=\'" << IPState.Code
  227. << "\'"
  228. << "/></gbudb></xci></snf>" // Finish it up.
  229. << std::endl;
  230. return Response.str(); // Return the formatted response.
  231. }
  232. std::string snfXCIJobProcessor::processStatusReport() { // Process a report request.
  233. std::string ReportToSend; // Keep this in scope.
  234. if(0 == myXCI.report_request_status_class.find("hour")) { // Please send the hour report.
  235. ReportToSend = myHome->MyLOGmgr.getStatusHourReport();
  236. } else
  237. if(0 == myXCI.report_request_status_class.find("minute")) { // Please send the minute report.
  238. ReportToSend = myHome->MyLOGmgr.getStatusMinuteReport();
  239. } else { // Please send the second report.
  240. ReportToSend = myHome->MyLOGmgr.getStatusSecondReport();
  241. }
  242. std::string Response = "<snf><xci><report><response>"; // Construct the response using the
  243. Response.append(ReportToSend); // snf/xci template and the selected
  244. Response.append("</response></report></xci></snf>"); // status report text.
  245. return Response; // Return the response.
  246. }
  247. void snfXCIJobProcessor::process(snfXCIJob& J) { // Process a Job.
  248. // Parse the XCI request and check for an error.
  249. myXCI.read(J.Request); // Parse the request.
  250. if(myXCI.bad()) { // If it's bad then
  251. J.Response = XCIErrorResponse; // respond with an error.
  252. myHome->logThisError("XCI",-1,"Bad Request"); // Log the error.
  253. return; // Done.
  254. } else
  255. // Process scan requests.
  256. if(isScanJob()) { // If this is a Scan request
  257. processScan(J); // respond with the result.
  258. return; // Done.
  259. } else
  260. // Process gbudb requests.
  261. if(isGBUdbJob()) { // If this is a GBUdb request
  262. J.Response = processGBUdb(); // respond with the result.
  263. return; // Done.
  264. } else
  265. // Process report requests.
  266. if(isReportJob()) { // If this is a Status report request
  267. J.Response = processStatusReport(); // respond with the desired report.
  268. return; // Done.
  269. } else
  270. // Process server commands.
  271. if(isCommandJob()) { // If this is a server command
  272. J.Response = myHome->processXCIServerCommandRequest(myXCI); // pass it up and return the
  273. return; // result. Done.
  274. } else
  275. // If we get to this point we don't understand the well formed request.
  276. J.Response = XCIErrorResponse; // Don't understand?
  277. myHome->logThisError("XCI",-2,"Unrecognized Request"); // Log the error. Respond with
  278. return; // the standard error response.
  279. }
  280. // ChannelJob encapsulates a Client Job while in the queue and how long it has
  281. // been in the system (since created).
  282. ChannelJob::ChannelJob() : myClient(0) {} // Empty is the null client.
  283. ChannelJob::ChannelJob(cd::TCPClient* C) : // We are created like this.
  284. myClient(C) { // We capture the client and
  285. } // our timer starts automaticially.
  286. cd::msclock ChannelJob::Age() { // How old is this job?
  287. return Lifetime.getElapsedTime(); // Return the elapsed time in ms.
  288. }
  289. cd::TCPClient* ChannelJob::Client() { // What client does it hold?
  290. return myClient; // Return the Client pointer.
  291. }
  292. // snfXCITCPChannel encapsulates the logic to queue and handle TCPClients for
  293. // the XCI interface. The queued TCPClients each represent a single request.
  294. // Each request is handled in turn by reading the request into an snfXCIJob,
  295. // handing that snfXCIJob to an snfXCIJobProcessor, transmitting the result
  296. // back to the TCPClient, closing the connection, and recycling the snfXCIJob
  297. // object for the next round.
  298. // snfXCITCPChannel shuts down when given a NULL TCPClient; This allows any
  299. // jobs in queue to be handled before the thread stops. To shut down a channel
  300. // { C->submit(NULL); C->join(); delete C; C = NULL;}
  301. void snfXCITCPChannel::give(ChannelJob& J) { // Give a job to the queue.
  302. cd::ScopeMutex OneAtATimePlease(QueueMutex); // Protected with a mutex...
  303. JobQueue.push(J); // Push the job in.
  304. LatestSize = JobQueue.size(); // Set the blinking light.
  305. QueueGateway.produce(); // Add the item to our gateway.
  306. }
  307. ChannelJob snfXCITCPChannel::take() { // Take a job from the queue.
  308. QueueGateway.consume(); // Hold on until there is work.
  309. cd::ScopeMutex OneAtATimePlease(QueueMutex); // Queue Data Protected with a mutex.
  310. ChannelJob J = JobQueue.front(); // Grab the next job in the queue.
  311. JobQueue.pop(); // Pop that job out of the queue.
  312. LatestSize = JobQueue.size(); // Set the blinking light.
  313. return J; // Return the Job.
  314. }
  315. const int RWTimeLimit = 30000; // RWTimeLimit in ms. 30 seconds.
  316. const std::string endSNF = "</snf>"; // snf_xci snf element terminator.
  317. const int RWPollMin = 15; // Minimum time between polls.
  318. const int RWPollMax = 75; // Maximum time between polls.
  319. const int MaxQueueLength = 32; // Most waiting in any queue.
  320. const int MaxTCPQueueLength = 4 * MaxQueueLength; // Most connections waiting.
  321. void snfXCITCPChannel::readRequest(cd::TCPClient* Client) { // Read Job.Request from Client.
  322. cd::Timeout ReadTimeLimit(RWTimeLimit); // We have time limits.
  323. cd::PollTimer ReadThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay.
  324. while(
  325. false == ReadTimeLimit.isExpired() && // Read stuff until we're out of time
  326. std::string::npos == Job.Request.find(endSNF,0) // or we have a complete request.
  327. ) {
  328. memset(LineBuffer, 0, sizeof(LineBuffer)); // Clear the buffer.
  329. int bytes = Client->delimited_receive( // Read up to all but one byte
  330. LineBuffer, sizeof(LineBuffer)-1, '\n'); // of the buffer up to the first \n.
  331. if(0 < bytes) { // If we got some bytes
  332. Job.Request.append(LineBuffer); // Append the data we got and
  333. ReadThrottle.reset(); // reset the throttle.
  334. } else { // If we didn't get any bytes then
  335. ReadThrottle.pause(); // wait a little bit more each round.
  336. }
  337. } // When we're done we will return.
  338. }
  339. void snfXCITCPChannel::writeResponse(cd::TCPClient* Client) { // Write Job.Request from Client.
  340. cd::Timeout WriteTimeLimit(RWTimeLimit); // We have a time limit.
  341. cd::PollTimer WriteThrottle(RWPollMin, RWPollMax); // Throttle with a spiral delay.
  342. for( // For all the bytes in the response:
  343. int Length = Job.Response.length(), BytesThisTime = 0, Bytes = 0; // Bytes to send, this time and sent.
  344. Bytes < Length && // Keep going if we've got more to
  345. false == WriteTimeLimit.isExpired(); // send and we still have time.
  346. ) {
  347. BytesThisTime = Client->transmit( // Transmit some bytes.
  348. &Job.Response[Bytes], Job.Response.length()-Bytes); // from where we are, what is left.
  349. if(0 < BytesThisTime) { // If we sent bytes
  350. Bytes += BytesThisTime; // then keep track of how many
  351. WriteThrottle.reset(); // and reset our throttle to min.
  352. } else { // If we didn't then pause a bit
  353. WriteThrottle.pause(); // and let our delay grow.
  354. }
  355. }
  356. }
  357. const int XCI_Reading = 0; // XCI Mode Flags.
  358. const int XCI_Processing = 1;
  359. const int XCI_Writing = 2;
  360. void snfXCITCPChannel::myTask() { // Thread's main loop.
  361. bool WeAreAlive = true; // It's not over 'til it's over.
  362. while(WeAreAlive) { // While we are alive:
  363. CurrentThreadState(XCI_Wait); // Mark our state.
  364. ChannelJob J = take(); // Pull a Client Job from the queue.
  365. if(0 == J.Client()) { // If the job is empty we're done.
  366. CurrentThreadState(XCI_Shutdown); // Mark our state.
  367. WeAreAlive = false; // Turn off the alive flag and
  368. break; // break out of the loop.
  369. } else { // When we have a job to do:
  370. int XCIMode = XCI_Reading;
  371. try {
  372. CurrentThreadState(XCI_Read);
  373. XCIMode = XCI_Reading; // Now we are reading.
  374. readRequest(J.Client()); // Read the client job.
  375. CurrentThreadState(XCI_Process);
  376. XCIMode = XCI_Processing; // Now we are processing.
  377. Job.SetupTime = J.Age(); // Capture the read and queue time.
  378. Processor.process(Job); // Pass the XCIJob to our processor.
  379. CurrentThreadState(XCI_Write);
  380. XCIMode = XCI_Writing; // Now we are writing.
  381. writeResponse(J.Client()); // Write the response.
  382. }
  383. // Log any exceptions that were thrown.
  384. catch(...) {
  385. switch(XCIMode) {
  386. case XCI_Reading: {
  387. myHome->logThisError("XCI",-5,"SocketReadError");
  388. break;
  389. }
  390. case XCI_Processing: {
  391. myHome->logThisError("XCI",-6,"ProcessError");
  392. break;
  393. }
  394. case XCI_Writing: {
  395. myHome->logThisError("XCI",-7,"SocketWriteError");
  396. break;
  397. }
  398. }
  399. }
  400. }
  401. // At the end of every job we clean up no matter what.
  402. if(0 != J.Client()) { // If we have a client
  403. CurrentThreadState(XCI_Close);
  404. J.Client()->close(); // Close the client.
  405. delete J.Client(); // Delete the client.
  406. }
  407. CurrentThreadState(XCI_Clear);
  408. Job.clear(); // Clear the job buffer.
  409. } // Go again.
  410. }
  411. const cd::ThreadType snfXCITCPChannel::Type("snfXCITCPChannel"); // The thread's type.
  412. //// XCI Thread States
  413. const cd::ThreadState snfXCITCPChannel::XCI_Wait("Waiting For Take()");
  414. const cd::ThreadState snfXCITCPChannel::XCI_Read("Reading Request");
  415. const cd::ThreadState snfXCITCPChannel::XCI_Process("Processing Job");
  416. const cd::ThreadState snfXCITCPChannel::XCI_Write("Writing Results");
  417. const cd::ThreadState snfXCITCPChannel::XCI_Close("Closing Connection");
  418. const cd::ThreadState snfXCITCPChannel::XCI_Clear("Clearing Workspace");
  419. const cd::ThreadState snfXCITCPChannel::XCI_Shutdown("Shutting Down");
  420. snfXCITCPChannel::snfXCITCPChannel(snf_RulebaseHandler* H, std::string N) : // Create these with a home rulebase.
  421. Thread(snfXCITCPChannel::Type, N), // XCI TCP Channel Type & name.
  422. myHome(H), // We know our home.
  423. Processor(H), // Our processor has a rulebase.
  424. LatestSize(0) { // Our job queue size is zero.
  425. run(); // We start our thread.
  426. }
  427. snfXCITCPChannel::~snfXCITCPChannel() { // Destroy them very carefully.
  428. ChannelJob EndJob; // On the way down feed ourselves
  429. give(EndJob); // an empty job - that will end our
  430. join(); // thread once other jobs are done.
  431. myHome = 0; // Once joined our home is gone.
  432. } // We're done.
  433. int snfXCITCPChannel::Size() { // Keep track of how full they are.
  434. return LatestSize; // Flash the blinking light.
  435. }
  436. void snfXCITCPChannel::submit(cd::TCPClient* C) { // This is how we submit jobs.
  437. ChannelJob J(C); // Create a Job for this client.
  438. give(J); // Give it (copy) to the queue.
  439. }
  440. // snfXCImgr encapsulates a service engine that takes XCI requests via TCP,
  441. // performs the required actions, and returns an XCI response. It also checks
  442. // to see if the configuration for the XCI interface has changed.
  443. void snfXCImgr::checkCFG() { // Checks the configuration.
  444. CurrentThreadState(XCI_CheckConfig); // Update our status.
  445. int NEW_XCI_Port; // Prepare for a change in port.
  446. // Quickly as we can, grab a config packet, capture the XCI parts, and
  447. // then let it go.
  448. if(myHome->isReady()) { // If we know our home then
  449. snfCFGPacket MyCFGPacket(myHome); // Grab a configuration packet.
  450. if(MyCFGPacket.bad()) { // If it's not valid then
  451. return; // wait (skip this) till next time.
  452. } else { // If we've got a good config then
  453. CFG_XCI_ON = MyCFGPacket.Config()->XCI_OnOff; // Is XCI turned on?
  454. NEW_XCI_Port = MyCFGPacket.Config()->XCI_Port; // What port we listen to?
  455. } // If our rulebase manager was
  456. } else return; // not ready (skip this) for now.
  457. if(CFG_XCI_ON) { // If the XCI is configured up:
  458. if(NEW_XCI_Port != CFG_XCI_PORT) { // Check for a port change. If the
  459. CFG_XCI_PORT = NEW_XCI_Port; // port changed then check for a live
  460. if(Listener) { // listener. For a live port change
  461. shutdown_Listener(); // shut down the current listener and
  462. myHome->logThisInfo("XCI", 0, "ListenerDown:PortChanged"); // log the activity.
  463. startup_Listener(); // Restart the listener with the new
  464. myHome->logThisInfo("XCI", 0, "ListenerUp:PortChanged"); // port and log the event.
  465. }
  466. }
  467. startup_XCI(); // Make sure the XCI is up.
  468. } else { // If the XCI is configured down
  469. shutdown_XCI(); // then make sure it is down.
  470. }
  471. }
  472. snfXCITCPChannel* LowestQueue(snfXCITCPChannel* A, snfXCITCPChannel* B) { // Pick the lowest queue of two.
  473. return ((A->Size() < B->Size()) ? A : B); // Pick one and return it.
  474. }
  475. snfXCITCPChannel* snfXCImgr::BestAvailableChannel() { // Selects XCI channel w/ lowest queue.
  476. return LowestQueue( // Pick the lowest of the lowest.
  477. LowestQueue(
  478. LowestQueue(C0, C1),
  479. LowestQueue(C2, C3)
  480. ),
  481. LowestQueue(
  482. LowestQueue(C4, C5),
  483. LowestQueue(C6, C7)
  484. )
  485. );
  486. }
  487. void snfXCImgr::startup_Listener() { // Listener startup function.
  488. if(0 == Listener) { // If we need a new listener:
  489. Listener = new cd::TCPListener(CFG_XCI_PORT); // Create a new listener.
  490. Listener->MaxPending = MaxTCPQueueLength; // We may get a lot of hits ;-)
  491. Listener->open(); // Open it for business.
  492. Listener->makeNonBlocking(); // Make it non-blocking.
  493. }
  494. }
  495. void snfXCImgr::shutdown_Listener() { // Listener shutdown function.
  496. if(Listener) { // Only act if there is a listener:
  497. Listener->close(); // The listener gets closed,
  498. delete Listener; // then deleted, then the
  499. Listener = 0; // Listener pointer is zeroed.
  500. }
  501. }
  502. void snfXCImgr::startup_XCI() { // XCI startup function.
  503. if(true == XCI_UP) return; // If we're already up we're done.
  504. cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety.
  505. if(myHome) { // We need to know our home.
  506. if(CFG_XCI_ON) { // If XCI is configured on, startup!
  507. C0 = new snfXCITCPChannel(myHome, "C0"); // Launch our 8 processing channels.
  508. C1 = new snfXCITCPChannel(myHome, "C1");
  509. C2 = new snfXCITCPChannel(myHome, "C2");
  510. C3 = new snfXCITCPChannel(myHome, "C3");
  511. C4 = new snfXCITCPChannel(myHome, "C4");
  512. C5 = new snfXCITCPChannel(myHome, "C5");
  513. C6 = new snfXCITCPChannel(myHome, "C6");
  514. C7 = new snfXCITCPChannel(myHome, "C7");
  515. startup_Listener(); // Start up our listener.
  516. myHome->logThisInfo("XCI", 0, "Startup"); // Log the startup.
  517. XCI_UP = true; // Set the flag. We're up!
  518. }
  519. }
  520. }
  521. void snfXCImgr::shutdown_XCI() { // XCI shutdown function.
  522. if(false == XCI_UP) return; // If we're already down we're done.
  523. cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety.
  524. shutdown_Listener(); // If up, take down & 0 the Listener.
  525. if(C0) { delete C0; C0 = 0; } // If up, take C0 down and NULL it.
  526. if(C1) { delete C1; C1 = 0; } // If up, take C1 down and NULL it.
  527. if(C2) { delete C2; C2 = 0; } // If up, take C2 down and NULL it.
  528. if(C3) { delete C3; C3 = 0; } // If up, take C3 down and NULL it.
  529. if(C4) { delete C4; C4 = 0; } // If up, take C4 down and NULL it.
  530. if(C5) { delete C5; C5 = 0; } // If up, take C5 down and NULL it.
  531. if(C6) { delete C6; C6 = 0; } // If up, take C6 down and NULL it.
  532. if(C7) { delete C7; C7 = 0; } // If up, take C7 down and NULL it.
  533. myHome->logThisInfo("XCI", 0, "Shutdown"); // Log the shutdown.
  534. XCI_UP = false; // Set the flag. We're down!
  535. }
  536. int snfXCImgr::pollLoopCount() { // Retrieve & reset Loop Count.
  537. int x = diagLoopCount;
  538. diagLoopCount = 0;
  539. return x;
  540. }
  541. int snfXCImgr::pollClientCount() { // Retrieve & reset Client Count.
  542. int x = diagClientCount;
  543. diagClientCount = 0;
  544. return x;
  545. }
  546. const cd::ThreadState snfXCImgr::XCI_InitialConfig("Initial Config"); // Getting initial configuration.
  547. const cd::ThreadState snfXCImgr::XCI_InitialStartup("Initial Startup"); // Performing first startup.
  548. const cd::ThreadState snfXCImgr::XCI_CheckConfig("Checking Config"); // Checking configuration.
  549. const cd::ThreadState snfXCImgr::XCI_PollingListener("Polling Listener"); // Polling Listener for jobs.
  550. const cd::ThreadState snfXCImgr::XCI_SubmittingJob("Submitting Job"); // Submitting a new job.
  551. const cd::ThreadState snfXCImgr::XCI_ListenerDown("Listener Down!"); // Listener is down.
  552. const cd::ThreadState snfXCImgr::XCI_Stopping("Exited Polling Loop"); // XCImgr Exiting Big Loop
  553. void snfXCImgr::myTask() { // Main thread task.
  554. cd::PollTimer PollingThrottle(RWPollMin, RWPollMax); // Set up a dynamic delay.
  555. cd::Timeout WaitForCFG(1000); // CFG Check every second or so.
  556. // Wait for our initial configuration.
  557. CurrentThreadState(XCI_InitialConfig); // Update our status.
  558. cd::Sleeper WaitATic(1000); // One second sleeper.
  559. while(false == CFG_XCI_ON) { // Before we've been turned on
  560. if(TimeToStop) return; // loop unless it's time to stop.
  561. checkCFG(); WaitForCFG.restart(); // Check our configuration
  562. WaitATic(); // every second or so.
  563. }
  564. // Once our configuration is good and we are turned on we get here.
  565. try { // Safely accept/process requests.
  566. CurrentThreadState(XCI_InitialStartup); // Update our status.
  567. startup_XCI(); // We're on, so turn on!
  568. while(false == TimeToStop) { // While it is not time to stop:
  569. // Occasionally we check to see what our configuration says. If
  570. // the XCI is configured up, or down, or if the port changes then
  571. // the checkCFG() function handles the changes. After that all we
  572. // need to do here is check for a listener -- if we're up we will
  573. // have one and if not then we won't. Without a listener we will
  574. // slow down and keep checking for a configuration change.
  575. if(WaitForCFG.isExpired()) { checkCFG(); WaitForCFG.restart(); } // Check the CFG periodically.
  576. // Get a new client if we have room in the queue
  577. // and the listener is live.
  578. int JobsThisRound = 0; // Keep track of each batch.
  579. if(Listener) { // Check for a good listener.
  580. CurrentThreadState(XCI_PollingListener); // Update our status.
  581. cd::TCPClient* NewClient; // This will be our client.
  582. do { // Fast as we can - grab the work:
  583. ++diagLoopCount; // Count Polling Loops.
  584. NewClient = 0; // Clear our client pointer.
  585. snfXCITCPChannel* Channel = BestAvailableChannel(); // Pick a channel to use then
  586. if(MaxQueueLength > Channel->Size()) { // If we have room in the queue
  587. NewClient = Listener->acceptClient(); // get a new client.
  588. if(NewClient) { // If we got one:
  589. CurrentThreadState(XCI_SubmittingJob); // Update our status.
  590. ++diagClientCount; // Count Clients.
  591. NewClient->makeNonBlocking(); // Make the client non-blocking.
  592. Channel->submit(NewClient); // Submit the new client.
  593. }
  594. }
  595. } while( // Keep getting work in this tight
  596. (0 != NewClient)&& // loop until we run out of work
  597. (MaxTCPQueueLength > diagClientCount) // or we've pulled a full queue.
  598. );
  599. } else {
  600. CurrentThreadState(XCI_ListenerDown); // Update our status.
  601. } // Throttle our loop to keep it real:
  602. if(0 == JobsThisRound) PollingThrottle.pause(); // If we got nothing then slow down.
  603. else PollingThrottle.reset(); // If we got some, keep getting it!
  604. } // When we're done with the big loop:
  605. CurrentThreadState(XCI_Stopping); // Update our status.
  606. shutdown_XCI(); // Shutdown if we're not already.
  607. } // End of the active section.
  608. catch(const std::exception& e) { // If we get a knowable exception
  609. myHome->logThisError("XCI", -9, e.what()); // then we report it in detail,
  610. try { shutdown_XCI(); } catch(...) {} // shutdown if we're not already,
  611. WaitATic(); // wait a tic and try again.
  612. }
  613. catch(...) { // If we have an unhandled exception
  614. myHome->logThisError("XCI", -10, "Panic!"); // Panic and reset. Notify the log.
  615. try { shutdown_XCI(); } catch(...) {} // Shutdown if we're not already.
  616. WaitATic(); // Pause to let things settle.
  617. } // Let's try this again.
  618. }
  619. const cd::ThreadType snfXCImgr::Type("snfXCIManager"); // The thread's type.
  620. const int XCI_Default_Port = 9001; // Listener Default port = 9001.
  621. snfXCImgr::snfXCImgr() : // Construct with no home.
  622. cd::Thread(snfXCImgr::Type, "XCI Manager"), // XCI Manager type and Name.
  623. CFG_XCI_ON(false), // Everything starts off,
  624. CFG_XCI_PORT(XCI_Default_Port), // default, and
  625. myHome(0), // nulled.
  626. C0(0), C1(0), C2(0), C3(0),
  627. Listener(0),
  628. XCI_UP(false),
  629. diagLoopCount(0), diagClientCount(0),
  630. TimeToStop(true) { // We don't run until linkHome().
  631. }
  632. snfXCImgr::~snfXCImgr() { // Stop when we are destroyed.
  633. stop(); // Like I said, stop().
  634. }
  635. void snfXCImgr::linkHome(snf_RulebaseHandler* Home) { // Link to Home and set up shop.
  636. if(0 != Home && 0 == myHome) { // If we are getting our home
  637. myHome = Home; // then capture it,
  638. myHome->use(); // Update it's use count.
  639. TimeToStop = false; // clear the time to stop bit,
  640. run(); // run our thread.
  641. }
  642. }
  643. int snfXCImgr::TotalQueue() { // Return the total work queue size.
  644. cd::ScopeMutex IGotIt(ChannelMutex); // Serialize state control for safety.
  645. return (
  646. ((0 == C0) ? 0 : C0->Size()) +
  647. ((0 == C1) ? 0 : C1->Size()) +
  648. ((0 == C2) ? 0 : C2->Size()) +
  649. ((0 == C3) ? 0 : C3->Size())
  650. );
  651. }
  652. void snfXCImgr::stop() { // Called to shut down.
  653. if(false == TimeToStop) { // If we are not stopped then
  654. TimeToStop = true; // it is time to stop.
  655. join(); // Wait for our main thread first,
  656. shutdown_XCI(); // then shut down the XCI.
  657. myHome->unuse(); // Let go of the rulebase manager.
  658. myHome = 0; // Null it out for safety.
  659. }
  660. }