Sfoglia il codice sorgente

Implementing Child with I/O threads, and blocking I/O.


git-svn-id: https://svn.microneil.com/svn/CodeDweller/branches/adeniz_1@81 d34b734f-a00e-4b39-a726-e4eeb87269ab
adeniz_1
adeniz 9 anni fa
parent
commit
96c9d39914
2 ha cambiato i file con 1298 aggiunte e 5 eliminazioni
  1. 858
    0
      child.cpp
  2. 440
    5
      child.hpp

+ 858
- 0
child.cpp Vedi File

@@ -32,7 +32,9 @@
#endif
#include <stdexcept>
#include <utility>
#include "CodeDweller/timing.hpp"
#include "CodeDweller/child.hpp"
namespace CodeDweller {
@@ -692,4 +694,860 @@ namespace CodeDweller {
}
Child::CircularBuffer::CircularBuffer(size_t maxSize) :
buffer(maxSize),
capacity(maxSize) {
iBegin = 0;
iEnd = 0;
}
bool Child::CircularBuffer::empty() const {
return (iBegin == iEnd);
}
size_t Child::CircularBuffer::nUsed() const {
return capacity - nFree();
}
void Child::CircularBuffer::clear() {
iBegin = 0;
iEnd = 0;
}
size_t Child::CircularBuffer::nFree() const {
if (iBegin <= iEnd) {
return capacity - (iEnd - iBegin);
}
return iBegin - iEnd;
}
void Child::CircularBuffer::put(char const *ptr, size_t nBytes) {
for (size_t i = 0; i < nBytes; i++) {
buffer[iEnd] = *ptr;
ptr++;
nextIndex(iEnd);
}
}
void Child::CircularBuffer::getAndErase(std::string &buf, size_t nBytes) {
if ( (0 == nBytes) || (nBytes > nUsed()) ) {
nBytes = nUsed();
}
buf.clear();
if (buf.capacity() < nBytes) {
buf.reserve(nBytes);
}
for (size_t i = 0; i < nBytes; i++) {
buf.push_back(buffer[iBegin]);
nextIndex(iBegin);
}
}
Child::Child(std::vector<std::string> const &args,
size_t bufSize,
std::uint16_t nominalAboveMinPollTime_ms,
std::uint16_t deltaPollTime_ms) :
bufferCapacity(bufSize),
readBuffer(bufferCapacity),
writeBuffer(bufferCapacity),
nominalPollTime_ms(nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime),
maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) {
init();
open(args);
}
Child::Child(std::string const &childpath,
size_t bufSize,
std::uint16_t nominalAboveMinPollTime_ms,
std::uint16_t deltaPollTime_ms) :
bufferCapacity(bufSize),
readBuffer(bufferCapacity),
writeBuffer(bufferCapacity),
nominalPollTime_ms(nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime),
maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) {
init();
open(childpath);
}
Child::Child(size_t bufSize,
std::uint16_t nominalAboveMinPollTime_ms,
std::uint16_t deltaPollTime_ms) :
bufferCapacity(bufSize),
readBuffer(bufferCapacity),
writeBuffer(bufferCapacity),
nominalPollTime_ms(nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime),
maximumPollTime_ms(nominalPollTime_ms + deltaPollTime_ms) {
init();
}
Child::~Child() {
try {
close();
} catch (...) {
;
}
}
void Child::open(std::vector<std::string> const &args) {
cmdArgs = args;
if (isRunning()) {
throw std::logic_error("The child process was already active.");
}
init();
run();
}
void Child::open(std::string const &childpath) {
if (isRunning()) {
throw std::logic_error("The child process was already active.");
}
cmdArgs.clear();
cmdArgs.push_back(childpath);
init();
run();
}
void Child::init() {
childStarted = false;
childExited = false;
exitCodeObtainedFlag = false;
exitCode = 0;
readBuffer.clear();
nWriteBytes = 0;
nTransmitBytes = 0;
threadsAreRunning = false;
stopFlag = true;
errorText.clear();
}
bool Child::write(std::string const &data) {
if (!isRunning()) {
throw std::logic_error("No child process is running.");
}
#warning Check that this is okay before locking
if (data.size() > bufferCapacity - nWriteBytes) {
return false;
}
std::lock_guard<std::mutex> lock(writeBufferMutex);
std::copy(data.data(),
data.data() + data.size(),
&(writeBuffer[nWriteBytes]));
nWriteBytes += data.size();
return true;
}
size_t Child::writeAndShrink(std::string &data) {
if (!isRunning()) {
throw std::logic_error("No child process is running.");
}
#warning Check that this is okay before locking
size_t nFree = bufferCapacity - nWriteBytes;
if (0 == nFree) {
return 0;
}
std::lock_guard<std::mutex> lock(writeBufferMutex);
size_t nBytesToCopy = data.size();
if (nBytesToCopy > nFree) {
nBytesToCopy = nFree;
}
std::copy(data.data(),
data.data() + nBytesToCopy,
&(writeBuffer[nWriteBytes]));
nWriteBytes += nBytesToCopy;
data.erase(0, nBytesToCopy);
return nBytesToCopy;
}
bool Child::isFinishedWriting() const {
return ( (0 == nWriteBytes) &&
(0 == nTransmitBytes) );
}
size_t Child::read(std::string &data, size_t nBytes) {
if (!isRunning()) {
throw std::logic_error("No child process is running.");
}
data.clear();
#warning Check that this is okay before locking
if (readBuffer.empty()) {
return 0;
}
std::lock_guard<std::mutex> lock(readBufferMutex);
size_t nBytesToRead = nBytes;
if (nBytesToRead > readBuffer.nUsed()) {
nBytesToRead = readBuffer.nUsed();
}
readBuffer.getAndErase(data, nBytesToRead);
return data.size();
}
void Child::close() {
if (!childStarted) {
throw std::logic_error("Child process was not started "
"when close() was called");
}
// Stop the reader and writer threads. Note: None of the error
// conditions that cause an exception to be thrown by join()
// can ever occur.
stopFlag = true;
// Terminate the child if it's running.
if (isRunning()) {
#ifdef _WIN32
if (!TerminateProcess(childProcess, terminateExitCode)) {
#else
if (kill(childPid, SIGTERM) != 0) {
#endif
throw std::runtime_error("Error terminating the child process: " +
getErrorText());
}
}
if (threadsAreRunning) {
readerThread.join();
writerThread.join();
threadsAreRunning = false;
}
// Reset.
init();
}
bool Child::isRunning() const {
return childStarted && !childExited;
}
bool Child::errorOccurred(std::string &errorDescription) const {
errorDescription = errorText;
return !errorDescription.empty();
}
void Child::run() {
if (childStarted) {
throw std::logic_error("Child process was active when "
"run() was called");
}
if (cmdArgs.empty()) {
throw std::invalid_argument("A child executable must be specified.");
}
#ifdef _WIN32
// Set the bInheritHandle flag so pipe handles are inherited.
SECURITY_ATTRIBUTES securityAttributes;
securityAttributes.nLength = sizeof(SECURITY_ATTRIBUTES);
securityAttributes.bInheritHandle = true;
securityAttributes.lpSecurityDescriptor = NULL;
// Create a pipe for the child process's STDOUT.
HANDLE childStdOutAtChild;
HANDLE childStdOutAtParent;
HANDLE childStdInAtChild;
HANDLE childStdInAtParent;
int bufferSize = 0;
if (!CreatePipe(&childStdOutAtParent,
&childStdOutAtChild,
&securityAttributes,
bufferSize)) {
throw std::runtime_error("Error from CreatePipe for stdout: " +
getErrorText());
}
// Ensure the read handle to the pipe for STDOUT is not inherited.
int inheritFlag = 0;
if (!SetHandleInformation(childStdOutAtParent,
HANDLE_FLAG_INHERIT,
inheritFlag) ) {
throw std::runtime_error("Error from GetHandleInformation for stdout: " +
getErrorText());
}
// Create a pipe for the child process's STDIN.
if (! CreatePipe(&childStdInAtChild,
&childStdInAtParent,
&securityAttributes,
bufferSize)) {
throw std::runtime_error("Error from CreatePipe for stdin: " +
getErrorText());
}
// Ensure the write handle to the pipe for STDIN is not inherited.
if (!SetHandleInformation(childStdInAtParent,
HANDLE_FLAG_INHERIT,
inheritFlag)) {
throw std::runtime_error("Error from GetHandleInformation for stdin: " +
getErrorText());
}
// Set up members of the PROCESS_INFORMATION structure.
PROCESS_INFORMATION processInfo;
std::fill((char *) &processInfo,
((char *) &processInfo) + sizeof(PROCESS_INFORMATION),
0);
// Set up members of the STARTUPINFO structure. This structure
// specifies the STDIN and STDOUT handles for redirection.
STARTUPINFO startInfo;
std::fill((char *) &startInfo,
((char *) &startInfo) + sizeof(STARTUPINFO),
0);
startInfo.cb = sizeof(STARTUPINFO);
startInfo.hStdError = childStdOutAtChild;
startInfo.hStdOutput = childStdOutAtChild;
startInfo.hStdInput = childStdInAtChild;
startInfo.dwFlags |= STARTF_USESTDHANDLES;
// Assemble the command line.
std::string cmdline;
if (cmdArgs.size() == 1) {
cmdline = cmdArgs[0];
} else {
// Append all but last command-line arguments.
for (size_t i = 0; i < cmdArgs.size() - 1; i++) {
cmdline += cmdArgs[i] + " ";
}
cmdline += cmdArgs.back(); // Append last command-line argument.
}
// Create the child process.
bool status;
status = CreateProcess(NULL,
(char *) cmdline.c_str(),
NULL, // process security attributes
NULL, // primary thread security attributes
true, // handles are inherited
0, // creation flags
NULL, // use parent's environment
NULL, // use parent's current directory
&startInfo, // STARTUPINFO pointer
&processInfo); // receives PROCESS_INFORMATION
// If an error occurs, exit the application.
if (!status ) {
throw std::runtime_error("Error from CreateProcess with "
"command line \"" + cmdline + "\": " +
getErrorText());
}
// Provide the stream buffers with the handles for communicating
// with the child process.
inputHandle = childStdOutAtParent;
outputHandle = childStdInAtParent;
// Save the handles to the child process and its primary thread.
childProcess = processInfo.hProcess;
childThread = processInfo.hThread;
// Close the child's end of the pipes.
if (!CloseHandle(childStdOutAtChild)) {
throw std::runtime_error("Error closing the child process "
"stdout handle: " + getErrorText());
}
if (!CloseHandle(childStdInAtChild)) {
throw std::runtime_error("Error closing the child process "
"stdin handle: " + getErrorText());
}
#else
// Create the pipes for the stdin and stdout.
int childStdInPipe[2];
int childStdOutPipe[2];
if (pipe(childStdInPipe) != 0) {
throw std::runtime_error("Error creating pipe for stdin: " +
getErrorText());
}
if (pipe(childStdOutPipe) != 0) {
::close(childStdInPipe[0]);
::close(childStdInPipe[1]);
throw std::runtime_error("Error creating pipe for stdout: " +
getErrorText());
}
// Create the child process.
childPid = fork();
if (-1 == childPid) {
for (int i = 0; i < 2; i++) {
::close(childStdInPipe[i]);
::close(childStdOutPipe[i]);
}
throw std::runtime_error("Error creating child process: " +
getErrorText());
}
if (0 == childPid) {
// The child executes this. Redirect stdin.
if (dup2(childStdInPipe[0], STDIN_FILENO) == -1) {
std::string errMsg;
// Send message to parent.
errMsg = "Error redirecting stdin in the child: " + getErrorText();
::write(childStdOutPipe[1], errMsg.data(), errMsg.size());
exit(-1);
}
// Redirect stdout.
if (dup2(childStdOutPipe[1], STDOUT_FILENO) == -1) {
std::string errMsg;
// Send message to parent.
errMsg = "Error redirecting stdout in the child: " + getErrorText();
::write(childStdOutPipe[1], errMsg.data(), errMsg.size());
exit(-1);
}
// Close pipes.
if ( (::close(childStdInPipe[0]) != 0) ||
(::close(childStdInPipe[1]) != 0) ||
(::close(childStdOutPipe[0]) != 0) ||
(::close(childStdOutPipe[1]) != 0) ) {
std::string errMsg;
// Send message to parent.
errMsg = "Error closing the pipes in the child: " + getErrorText();
::write(STDOUT_FILENO, errMsg.data(), errMsg.size());
exit(-1);
}
// Prepare the arguments.
std::vector<const char *> execvArgv;
for (auto &arg : cmdArgs) {
execvArgv.push_back(arg.c_str());
}
execvArgv.push_back((char *) NULL);
// Run the child process image.
(void) execv(execvArgv[0], (char * const *) &(execvArgv[0]));
// Error from exec.
std::string errMsg;
// Send message to parent.
errMsg = "Error from exec: " + getErrorText();
::write(STDOUT_FILENO, errMsg.data(), errMsg.size());
exit(-1);
}
// Provide the stream buffers with the file descriptors for
// communicating with the child process.
inputFileDescriptor = childStdOutPipe[0];
outputFileDescriptor = childStdInPipe[1];
// Close the child's end of the pipes.
if ( (::close(childStdInPipe[0]) != 0) ||
(::close(childStdOutPipe[1]) != 0) ) {
std::string errMsg;
throw std::runtime_error("Error closing child's end of pipes in "
"the parent: " + getErrorText());
}
#endif
childStarted = true;
// Start the reader and writer threads.
stopFlag = false;
try {
std::thread readerTemp(&Child::readFromChild, this);
readerThread = std::move(readerTemp);
} catch (std::exception &e) {
throw std::runtime_error("Error starting reader thread: " +
getErrorText());
}
try {
std::thread writerTemp(&Child::writeToChild, this);
writerThread = std::move(writerTemp);
} catch (std::exception &e) {
stopFlag = true;
readerThread.join();
throw std::runtime_error("Error starting writer thread: " +
getErrorText());
}
threadsAreRunning = true;
}
bool Child::isDone() {
if (childExited) {
return true;
}
if (!childStarted) {
throw std::logic_error("Child process was not started "
"when isDone() was called");
}
int result;
#ifdef _WIN32
if (!GetExitCodeProcess(childProcess, (LPDWORD) &result)) {
throw std::runtime_error("Error checking status of child process: " +
getErrorText());
}
if (STILL_ACTIVE == result) {
return false;
}
// Child process has exited. Save the exit code.
exitCode = result;
exitCodeObtainedFlag = true;
#else
int status = 0;
result = waitpid(childPid, &status, WNOHANG);
if (-1 == result) {
throw std::runtime_error("Error checking status of child process: " +
getErrorText());
} else if (0 == result) {
// Child is still running.
return false;
}
if (WIFEXITED(status)) {
// Child exited normally.
exitCode = WEXITSTATUS(status);
exitCodeObtainedFlag = true;
}
#endif
childExited = true;
// Stop threads.
stopFlag = true;
readerThread.join();
writerThread.join();
threadsAreRunning = false;
return true;
}
int32_t Child::result() {
if (exitCodeObtainedFlag) {
return exitCode;
}
// Check whether the process is running, and get the exit code.
if (!isDone()) {
throw std::logic_error("Child process was still running "
"when result() was called");
}
// Child process has exited.
if (!exitCodeObtainedFlag) {
// Exit code is not available.
throw std::runtime_error("Child process has exited but the exit "
"code is not available");
}
return exitCode;
}
std::string Child::getErrorText() {
#ifdef _WIN32
LPVOID winMsgBuf;
DWORD lastError = GetLastError();
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
lastError,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(char *) &winMsgBuf,
0, NULL );
std::string errMsg((char *) winMsgBuf);
LocalFree(winMsgBuf);
return errMsg;
#else
return strerror(errno);
#endif
}
void Child::readFromChild() {
std::vector<char> rxBuf(bufferCapacity);
CodeDweller::PollTimer pollTimer(nominalPollTime_ms, maximumPollTime_ms);
auto sleepTime = std::chrono::milliseconds(maximumPollTime_ms);
while (!stopFlag) {
char *bufferPtr;
bufferPtr = &(rxBuf[0]);
// Blocking read from the child.
#ifdef _WIN32
DWORD nBytesRead;
if (!ReadFile(inputHandle,
bufferPtr,
bufferCapacity,
&nBytesRead,
NULL)) {
if (stopFlag) {
break;
}
errorText = "Error reading from the child process: ";
errorText += getErrorText();
break;
}
#else
ssize_t nBytesRead;
nBytesRead = ::read(inputFileDescriptor,
bufferPtr,
bufferCapacity);
if (-1 == nBytesRead) {
if (stopFlag) {
break;
}
errorText = "Error reading from the child process: ";
errorText += getErrorText();
break;
}
#endif
// Copy to the shared read buffer.
while ((nBytesRead > 0) && !stopFlag) {
int nBytesToPut;
nBytesToPut = nBytesRead;
#warning Check thread safety
if (nBytesToPut > readBuffer.nFree()) {
nBytesToPut = readBuffer.nFree();
}
if (nBytesToPut > 0) {
std::lock_guard<std::mutex> lock(readBufferMutex);
readBuffer.put(bufferPtr, nBytesToPut);
bufferPtr += nBytesToPut;
nBytesRead -= nBytesToPut;
pollTimer.reset();
} else {
pollTimer.pause();
}
}
}
}
void Child::writeToChild() {
std::vector<char> localWriteBuffer(bufferCapacity);
size_t nLocalWriteBytes;
CodeDweller::PollTimer pollTimer(nominalPollTime_ms, maximumPollTime_ms);
auto sleepTime = std::chrono::milliseconds(maximumPollTime_ms);
while (!stopFlag) {
char *bufferPtr;
// Poll for data in the shared write buffer.
while ((0 == nWriteBytes) && !stopFlag) {
pollTimer.pause();
}
if (stopFlag) {
goto exit;
}
// Copy from the shared write buffer.
{
std::lock_guard<std::mutex> lock(writeBufferMutex);
localWriteBuffer.swap(writeBuffer);
nLocalWriteBytes = nWriteBytes;
nWriteBytes = 0;
}
if (stopFlag) {
goto exit;
}
pollTimer.reset();
// Blocking write to the child.
bufferPtr = &(localWriteBuffer[0]);
while (nLocalWriteBytes > 0) {
#ifdef _WIN32
DWORD nBytesWritten;
if (!WriteFile(outputHandle,
bufferPtr,
nLocalWriteBytes,
&nBytesWritten,
NULL)) {
if (stopFlag) {
goto exit;
}
errorText = "Error writing to the child process: ";
errorText += getErrorText();
goto exit;
}
if (stopFlag) {
goto exit;
}
#else
ssize_t nBytesWritten;
nBytesWritten = ::write(outputFileDescriptor,
bufferPtr,
nLocalWriteBytes);
if (stopFlag) {
goto exit;
}
if (-1 == nBytesWritten) {
if (ENOSPC != errno) {
// Some error other than no space.
errorText = "Error writing to the child process: ";
errorText += getErrorText();
goto exit;
}
}
#endif
nLocalWriteBytes -= nBytesWritten;
bufferPtr += nBytesWritten;
}
}
exit: return;
}
}

