Browse Source

cleaned up namespace in threading

master
Pete McNeil 4 years ago
parent
commit
687f552466
2 changed files with 104 additions and 93 deletions
  1. 26
    24
      threading.cpp
  2. 78
    69
      threading.hpp

+ 26
- 24
threading.cpp View File



#include "threading.hpp" #include "threading.hpp"


namespace cd = codedweller;
namespace codedweller {


ThreadManager Threads; // Master thread manager. ThreadManager Threads; // Master thread manager.


ScopeMutex ThereCanBeOnlyOne(MyMutex); // Protect our set -- a moment in time. ScopeMutex ThereCanBeOnlyOne(MyMutex); // Protect our set -- a moment in time.
ThreadStatusReport Answer; // Create our vector to hold the report. ThreadStatusReport Answer; // Create our vector to hold the report.
for( // Loop through all of the Threads. for( // Loop through all of the Threads.
set<Thread*>::iterator iT = KnownThreads.begin();
std::set<Thread*>::iterator iT = KnownThreads.begin();
iT != KnownThreads.end(); iT++ iT != KnownThreads.end(); iT++
) { // Grab each Threads' report. ) { // Grab each Threads' report.
Thread& X = *(*iT); // Handy reference to the Thread. Thread& X = *(*iT); // Handy reference to the Thread.
return true; // return true; return true; // return true;
} }


const cd::RuntimeCheck ThreadingCheck1("ThreadManager::unlockExistingThread():ThreadingCheck1(0 != LockedThread)");
const cd::RuntimeCheck ThreadingCheck2("ThreadManager::unlockExistingThread():ThreadingCheck2(T == LockedThread)");
const RuntimeCheck ThreadingCheck1("ThreadManager::unlockExistingThread():ThreadingCheck1(0 != LockedThread)");
const RuntimeCheck ThreadingCheck2("ThreadManager::unlockExistingThread():ThreadingCheck2(T == LockedThread)");


void ThreadManager::unlockExistingThread(Thread* T) { // Unlocks ThreadManager if T locked. void ThreadManager::unlockExistingThread(Thread* T) { // Unlocks ThreadManager if T locked.
ThreadingCheck1(0 != LockedThread); // We had better have a locked thread. ThreadingCheck1(0 != LockedThread); // We had better have a locked thread.


bool Thread::isBad() { return BadFlag; } // Return BadFlag state. bool Thread::isBad() { return BadFlag; } // Return BadFlag state.


const string Thread::MyFault() { return BadWhat; } // Return exception Bad fault if any.
const string Thread::MyName() { return MyThreadName; } // Return the instance name if any.
const std::string Thread::MyFault() { return BadWhat; } // Return exception Bad fault if any.
const std::string Thread::MyName() { return MyThreadName; } // Return the instance name if any.
const ThreadType& Thread::MyType() { return MyThreadType; } // Return the instance Thread Type. const ThreadType& Thread::MyType() { return MyThreadType; } // Return the instance Thread Type.
const ThreadState& Thread::MyState() { return (*MyThreadState); } // Thread state for this instance. const ThreadState& Thread::MyState() { return (*MyThreadState); } // Thread state for this instance.


CurrentThreadState(ThreadStarted); // Set the running state. CurrentThreadState(ThreadStarted); // Set the running state.
myTask(); // myTask() is called. myTask(); // myTask() is called.
} // myTask() should handle exceptions. } // myTask() should handle exceptions.
catch(exception& e) { // Unhandled exceptions are informative:
catch(const std::exception& e) { // Unhandled exceptions are informative:
BadFlag = true; // They mean the thread went bad but BadFlag = true; // They mean the thread went bad but
BadWhat = e.what(); // we have an idea what went wrong. BadWhat = e.what(); // we have an idea what went wrong.
} // We shouldn't get other kinds of } // We shouldn't get other kinds of
CurrentThreadState(ThreadInitialized); // Set our initialized state. CurrentThreadState(ThreadInitialized); // Set our initialized state.
} }


