Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

JobPool.cpp 22KB


  1. // SNF4CGP/JobPool.cpp
  2. // Copyright (C) 2009 ARM Research Labs, LLC.
  3. // See www.armresearch.com for more information.
  4. #include "JobPool.hpp"
  5. #include "Command.hpp"
  6. #include "ScannerPool.hpp"
  7. #include "OutputProcessor.hpp"
  8. #include "../SNFMulti/SNFMulti.hpp"
  9. #include "../CodeDweller/threading.hpp"
  10. #include "../CodeDweller/faults.hpp"
  11. #include <iostream>
  12. #include <string>
  13. #include <vector>
  14. using namespace std;
  15. //// Tuning Constants //////////////////////////////////////////////////////////
// Number of bytes in a Job's ReadBuffer and the size of each block read
// from the message file. snf_ScanHorizon is presumably declared by
// SNFMulti.hpp -- TODO confirm.
const int ReadBufferSize = snf_ScanHorizon;
// Capacity reserved up front for each of a Job's string buffers.
const int StringReserveSize = 2048;
  18. //// Job ///////////////////////////////////////////////////////////////////////
  19. void Job::emitComment(const string& Comment) {
  20. ostringstream O;
  21. O << "* " << Comment << endl;
  22. OutputBuffer.append(O.str());
  23. }
  24. void Job::generateCommandCommentPrefix(ostringstream& O) {
  25. O << "* SNF4CGP[" << CurrentCommand.Number << "] ";
  26. }
  27. void Job::emitOK() {
  28. ostringstream O;
  29. O << CurrentCommand.Number << " OK" << endl;
  30. OutputBuffer.append(O.str());
  31. }
  32. const string INTFVersion = "1";
  33. void Job::emitINTF() {
  34. ostringstream O;
  35. O << CurrentCommand.Number << " INTF " << INTFVersion << endl;
  36. OutputBuffer.append(O.str());
  37. }
  38. void Job::emitFAILURE() {
  39. ostringstream O;
  40. O << CurrentCommand.Number << " FAILURE" << endl;
  41. OutputBuffer.append(O.str());
  42. }
  43. string formatAsCGPString(const string& S) {
  44. string CGPString;
  45. CGPString.push_back('"');
  46. for(size_t i = 0; i < S.length(); i++) {
  47. char C = S[i];
  48. switch(C) {
  49. case '\r': break;
  50. case '\t': { CGPString.append("\\t"); break; }
  51. case '\n': { CGPString.append("\\e"); break; }
  52. case '\\': { CGPString.append("\\"); break; }
  53. case '"': { CGPString.append("\\\""); break; }
  54. default: CGPString.push_back(C);
  55. }
  56. }
  57. CGPString.push_back('"');
  58. return CGPString;
  59. }
  60. void Job::emitADDHEADER(const string& Headers) {
  61. ostringstream O;
  62. O << CurrentCommand.Number << " ADDHEADER " << formatAsCGPString(Headers) << endl;
  63. OutputBuffer.append(O.str());
  64. }
  65. void Job::emitERROR(const string& Report) {
  66. ostringstream O;
  67. O << CurrentCommand.Number << " ERROR " << formatAsCGPString(Report) << endl;
  68. OutputBuffer.append(O.str());
  69. }
  70. void Job::emitDISCARD() {
  71. ostringstream O;
  72. O << CurrentCommand.Number << " DISCARD" << endl;
  73. OutputBuffer.append(O.str());
  74. }
  75. void Job::emitREJECTED(const string& Report) {
  76. ostringstream O;
  77. O << CurrentCommand.Number << " REJECTED " << formatAsCGPString(Report) << endl;
  78. OutputBuffer.append(O.str());
  79. }
  80. void Job::finalize() { // Cleanup and report this job.
  81. closeWriter(); // In case of exception let go of our
  82. closeReader(); // Reader and Writer now. The rest
  83. Output.outputJob(*this); // can wait for the output processor.
  84. }
  85. void Job::doWakeUp() {
  86. emitComment("SNF4CGP Waking Up");
  87. emitComment(CurrentCommand.Data);
  88. }
  89. void Job::doINTF() {
  90. emitINTF();
  91. }
  92. void Job::doUNKNOWN() {
  93. ostringstream O;
  94. O << "* SNF4CGP[" << CurrentCommand.Number << "] Does not understand: "
  95. << formatAsCGPString(CurrentCommand.Data) << endl;
  96. OutputBuffer.append(O.str());
  97. emitFAILURE();
  98. }
  99. void Job::doFILE() {
  100. doInitialRead();
  101. doScan();
  102. doAction();
  103. closeReader();
  104. }
  105. RuntimeCheck CheckMessageReaderIsValid("Job::Reader() Check(0 != Reader_)");
  106. ifstream& Job::Reader() { // Safe access to Reader_.
  107. CheckMessageReaderIsValid(0 != Reader_);
  108. return (*Reader_);
  109. }
  110. LogicFault FaultMessageReaderOpenedTwice("Job::openReader() Fault(0 != Reader_)");
  111. RuntimeCheck CheckopenReaderWasSuccessful("Job::openReader() Check(Reader().good())");
  112. void Job::openReader(string Path) {
  113. FaultMessageReaderOpenedTwice(0 != Reader_);
  114. Reader_ = new ifstream(Path.c_str(), ios::binary);
  115. CheckopenReaderWasSuccessful(Reader().good());
  116. }
  117. void Job::closeReader() {
  118. if(Reader_) {
  119. Reader().close();
  120. delete Reader_;
  121. Reader_ = 0;
  122. }
  123. }
  124. RuntimeCheck CheckJobWriterIsValid("Job::Writer() Check(0 != Writer_)");
  125. ofstream& Job::Writer() { // Safe access to Writer_.
  126. CheckJobWriterIsValid(0 != Writer_);
  127. return (*Writer_);
  128. }
  129. LogicFault FaultMessageWriterOpenedTwice("Job::openWriter() Fault(0 != Writer_)");
  130. RuntimeCheck CheckopenWriterWasSuccessful("Job::openWriter() Check(Writer().good())");
  131. void Job::openWriter(string Path) {
  132. FaultMessageWriterOpenedTwice(0 != Writer_);
  133. Writer_ = new ofstream(Path.c_str(), ios::trunc | ios::binary);
  134. CheckopenWriterWasSuccessful(Writer().good());
  135. }
  136. void Job::closeWriter() {
  137. if(Writer_) {
  138. Writer().close();
  139. delete Writer_;
  140. Writer_ = 0;
  141. }
  142. }
  143. void Job::doInitialRead() {
  144. openReader(Job::CurrentCommand.Data);
  145. ReadBuffer.assign(ReadBufferSize, 0);
  146. Reader().read(reinterpret_cast<char*>(&ReadBuffer[0]), ReadBuffer.size());
  147. }
  148. string Job::ScanName() { // Scan name from FILE command
  149. ostringstream ScanNameFormatter;
  150. ScanNameFormatter << "[" << CurrentCommand.Number << "]" << CurrentCommand.Data;
  151. return ScanNameFormatter.str();
  152. }
  153. const int NoScanYet = -1;
  154. LogicFault FaultIfSettingScanResultConfigBeforeScan("Job::setConfigurationFromScanResult() Fault(NoScanYet == ScanResultCode)");
  155. void Job::setConfigurationFromScanResult(ScopeScanner& myScanner) {
  156. FaultIfSettingScanResultConfigBeforeScan(NoScanYet == ScanResultCode);
  157. ScanResultConfiguration = Scanners.ConfigurationForResultCode(ScanResultCode);
  158. /*//// Testing -- forcing scan results
  159. ScanResultConfiguration.Action = ResultConfiguration::Hold;
  160. ScanResultConfiguration.HoldPath = "C:\\M\\Projects\\MessageSniffer\\SNF4CGP_Work\\TestEnvironment\\hold\\";
  161. ScanResultConfiguration.RejectionReason = "I don't like the look of it -- so there!";
  162. ScanResultConfiguration.LogComment = "This is my happy little log comment.";
  163. ScanResultConfiguration.EmitXMLLog = true;
  164. ScanResultConfiguration.EmitClassicLog = false;
  165. ScanResultConfiguration.InjectHeaders = true;
  166. *///////////////////////////////////////////////////////////////////////
  167. if(ScanResultConfiguration.InjectHeaders) HeadersToInject = myScanner.Engine().getXHDRs();
  168. if(ScanResultConfiguration.EmitXMLLog) XMLLogData = myScanner.Engine().getXMLLog();
  169. if(ScanResultConfiguration.EmitClassicLog) ClassicLogData = myScanner.Engine().getClassicLog();
  170. }
  171. void Job::doScan() {
  172. ScopeScanner myScanner(Scanners);
  173. ScanResultCode =
  174. myScanner.Engine().scanMessage(
  175. &ReadBuffer[0], Reader().gcount(), ScanName(), JobTimer.getElapsedTime()
  176. );
  177. setConfigurationFromScanResult(myScanner);
  178. }
  179. void convertLogLinesToComments(ostringstream& O, string& Lines) {
  180. istringstream I(Lines);
  181. while(I) {
  182. string LogLine;
  183. getline(I, LogLine);
  184. if(0 < LogLine.length()) {
  185. O << "* " << LogLine << endl;
  186. }
  187. }
  188. }
  189. void Job::emitLogComment() {
  190. ostringstream O;
  191. generateCommandCommentPrefix(O);
  192. O << "Comment: " << ScanResultConfiguration.LogComment << endl;
  193. OutputBuffer.append(O.str());
  194. }
  195. void Job::emitXMLLogData() {
  196. ostringstream O;
  197. generateCommandCommentPrefix(O);
  198. O << "XML Scan log data: " << endl;
  199. convertLogLinesToComments(O, XMLLogData);
  200. OutputBuffer.append(O.str());
  201. }
  202. void Job::emitClassicLogData() {
  203. ostringstream O;
  204. generateCommandCommentPrefix(O);
  205. O << "Classic Scan log data: " << endl;
  206. convertLogLinesToComments(O, ClassicLogData);
  207. OutputBuffer.append(O.str());
  208. }
  209. LogicFault FaultUhandledScanResultAction("Job::doAction() switch(ScanResultConfiguration.Action) fell to default!");
  210. void Job::doAction() {
  211. if(0 < ScanResultConfiguration.LogComment.length()) emitLogComment();
  212. if(ScanResultConfiguration.EmitXMLLog) emitXMLLogData();
  213. if(ScanResultConfiguration.EmitClassicLog) emitClassicLogData();
  214. switch(ScanResultConfiguration.Action) {
  215. case ResultConfiguration::Allow : { doAllow(); break; }
  216. case ResultConfiguration::Bypass : { doBypass(); break; }
  217. case ResultConfiguration::Delete : { doDelete(); break; }
  218. case ResultConfiguration::Hold : { doHold(); break; }
  219. case ResultConfiguration::Reject : { doReject(); break; }
  220. default: { throw FaultUhandledScanResultAction; }
  221. }
  222. }
  223. void Job::doBypass() {
  224. emitOK();
  225. }
  226. void Job::doAllow() {
  227. if(0 < HeadersToInject.length()) emitADDHEADER(HeadersToInject);
  228. else emitOK();
  229. }
  230. void Job::doReject() {
  231. emitREJECTED(ScanResultConfiguration.RejectionReason);
  232. }
  233. void Job::doDelete() {
  234. emitDISCARD();
  235. }
  236. void Job::emitMessageMovedTo(string& Destination) {
  237. ostringstream O;
  238. generateCommandCommentPrefix(O);
  239. O << "Moved message to " << Destination << endl;
  240. OutputBuffer.append(O.str());
  241. }
  242. void Job::identifyLocalLineEnd() { // Identify the line endings in use:
  243. if(0 < LocalLineEnd.length()) return; // Do this only once for any system.
  244. for(int i = 0; i < Reader().gcount(); i++) { // Look for the first line end.
  245. if('\r' == ReadBuffer[i] && '\n' == ReadBuffer[i+1]) { // It may be \r\n.
  246. LocalLineEnd = "\r\n"; return;
  247. } else
  248. if('\n' == ReadBuffer[i] && '\n' == ReadBuffer[i+1]) { // It may be \n. (find block end \n\n)
  249. LocalLineEnd = "\n"; return;
  250. }
  251. }
  252. }
  253. RuntimeCheck CheckLineEndsIdentified("Job::findHeaderInsertPoint() Check(0 < LocalLineEnd.length())");
  254. RuntimeCheck CheckInsertCursorPlausable("Job::findHeaderInsertPoint() Check(isMinimumSafeDistanceFromEOF(InsertCursor))");
  255. bool Job::isMinimumSafeDistanceFromEOF(const size_t Position) {
  256. string MinimalEmptyMessageBody = LocalLineEnd + "." + LocalLineEnd;
  257. size_t MinimalDistanceToEOF = MinimalEmptyMessageBody.length();
  258. size_t SafetyCalculation = Position + MinimalDistanceToEOF;
  259. size_t AvailableMessageLength = Reader().gcount();
  260. return (SafetyCalculation <= AvailableMessageLength);
  261. }
  262. void Job::moveInsertCursorForwardToBlockTerminus(size_t& InsertCursor, string& BlockTerminus) {
  263. size_t SafetyLimit = (Reader().gcount() - BlockTerminus.length());
  264. for(;InsertCursor < SafetyLimit; InsertCursor++) { // Scan forward safely until we find it.
  265. if( // At each position compare the BlockTerminus
  266. 0 == BlockTerminus.compare( // with an equal length string in the buffer.
  267. 0, BlockTerminus.length(),
  268. reinterpret_cast<char*>(&ReadBuffer[InsertCursor]), BlockTerminus.length())
  269. ) return;
  270. }
  271. }
  272. size_t Job::findHeaderInsertPoint() { // How to find the insert point:
  273. identifyLocalLineEnd();
  274. CheckLineEndsIdentified(0 < LocalLineEnd.length());
  275. string BlockTerminus = LocalLineEnd + LocalLineEnd;
  276. size_t InsertCursor = 0;
  277. moveInsertCursorForwardToBlockTerminus(InsertCursor, BlockTerminus); // Find the end of the CGP control block.
  278. InsertCursor += BlockTerminus.length(); // Move past this block terminus.
  279. moveInsertCursorForwardToBlockTerminus(InsertCursor, BlockTerminus); // Find the end of the message headers.
  280. InsertCursor += LocalLineEnd.length(); // Split the headers BlockTerminus.
  281. CheckInsertCursorPlausable(isMinimumSafeDistanceFromEOF(InsertCursor));
  282. return InsertCursor;
  283. }
  284. RuntimeCheck CheckFirstSegmentWriterOk("Job::writeUpToInjectionPoint() Check(Writer().good())");
  285. void Job::writeUpToInjectionPoint(size_t InjectionPoint) {
  286. Writer().write(reinterpret_cast<char*>(&ReadBuffer[0]), InjectionPoint);
  287. CheckFirstSegmentWriterOk(Writer().good());
  288. }
  289. RuntimeCheck CheckHeaderWriterOk("Job::writeHeaders() Check(Writer().good())");
  290. void Job::writeHeaders() { // Write (inject) headers while converting
  291. for(size_t i = 0; i < HeadersToInject.length(); i++) { // line ends to the format observed in the
  292. switch(HeadersToInject.at(i)) { // message file.
  293. case '\r' : { break; } // Convert by ignoring <CR> and
  294. case '\n' : { Writer() << LocalLineEnd; break; } // triggering the LocalLineEnd on <LF>
  295. default : { Writer().put(HeadersToInject.at(i)); break; } // All other bytes as seen in the message.
  296. }
  297. }
  298. CheckHeaderWriterOk(Writer().good());
  299. }
  300. RuntimeCheck CheckEndFirstBufferWriterOk("Job::writeRestOfFirstBuffer() Check(Writer().good())");
  301. void Job::writeRestOfFirstBuffer(size_t InjectionPoint) {
  302. int LengthOfRestOfBuffer = Reader().gcount() - (InjectionPoint + 1);
  303. Writer().write(reinterpret_cast<char*>(&ReadBuffer[InjectionPoint]), LengthOfRestOfBuffer);
  304. CheckEndFirstBufferWriterOk(Writer().good());
  305. }
  306. void Job::copyFirstBufferAndInjectHeadersInMessageMove() {
  307. size_t HeaderInjectionPoint = findHeaderInsertPoint();
  308. writeUpToInjectionPoint(HeaderInjectionPoint);
  309. writeHeaders();
  310. writeRestOfFirstBuffer(HeaderInjectionPoint);
  311. }
  312. RuntimeCheck CheckFirstBufferWriterOk("Job::copyFirstBufferInMessageMove() Check(Writer().good())");
  313. void Job::copyFirstBufferInMessageMove() {
  314. Writer().write(reinterpret_cast<char*>(&ReadBuffer[0]), Reader().gcount());
  315. CheckFirstBufferWriterOk(Writer().good());
  316. }
  317. RuntimeFault FaultLongCopyReaderBad("Job::readLongCopyBlock() Fault(Reader().bad())");
  318. void Job::readLongCopyBlock() {
  319. Reader().read(reinterpret_cast<char*>(&ReadBuffer[0]), ReadBufferSize);
  320. FaultLongCopyReaderBad(Reader().bad());
  321. }
  322. RuntimeFault FaultLongCopyWriterBad("Job::writeLongCopyBlock() Fault(Writer().bad())");
  323. void Job::writeLongCopyBlock() {
  324. if(Reader().gcount()) {
  325. Writer().write(reinterpret_cast<char*>(&ReadBuffer[0]), Reader().gcount());
  326. FaultLongCopyWriterBad(Writer().bad());
  327. }
  328. }
  329. void Job::copyRestOfLongMessage() {
  330. while(Reader().good()) {
  331. readLongCopyBlock();
  332. writeLongCopyBlock();
  333. }
  334. }
  335. void Job::moveMessageToHoldPath(string& Destination) {
  336. openWriter(Destination);
  337. if(ScanResultConfiguration.InjectHeaders) {
  338. copyFirstBufferAndInjectHeadersInMessageMove();
  339. } else {
  340. copyFirstBufferInMessageMove();
  341. }
  342. copyRestOfLongMessage();
  343. closeWriter();
  344. }
  345. const string PathSeparators = "\\/";
  346. string FileNamePart(string Path) { // How to get the file name part of a path:
  347. string::size_type Found = Path.find_last_of(PathSeparators);
  348. if(string::npos != Found) Path = Path.substr(Found + 1); // Return all after the last separator
  349. return Path; // or Path if no separators are found.
  350. }
  351. string Job::calculateDestinationFileName() { // How to get the message move destination:
  352. string Source = CurrentCommand.Data;
  353. string Destination = ScanResultConfiguration.HoldPath;
  354. Destination.append(FileNamePart(Source));
  355. return Destination;
  356. }
  357. void Job::doHold() {
  358. string Destination = calculateDestinationFileName();
  359. moveMessageToHoldPath(Destination);
  360. emitMessageMovedTo(Destination);
  361. emitDISCARD();
  362. }
  363. Job::Job(ScannerPool& S, OutputProcessor& O) : // Construct with important links.
  364. Scanners(S),
  365. Output(O),
  366. ScanResultCode(NoScanYet),
  367. Reader_(0),
  368. Writer_(0) { // Minimize heap thrashing.
  369. OutputBuffer.reserve(StringReserveSize);
  370. HeadersToInject.reserve(StringReserveSize);
  371. XMLLogData.reserve(StringReserveSize);
  372. ClassicLogData.reserve(StringReserveSize);
  373. ReadBuffer.reserve(ReadBufferSize);
  374. }
  375. Job::~Job() { // Cleanup when destructing.
  376. try { clear(); }
  377. catch(...) {}
  378. }
  379. void Job::clear() { // Cleanup for the next command.
  380. CurrentCommand.clear();
  381. ScanResultCode = NoScanYet;
  382. OutputBuffer.clear();
  383. HeadersToInject.clear();
  384. XMLLogData.clear();
  385. ClassicLogData.clear();
  386. ReadBuffer.clear();
  387. closeReader();
  388. closeWriter();
  389. JobTimer.clear();
  390. }
  391. LogicFault FaultIfQuitGetsHere("Job::setCommand() Fault(Command::QUIT == C.Type)");
  392. void Job::setCommand(Command& C) { // Assign a command for this job.
  393. JobTimer.start();
  394. FaultIfQuitGetsHere(Command::QUIT == C.Type);
  395. CurrentCommand = C;
  396. }
  397. void Job::executeCommand() {
  398. switch(CurrentCommand.Type) {
  399. case Command::WAKE: { doWakeUp(); break; }
  400. case Command::INTF: { doINTF(); break; }
  401. case Command::FILE: { doFILE(); break; }
  402. default: { doUNKNOWN(); break; }
  403. }
  404. }
  405. void Job::emitException(const string& What) {
  406. ostringstream O;
  407. generateCommandCommentPrefix(O);
  408. O << "Exception: " << What << endl;
  409. OutputBuffer.append(O.str());
  410. }
  411. void Job::emitUnknownException() {
  412. ostringstream O;
  413. generateCommandCommentPrefix(O);
  414. O << "Unknown Exception!" << endl;
  415. OutputBuffer.append(O.str());
  416. }
  417. void Job::doIt() { // Get the job done.
  418. try { // emitFAILURE() on all exceptions.
  419. try { executeCommand(); } // executeCommand() and report exceptions.
  420. catch(exception& e) { emitException(e.what()); throw; }
  421. catch(...) { emitUnknownException(); throw; }
  422. }
  423. catch(...) { emitFAILURE(); }
  424. finalize();
  425. }
  426. string& Job::Responses() { // Read the job's report.
  427. return OutputBuffer;
  428. }
  429. //// JobPool ///////////////////////////////////////////////////////////////////
  430. JobPool::JobPool() : // Simple constructor.
  431. Output_(0),
  432. Scanners_(new ScannerPool()),
  433. AllocatedJobs(0),
  434. Started(false) {
  435. }
  436. JobPool::~JobPool() { // Clean up on the way out.
  437. try {
  438. stop();
  439. Output_ = 0;
  440. if(Scanners_) delete Scanners_;
  441. Scanners_ = 0;
  442. }
  443. catch(...) {}
  444. }
  445. RuntimeCheck CheckForValidOutputPool("JobPool::Output() Check(0 != Output_)");
  446. OutputProcessor& JobPool::Output() {
  447. CheckForValidOutputPool(0 != Output_);
  448. return (*Output_);
  449. }
  450. RuntimeCheck CheckForValidScannerPool("JobPool::Scanners() Check(0 != Scanners_)");
  451. ScannerPool& JobPool::Scanners() {
  452. CheckForValidScannerPool(0 != Scanners_);
  453. return (*Scanners_);
  454. }
  455. // Watch out -- Initializing the JobPool also means starting up SNFMulti and
  456. // the ScannerPool.
  457. void JobPool::init(
  458. string VersionInfo,
  459. string Configuration,
  460. OutputProcessor& O) {
  461. Output_ = &O;
  462. Scanners().init(VersionInfo, Configuration);
  463. ScopeMutex Busy(AllocationMutex);
  464. Job* FirstJob = makeJob();
  465. Jobs.give(FirstJob);
  466. Started = true;
  467. }
  468. Job* JobPool::makeJob() { // Create and count a Job.
  469. Job* NewJob = new Job(Scanners(), Output());
  470. ++AllocatedJobs;
  471. return NewJob;
  472. }
  473. LogicCheck CheckGrabJobsOnlyWhenStarted("JobPool::grab() Check(Started)");
  474. RuntimeCheck CheckForValidGrabbedJob("JobPool::grab() Check(0 != GrabbedJob)");
  475. Job& JobPool::grab() { // Grab a job (prefer from the pool).
  476. ScopeMutex Busy(AllocationMutex);
  477. CheckGrabJobsOnlyWhenStarted(Started);
  478. Job* GrabbedJob = 0;
  479. if(0 < Jobs.size()) GrabbedJob = Jobs.take(); // Prefer jobs from the pool.
  480. else GrabbedJob = makeJob(); // Make new ones if needed.
  481. CheckForValidGrabbedJob(0 != GrabbedJob);
  482. return (*GrabbedJob);
  483. }
  484. void JobPool::drop(Job& J) { // Drop a job into the pool.
  485. Jobs.give(&J);
  486. }
  487. void JobPool::killJobFromPool() { // Kill and count a Job from the pool.
  488. Job* JobToKill = 0;
  489. JobToKill = Jobs.take();
  490. delete JobToKill;
  491. --AllocatedJobs;
  492. }
  493. // Watch out -- Stopping the JobPool also means shutting down SNFMulti and
  494. // the ScannerPool.
  495. void JobPool::stop() { // Shut down the JobPool.
  496. ScopeMutex Busy(AllocationMutex);
  497. while(0 < AllocatedJobs) killJobFromPool();
  498. Scanners().stop();
  499. Started = false;
  500. }