+ 440
- 5
child.hpp Vedi File

@@ -37,6 +37,8 @@
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>

namespace CodeDweller {

@@ -48,7 +50,8 @@ namespace CodeDweller {

*/

/** Class that abstracts the communication with a child process.
/** Class that abstracts the iostreams communication with a child
process.

This class provides functionality to create a child process,
communicate with the child process via streams and signals, and
@@ -75,9 +78,6 @@ namespace CodeDweller {
//
explicit ChildStreambuf(std::size_t bufSize = 4096);

/// Thread start function to send data to the child.
void sendToChild();

#ifdef _WIN32

/// Set the handle to read the standard output of the child
@@ -261,7 +261,7 @@ namespace CodeDweller {
*/
bool isRunning() const;

/** Terminite the child process.
/** Terminate the child process.

\throws runtime_error if an error occurs.

@@ -320,6 +320,441 @@ namespace CodeDweller {
/// Child executable path and command-line parameters.
std::vector<std::string> cmdArgs;

#ifdef _WIN32
/// Child's process handle.
HANDLE childProcess;

/// Child's thread handle.
HANDLE childThread;
#else
/// Child process ID.
pid_t childPid;
#endif

/// Exit value of the process.
int32_t exitCode;

/// True if the exit code has been obtained.
bool exitCodeObtainedFlag;

/// Return text for the most recent error.
//
// \returns Human-readable description of the most recent error.
//
static std::string getErrorText();

};

/** Class that abstracts the non-blocking communication with a child
process.

This class provides functionality to create a child process,
communicate with the child process via non-blocking methods, and
obtain the exit code of the child process.

*/

class Child {

private:

/** Circular buffer of characters. */
class CircularBuffer {

public:

/** Constructor specifies the capacity.

@param[in] maxSize is the capacity of the buffer.

*/
CircularBuffer(size_t maxSize);

/** Check whether the container is empty.

@returns true if the container is empty, false otherwise.

*/
bool empty() const;

/** Get the size.

@returns the number of bytes in the buffer.

*/
size_t nUsed() const;

/** Get the available space.

@returns the number of bytes that can be written to the
buffer without overwriting any existing data.

*/
size_t nFree() const;

/** Clear the buffer. */
void clear();

/** Put bytes to the buffer.

@param[in] ptr is the address of the first byte to put.

@param[in] nBytes is the number of bytes to put.

*/
void put(char const *ptr, size_t nBytes);

/** Get bytes from the buffer.

This method gets the specified number of bytes from the
buffer, and erases those bytes from the buffer.

@param[out] buf receives the data. The contents of buf are
replaced with the data in the circular buffer.

@param[in] nBytes is the number of bytes to get. Specifying
a value of zero for nBytes gets and erases all the data.

*/
void getAndErase(std::string &buf, size_t nBytes = 0);

private:

/** Increment the index.

@param[in] index is the index to increment.

*/
void nextIndex(size_t &index) const {
index++;
if (index >= capacity)
index = 0;
}

/// Buffer to hold data.
std::vector<char> buffer;

/// Capacity of the buffer.
size_t capacity;

/// Index of first element.
size_t iBegin;

/// Index of one past the last element.
size_t iEnd;

};

public:

/** Constructor for spawning with command-line parameters.

The constructor configures the object, and spawns the child
process.

\param[in] args contains the child executable file name and
command-line parameters. args[0] contains the full path of the
executable, and args[1] thru args[n] are the command-line
parameters.

\param[in] bufSize is the input and output buffer size of the
stream used to communicate with the child process.

\param[in] nominalAboveMinPollTime_ms is used to determine the
minimum time in milliseconds that the writer thread sleeps
when there's no data in the output buffer, and that the reader
thread sleeps when there's no room in the input buffer. The
minimum time is nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime.

\param[in] deltaPollTime_ms is how much longer, in
milliseconds, the maximum time to sleep is than the minimum time.

\throws runtime_error if an error occurs.

*/
Child(std::vector<std::string> const &args,
size_t bufSize = 128 * 1024,
std::uint16_t nominalAboveMinPollTime_ms = 0,
std::uint16_t deltaPollTime_ms = 500);

/** Constructor for spawning without command-line parameters.

The constructor configures the object, and spawns the child
process.

\param[in] childpath contains the child executable file name.

\param[in] bufSize is the input and output buffer size of the
stream used to communicate with the child process.

\param[in] nominalAboveMinPollTime_ms is used to determine the
minimum time in milliseconds that the writer thread sleeps
when there's no data in the output buffer, and that the reader
thread sleeps when there's no room in the input buffer. The
minimum time is nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime.

\param[in] deltaPollTime_ms is how much longer, in
milliseconds, the maximum time to sleep is than the minimum time.

\throws runtime_error if an error occurs.

*/
Child(std::string const &childpath,
size_t bufSize = 128 * 1024,
std::uint16_t nominalAboveMinPollTime_ms = 0,
std::uint16_t deltaPollTime_ms = 500);

/** Constructor.

The constructor configures the I/O buffers, but doesn't spawn
any child process.

\param[in] bufSize is the input and output buffer size of the
stream used to communicate with the child process.

\param[in] nominalAboveMinPollTime_ms is used to determine the
minimum time in milliseconds that the writer thread sleeps
when there's no data in the output buffer, and that the reader
thread sleeps when there's no room in the input buffer. The
minimum time is nominalAboveMinPollTime_ms +
CodeDweller::MinimumSleeperTime.

\param[in] deltaPollTime_ms is how much longer, in
milliseconds, the maximum time to sleep is than the minimum time.

*/
Child(size_t bufSize = 4096,
std::uint16_t nominalAboveMinPollTime_ms = 0,
std::uint16_t deltaPollTime_ms = 500);

/** Destructor terminates the child process. */
~Child();

/** Spawn the child process.

\param[in] args contains the child executable file name and
command-line parameters. args[0] contains the full path of the
executable, and args[1] thru args[n] are the command-line
parameters.

\throws runtime_error if an error occurs.

\throws runtime_error if an error occurs.

*/
void open(std::vector<std::string> const &args);

/** Spawn the child process.

\param[in] childpath contains the child executable file name.

\throws runtime_error if an error occurs.

*/
void open(std::string const &childpath);

/** All-or-nothing non-blocking queue write request to the child.

This methods attempts to queue a write request consisting of
the entire contents of the string.

@param[in] data is the data to queue.

@returns true if the write request was queued, false
otherwise.

*/
bool write(std::string const &data);

/** Non-blocking queue write request to the child.

This methods attempts to queue a write request consisting of
as much as possible of the contents of the string.

@param[in, out] data on input is the data to queue. On
output, data contains the data that was not queued.

@returns the number of bytes queued.

*/
size_t writeAndShrink(std::string &data);

/** Check if all queued data was transmitted.

@returns true if all the queued data was transmitted to the
child, false otherwise.

*/
bool isFinishedWriting() const;

/** Non-blocking request to get data read from the child.

This method attempts to get up to a specified number of bytes
of data from the input buffer containing data received from
the child. The data that is provided is erased from the input
buffer.

@param[out] data contains the copied data.

@param[in] nBytes is the number of bytes to attempt to copy.
If nBytes is zero, the contents of the entire input buffer is
moved to data.

@returns the number of bytes copied.

*/
size_t read(std::string &data, size_t nBytes = 0);

/** Check whether the child process is running.

\returns True if the child process is running, false
otherwise.

*/
bool isRunning() const;

/** Check error status.

This method checks whether an error occurred when
communicating with the child process.

\param[out] errorDescription contains any description of the
error.

\returns true if an error occurred, false otherwise.

*/
bool errorOccurred(std::string &errorDescription) const;

/** Close the connection.

This method terminate the child process if it is running, and
resets the object. After this method is invoked, open() can
be invoked to spawn and communicate with another child
process.

\throws runtime_error if an error occurs.

\throws logic_error if the child process is not running.

*/
void close();

/** Check whether the child process has exited.

\returns True if the child process has exited, false
otherwise.

\throws runtime_error if an error occurs.

\throws logic_error if the child process is not running.

*/
bool isDone();

/** Get the exit value of the child process.

\returns The exit value of the child process if the child
process has exited.

\throws runtime_error if an error occurs.

\throws logic_error if the child process has not exited.

\throws logic_error if the child process is not running.

*/
int32_t result();

private:

/// Initialize data members.
void init();

/** Spawn the child process.

\throws runtime_error if an error occurs.

*/
void run();

/// Reader thread object.
std::thread readerThread;

/// Thread start function to read data from the child.
void readFromChild();

/// Writer thread object.
std::thread writerThread;

/// Thread start function to send data to the child.
void writeToChild();

/// True if readerThread and writerThread are to stop.
bool stopFlag;

/// True if both the reader and writer the writer threads are
/// running, false otherwise.
bool threadsAreRunning;

/// Description of any error.
std::string errorText;

/// Input and output handles.
#ifdef _WIN32
HANDLE inputHandle;
HANDLE outputHandle;
#else
int inputFileDescriptor;
int outputFileDescriptor;
#endif

/// Capacity of buffers.
std::size_t bufferCapacity;

/// Read buffer.
CircularBuffer readBuffer;

/// Mutex to serialize access to readBuffer.
std::mutex readBufferMutex;

/// Write buffer.
std::vector<char> writeBuffer;

/// Number of bytes in writeBuffer.
size_t nWriteBytes;

/// Mutex to serialize access to writeBuffer.
std::mutex writeBufferMutex;

/// Number of bytes in writer thread transmit buffer.
size_t nTransmitBytes;

/// Nominal poll time.
//
// If there isn't room in readBuffer, readerThread aperiodically
// checks whether room is available. The check times are
// determined by a CodeDweller::PollTimer object, which requires a
// nominal poll time and a maximum poll time.
int nominalPollTime_ms;

/// Maximum poll time.
int maximumPollTime_ms;

/// Exit code to use when terminating the child process.
static const uint32_t terminateExitCode = 0;

/// True if the child process was successfully started.
bool childStarted;

/// True if the child process has exited.
bool childExited;

/// Child executable path and command-line parameters.
std::vector<std::string> cmdArgs;

#ifdef _WIN32
/// Child's process handle.
HANDLE childProcess;

Loading…
Annulla
Salva