Thread::Thread(const ThreadType& T, const string N) : // POSIX Specific Thread Constructor.
Thread::Thread(const ThreadType& T, const std::string N) : // POSIX Specific Thread Constructor.
MyThreadType(T), // Use a generic Thread Type. MyThreadType(T), // Use a generic Thread Type.
MyThreadName(N), // Use a generic Thread Name. MyThreadName(N), // Use a generic Thread Name.
RunningFlag(false), // Can't be running yet. RunningFlag(false), // Can't be running yet.


// POSIX Mutex Implementation ////////////////////////////////////////////////// // POSIX Mutex Implementation //////////////////////////////////////////////////


const cd::RuntimeCheck ThreadingCheck7("Mutex::Mutex():ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL))");
const RuntimeCheck ThreadingCheck7("Mutex::Mutex():ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL))");


Mutex::Mutex() : // Constructing a POSIX mutex means Mutex::Mutex() : // Constructing a POSIX mutex means
IAmLocked(false) { // setting the IAmLocked flag to false and IAmLocked(false) { // setting the IAmLocked flag to false and
ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL)); // initializing the mutex_t object. ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL)); // initializing the mutex_t object.
} }


const cd::ExitCheck ThreadingCheck8("Mutex::~Mutex():ThreadingCheck8(false == IAmLocked)");
const cd::ExitCheck ThreadingCheck9("Mutex::~Mutex():ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex))");
const ExitCheck ThreadingCheck8("Mutex::~Mutex():ThreadingCheck8(false == IAmLocked)");
const ExitCheck ThreadingCheck9("Mutex::~Mutex():ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex))");


Mutex::~Mutex() { // Before we destroy our mutex we check Mutex::~Mutex() { // Before we destroy our mutex we check
ThreadingCheck8(false == IAmLocked); // to see that it is not locked and ThreadingCheck8(false == IAmLocked); // to see that it is not locked and
ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex)); // destroy the primative. ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex)); // destroy the primative.
} }


const cd::RuntimeCheck ThreadingCheck10("Mutex::lock():ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex));");
const RuntimeCheck ThreadingCheck10("Mutex::lock():ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex));");


void Mutex::lock() { // Locking a POSIX mutex means void Mutex::lock() { // Locking a POSIX mutex means
ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex)); // asserting our lock was successful and ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex)); // asserting our lock was successful and
IAmLocked = true; // setting the IAmLocked flag. IAmLocked = true; // setting the IAmLocked flag.
} }


const cd::LogicCheck ThreadingCheck11("Mutex::unlock():ThreadingCheck11(true == IAmLocked)");
const cd::RuntimeCheck ThreadingCheck12("Mutex::unlock():ThreadingCheck12(0 == pthread_mutex_unlock(&MyMutex))");
const LogicCheck ThreadingCheck11("Mutex::unlock():ThreadingCheck11(true == IAmLocked)");
const RuntimeCheck ThreadingCheck12("Mutex::unlock():ThreadingCheck12(0 == pthread_mutex_unlock(&MyMutex))");


void Mutex::unlock() { // Unlocking a POSIX mutex means void Mutex::unlock() { // Unlocking a POSIX mutex means
ThreadingCheck11(true == IAmLocked); // asserting that we are locked, ThreadingCheck11(true == IAmLocked); // asserting that we are locked,


// POSIX Implementation //////////////////////////////////////////////////////// // POSIX Implementation ////////////////////////////////////////////////////////


const cd::RuntimeCheck ThreadingCheck14("ProductionGateway::ProductionGateway():ThreadingCheck14(0 == pthread_mutex_init(&MyMutex, NULL));");
const cd::RuntimeCheck ThreadingCheck15("ProductionGateway::ProductionGateway():ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL))");
const RuntimeCheck ThreadingCheck14("ProductionGateway::ProductionGateway():ThreadingCheck14(0 == pthread_mutex_init(&MyMutex, NULL));");
const RuntimeCheck ThreadingCheck15("ProductionGateway::ProductionGateway():ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL))");


ProductionGateway::ProductionGateway() : // Construct in POSIX like this: ProductionGateway::ProductionGateway() : // Construct in POSIX like this:
Product(0), // All of our counts start at zero. Product(0), // All of our counts start at zero.
ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL)); // Initialize our condition variable. ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL)); // Initialize our condition variable.
} }


const cd::ExitCheck ThreadingCheck16("ProductionGateway::~ProductionGateway():ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex))");
const cd::ExitCheck ThreadingCheck17("ProductionGateway::~ProductionGateway():ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable))");
const ExitCheck ThreadingCheck16("ProductionGateway::~ProductionGateway():ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex))");
const ExitCheck ThreadingCheck17("ProductionGateway::~ProductionGateway():ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable))");


ProductionGateway::~ProductionGateway() { // When we're done we must destroy ProductionGateway::~ProductionGateway() { // When we're done we must destroy
ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex)); // our local mutex and ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex)); // our local mutex and
ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable)); // our condition variable. ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable)); // our condition variable.
} }


const cd::RuntimeCheck ThreadingCheck18("ProductionGateway::produce():ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex))");
const cd::RuntimeCheck ThreadingCheck19("ProductionGateway::produce():ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable))");
const cd::RuntimeCheck ThreadingCheck20("ProductionGateway::produce():ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex))");
const RuntimeCheck ThreadingCheck18("ProductionGateway::produce():ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex))");
const RuntimeCheck ThreadingCheck19("ProductionGateway::produce():ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable))");
const RuntimeCheck ThreadingCheck20("ProductionGateway::produce():ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex))");


void ProductionGateway::produce() { // To produce in POSIX void ProductionGateway::produce() { // To produce in POSIX
ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex. ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex.
ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex)); // At the end unlock our mutex so ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex)); // At the end unlock our mutex so
} // waiting threads can fly free :-) } // waiting threads can fly free :-)


const cd::RuntimeCheck ThreadingCheck21("ProductionGateway::consume():ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex))");
const cd::RuntimeCheck ThreadingCheck22("ProductionGateway::consume():ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex))");
const cd::RuntimeCheck ThreadingCheck23("ProductionGateway::consume():ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex))");
const RuntimeCheck ThreadingCheck21("ProductionGateway::consume():ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex))");
const RuntimeCheck ThreadingCheck22("ProductionGateway::consume():ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex))");
const RuntimeCheck ThreadingCheck23("ProductionGateway::consume():ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex))");


void ProductionGateway::consume() { // To consume in POSIX void ProductionGateway::consume() { // To consume in POSIX
ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex. ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex.


// End Production Gateway // End Production Gateway
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

} // End namespace codedweller

+ 78
- 69
threading.hpp View File



// Include MNR_threading Once Only ============================================= // Include MNR_threading Once Only =============================================


#ifndef MNR_threading
#define MNR_threading
#pragma once


#include <set> #include <set>
#include <vector> #include <vector>
#include <string> #include <string>
#include <queue>
#include "faults.hpp"
#include <queue>
#include "faults.hpp"


using namespace std;
namespace codedweller {


class ThreadManager; // ThreadManager does exist. class ThreadManager; // ThreadManager does exist.
extern ThreadManager Threads; // Master thread manager. extern ThreadManager Threads; // Master thread manager.


class ThreadState { // Thread State Object. class ThreadState { // Thread State Object.
public: public:
const string Name; // Text name of thread descriptor.
ThreadState(string N) : Name(N) {} // Constructor requires text name.
const std::string Name; // Text name of thread descriptor.
ThreadState(std::string N) : Name(N) {} // Constructor requires text name.
}; };


// ThreadType objects are constant static objects defined for each Thread class // ThreadType objects are constant static objects defined for each Thread class


class ThreadType { class ThreadType {
public: public:
const string Name;
ThreadType(string N) : Name(N) {}
const std::string Name;
ThreadType(std::string N) : Name(N) {}
}; };


class Thread; // There is such thing as a Thread. class Thread; // There is such thing as a Thread.
Thread* Pointer; // A pointer to the thread. Thread* Pointer; // A pointer to the thread.
ThreadType* Type; // A descriptor of it's type. ThreadType* Type; // A descriptor of it's type.
ThreadState* State; // A descriptor of it's state. ThreadState* State; // A descriptor of it's state.
string Name; // Name of the thread if any.
std::string Name; // Name of the thread if any.
bool isRunning; // True if the thread is running. bool isRunning; // True if the thread is running.
bool isBad; // True if the thread is bad. bool isBad; // True if the thread is bad.
string Fault; // Bad Thread's Fault if any.
std::string Fault; // Bad Thread's Fault if any.


public: public:
ThreadStatusRecord( // Initialize all items. ThreadStatusRecord( // Initialize all items.
ThreadState& S, ThreadState& S,
bool R, bool R,
bool B, bool B,
string F,
string N
std::string F,
std::string N
) : ) :
Pointer(P), Pointer(P),
Type(&T), Type(&T),
State(&S), State(&S),
Name(N),
Name(N),
isRunning(R), isRunning(R),
isBad(B), isBad(B),
Fault(F) Fault(F)
isRunning = Right.isRunning; isRunning = Right.isRunning;
isBad = Right.isBad; isBad = Right.isBad;
Fault = Right.Fault; Fault = Right.Fault;
Name = Right.Name;
Name = Right.Name;
return *this; return *this;
} }


const ThreadState& getState() { return *State; } const ThreadState& getState() { return *State; }
bool getRunning() { return isRunning; } bool getRunning() { return isRunning; }
bool getBad() { return isBad; } bool getBad() { return isBad; }
string getFault() { return Fault; }
string getName() { return Name; }
std::string getFault() { return Fault; }
std::string getName() { return Name; }
}; };


typedef vector<ThreadStatusRecord> ThreadStatusReport; // Status report type.
typedef std::vector<ThreadStatusRecord> ThreadStatusReport; // Status report type.


// End ThreadDescriptor // End ThreadDescriptor
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////


} // End namespace codedweller

//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Win32 / POSIX abstractions // Win32 / POSIX abstractions


#include <windows.h> #include <windows.h>
#include <process.h> #include <process.h>


namespace codedweller {

typedef HANDLE thread_primative; // The WIN32 thread primative abstracts typedef HANDLE thread_primative; // The WIN32 thread primative abstracts
// HANDLE // HANDLE


SwitchToThread(); // we call SwitchToThread(); SwitchToThread(); // we call SwitchToThread();
} }


} // End namespace codedweller

#else #else


// When in POSIX land... // When in POSIX land...
#include <pthread.h> #include <pthread.h>
#include <sched.h> #include <sched.h>


namespace codedweller {

typedef pthread_t thread_primative; // The POSIX thread primative abstracts typedef pthread_t thread_primative; // The POSIX thread primative abstracts
// pthread_t // pthread_t


sched_yield(); // we call sched_yield(); sched_yield(); // we call sched_yield();
} }


} // End namespace codedweller

#endif #endif


// End Win32 / POSIX abstractions // End Win32 / POSIX abstractions
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////


namespace codedweller {

//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// The Thread class gets extended to do any specific work. The pure virtual // The Thread class gets extended to do any specific work. The pure virtual
// function MyTask is overloaded by the derived class to define that work. It // function MyTask is overloaded by the derived class to define that work. It
protected: protected:


const ThreadType& MyThreadType; // Identify thread type. const ThreadType& MyThreadType; // Identify thread type.
const string MyThreadName; // Name string of this instance.
const std::string MyThreadName; // Name string of this instance.


thread_primative MyThread; // Abstracted thread. thread_primative MyThread; // Abstracted thread.
bool RunningFlag; // True when thread is in myTask() bool RunningFlag; // True when thread is in myTask()
bool BadFlag; // True when myTask() throws! bool BadFlag; // True when myTask() throws!
string BadWhat; // Bad exception what() if any.
std::string BadWhat; // Bad exception what() if any.
void CurrentThreadState(const ThreadState& TS); // Set thread state. void CurrentThreadState(const ThreadState& TS); // Set thread state.


public: public:


Thread(); // Constructor (just in case) Thread(); // Constructor (just in case)
Thread(const ThreadType& T, string N); // Construct with specific Type/Name
Thread(const ThreadType& T, std::string N); // Construct with specific Type/Name
virtual ~Thread(); // Destructor (just in case) virtual ~Thread(); // Destructor (just in case)


void run(); // Method to launch this thread. void run(); // Method to launch this thread.


bool isRunning(); // Return the Running flag state. bool isRunning(); // Return the Running flag state.
bool isBad(); // Return the Bad flag state. bool isBad(); // Return the Bad flag state.
const string MyFault(); // Return exception Bad fault if any.
const std::string MyFault(); // Return exception Bad fault if any.


const string MyName(); // The thread's name.
const std::string MyName(); // The thread's name.
const ThreadType& MyType(); // Thread type for this thread. const ThreadType& MyType(); // Thread type for this thread.
const ThreadState& MyState(); // Returns the current thread state. const ThreadState& MyState(); // Returns the current thread state.
const ThreadState& CurrentThreadState(); // Returns the current thread state. const ThreadState& CurrentThreadState(); // Returns the current thread state.
private: private:


Mutex MyMutex; // Protect our data with this. Mutex MyMutex; // Protect our data with this.
set<Thread*> KnownThreads; // Keep track of all threads.
std::set<Thread*> KnownThreads; // Keep track of all threads.


void rememberThread(Thread* T); // Threads register themselves. void rememberThread(Thread* T); // Threads register themselves.
void forgetThread(Thread* T); // Threads remove themselves. void forgetThread(Thread* T); // Threads remove themselves.


// End Thread Manager // End Thread Manager
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// A ProductionQueue is a templated, thread safe mechanism for implementing
// a producer/consumer relationship. The objects in the queue should be simple
// data so that they can be created, destroyed, and copied without trouble. Put
// another way - the objects in the ProductionQueue should be lightweight
// handles for other things. Those things should be created and destroyed
// elsewhere.
template<typename T> // Templatized
class ProductionQueue { // Production Queue Class
private:
Mutex myMutex; // Contains a mutex and
volatile unsigned int LatestSize; // a volatile (blinking light) size
ProductionGateway myGateway; // integrated with a production
queue<T> myQueue; // gateway and a queue.
public:
ProductionQueue() : LatestSize(0) {} // The size always starts at zero.
T take() { // To consume a queued object
myGateway.consume(); // we wait on the production gateway
ScopeMutex OneAtATimePlease(myMutex); // and when we get through we lock
T O = myQueue.front(); // the mutext, take the object on the
myQueue.pop(); // front of the queue, pop it out,
LatestSize = myQueue.size(); // and rest our size (blinking light).
return O; // Then return the object we got.
}
void give(T O) { // To produce a queued object
ScopeMutex OneAtATimePlease(myMutex); // we wait on the mutex. When we
myQueue.push(O); // get through we push our object
LatestSize = myQueue.size(); // into the queue, reset our size
myGateway.produce(); // indicator and tell the gateway.
} // When we're done it can be grabbed.
unsigned int size() { // To check the size we look at
return LatestSize; // the blinking light.
}
};
// End Production Queue
////////////////////////////////////////////////////////////////////////////////


#endif
////////////////////////////////////////////////////////////////////////////////
// A ProductionQueue is a templated, thread safe mechanism for implementing
// a producer/consumer relationship. The objects in the queue should be simple
// data so that they can be created, destroyed, and copied without trouble. Put
// another way - the objects in the ProductionQueue should be lightweight
// handles for other things. Those things should be created and destroyed
// elsewhere.

template<typename T> // Templatized
class ProductionQueue { // Production Queue Class
private:
Mutex myMutex; // Contains a mutex and
volatile unsigned int LatestSize; // a volatile (blinking light) size
ProductionGateway myGateway; // integrated with a production
std::queue<T> myQueue; // gateway and a queue.

public:
ProductionQueue() : LatestSize(0) {} // The size always starts at zero.

T take() { // To consume a queued object
myGateway.consume(); // we wait on the production gateway
ScopeMutex OneAtATimePlease(myMutex); // and when we get through we lock
T O = myQueue.front(); // the mutext, take the object on the
myQueue.pop(); // front of the queue, pop it out,
LatestSize = myQueue.size(); // and rest our size (blinking light).
return O; // Then return the object we got.
}

void give(T O) { // To produce a queued object
ScopeMutex OneAtATimePlease(myMutex); // we wait on the mutex. When we
myQueue.push(O); // get through we push our object
LatestSize = myQueue.size(); // into the queue, reset our size
myGateway.produce(); // indicator and tell the gateway.
} // When we're done it can be grabbed.

unsigned int size() { // To check the size we look at
return LatestSize; // the blinking light.
}
};

// End Production Queue
////////////////////////////////////////////////////////////////////////////////


// End Of Include MNR_threading Once Only ======================================
} // End namespace codedweller

Loading…
Cancel
Save