// threading.hpp // // Copyright (C) 2004-2020 MicroNeil Research Corporation. // // This software is released under the MIT license. See LICENSE.TXT. // The "Threading" module is a basic, cross-platform, multi-threading tool kit. // The differences between posix compatible systems and win32 based systems are // abstracted. On win32 systems, native win32 primatives are used to construct. // efficient, lightweight objects. // On others we assume we can use pthreads. In either case the objects we have // here are designed to cover all of the basics efficiently while hiding the // required under-cover work. #pragma once #include #include #include #include #include "faults.hpp" namespace codedweller { class ThreadManager; // ThreadManager does exist. extern ThreadManager Threads; // Master thread manager. //////////////////////////////////////////////////////////////////////////////// // Thread Status & Type // // ThreadState objects are constant static objects defined for each Thread class // so that the thread can update it's state by changing a pointer. The state // can then be compared between threads of the same type and can be read-out // as text for debugging purposes. class ThreadState { // Thread State Object. public: const std::string Name; // Text name of thread descriptor. ThreadState(std::string N) : Name(N) {} // Constructor requires text name. }; // ThreadType objects are constant static objects defined for each Thread class // so that classes can be identified by type using a pointer to the constant. class ThreadType { public: const std::string Name; ThreadType(std::string N) : Name(N) {} }; class Thread; // There is such thing as a Thread. class ThreadStatusRecord { // Describes a Thread's condition. private: Thread* Pointer; // A pointer to the thread. ThreadType* Type; // A descriptor of it's type. ThreadState* State; // A descriptor of it's state. std::string Name; // Name of the thread if any. bool isRunning; // True if the thread is running. bool isBad; // True if the thread is bad. std::string Fault; // Bad Thread's Fault if any. public: ThreadStatusRecord( // Initialize all items. Thread* P, ThreadType& T, ThreadState& S, bool R, bool B, std::string F, std::string N ) : Pointer(P), Type(&T), State(&S), Name(N), isRunning(R), isBad(B), Fault(F) {} ThreadStatusRecord& operator=(const ThreadStatusRecord& Right) { // Minimal Assignment Operator Pointer = Right.Pointer; Type = Right.Type; State = Right.State; isRunning = Right.isRunning; isBad = Right.isBad; Fault = Right.Fault; Name = Right.Name; return *this; } bool operator<(const ThreadStatusRecord& Right) { // Minimal Comparison Operator. return (Pointer < Right.Pointer); } // How to get the details of the report. const Thread* getPointer() { return Pointer; } const ThreadType& getType() { return *Type; } const ThreadState& getState() { return *State; } bool getRunning() { return isRunning; } bool getBad() { return isBad; } std::string getFault() { return Fault; } std::string getName() { return Name; } }; typedef std::vector ThreadStatusReport; // Status report type. // End ThreadDescriptor //////////////////////////////////////////////////////////////////////////////// } // End namespace codedweller //////////////////////////////////////////////////////////////////////////////// // Win32 / POSIX abstractions #ifdef WIN32 // When in WIN32 land... // Remember to compile (on GNU anyway) with -mthreads #include #include namespace codedweller { typedef HANDLE thread_primative; // The WIN32 thread primative abstracts // HANDLE typedef HANDLE mutex_primative; // The WIN32 mutex primative abstracts // a HANDLE to a Semaphore. inline void threading_yield() { // When we want to yield time in WIN32 SwitchToThread(); // we call SwitchToThread(); } } // End namespace codedweller #else // When in POSIX land... // Remember to compile (on GMU anyway) with -pthread #include #include namespace codedweller { typedef pthread_t thread_primative; // The POSIX thread primative abstracts // pthread_t typedef pthread_mutex_t mutex_primative; // The POSIX mutex primative abstracts // pthread_mutex_t inline void threading_yield() { // When we want to yield time in POSIX sched_yield(); // we call sched_yield(); } } // End namespace codedweller #endif // End Win32 / POSIX abstractions //////////////////////////////////////////////////////////////////////////////// namespace codedweller { //////////////////////////////////////////////////////////////////////////////// // The Thread class gets extended to do any specific work. The pure virtual // function MyTask is overloaded by the derived class to define that work. It // is expected that the class will be initialized with any parameters that // will be used by the thread and that the thread will make available any // results through public interfaces either during and/or after the thread // has finished running. class Thread { private: ThreadState* MyThreadState; // Track current thread state. protected: const ThreadType& MyThreadType; // Identify thread type. const std::string MyThreadName; // Name string of this instance. thread_primative MyThread; // Abstracted thread. bool RunningFlag; // True when thread is in myTask() bool BadFlag; // True when myTask() throws! std::string BadWhat; // Bad exception what() if any. void CurrentThreadState(const ThreadState& TS); // Set thread state. public: Thread(); // Constructor (just in case) Thread(const ThreadType& T, std::string N); // Construct with specific Type/Name virtual ~Thread(); // Destructor (just in case) void run(); // Method to launch this thread. void join(); // Method to Join this thread. void launchTask(); // Launch and watch myTask(). virtual void myTask() = 0; // The actual task must be overloaded. thread_primative getMyThread(); // Inspect my thread primative. bool isRunning(); // Return the Running flag state. bool isBad(); // Return the Bad flag state. const std::string MyFault(); // Return exception Bad fault if any. const std::string MyName(); // The thread's name. const ThreadType& MyType(); // Thread type for this thread. const ThreadState& MyState(); // Returns the current thread state. const ThreadState& CurrentThreadState(); // Returns the current thread state. ThreadStatusRecord StatusReport(); // Return's the thread's status reprt. // Constants for Thread... const static ThreadType Type; // The thread's type. const static ThreadState ThreadInitialized; // Constructed successfully. const static ThreadState ThreadStarted; // Started. const static ThreadState ThreadFailed; // Failed by unhandled exception. const static ThreadState ThreadStopped; // Stopped normally. const static ThreadState ThreadDestroyed; // Safety value for destructed Threads. }; // End Thread //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // The Mutex class abstracts a lightweight, very basic mutex object. // As with the Thread object, more ellaborate forms can be built up from // this basic mechanism. An important design constraint for this basic // mutex object is that it work even if the thread that's running was not // created with the Thread object... that ensures that it can be used in // code that is destined to function in other applications. class Mutex { private: mutex_primative MyMutex; // Here is our primative mutex. volatile bool IAmLocked; // Here is our Lock Count. public: Mutex(); // Construct the mutex. ~Mutex(); // Destroy the mutex. void lock(); // Lock it. void unlock(); // Unlock it. bool tryLock(); // Try to lock it. bool isLocked(); // Check to see if it's locked. }; // End of Mutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // ScopeMutex // A ScopeMutex is a nifty trick for locking a mutex during some segment of // code. On construction, it locks the Mutex that it is given and keeps it // locked until it is destroyed. Of course this also means that it will unlock // the mutex when it goes out of scope - which is precisely the point :-) // // The right way to use a ScopeMutex is to create it just before you need to // have control and then forget about it. From a design perspective, you might // want to make sure that whatever happens after the ScopeMutex has been // created is as short as possible and if it is not then you may want to // use the Mutex directly. // // The best place to use a ScopeMutex is where you might leave the controling // bit of code through a number of logical paths such as a logic tree or even // due to some exceptions. In this context it saves you having to track down // all of the possible cases and unlock the mutex in each of them. class ScopeMutex { private: Mutex& MyMutex; // ScopeMutex has an ordinary Mutex to use. public: ScopeMutex(Mutex& M); // Constructing a ScopeMutex requires a Mutex ~ScopeMutex(); // We do have special code for descrution. }; // End ScopeMutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // ProductionGateway // A ProductionGateway encapsulates the thread synchronization required for a // producer / consumer relationship. For each call to the produce() method one // call to the consume() method can proceed. The object takes into account that // these methods may be called out of sequence and that, for example, produce() // might be called several times before any calls to consume. #ifdef WIN32 // Win32 Implementation //////////////////////////////////////////////////////// class ProductionGateway { private: HANDLE MySemaphore; // WIN32 makes this one easy w/ a 0 semi. public: ProductionGateway(); // The constructor and destructor handle ~ProductionGateway(); // creating and destroying the semi. void produce(); // Produce "releases" the semi. void consume(); // Consume "waits" if needed. }; #else // POSIX Implementation //////////////////////////////////////////////////////// class ProductionGateway { // Posix needs a few pieces for this. private: mutex_primative MyMutex; // Mutex to protect the data. pthread_cond_t MyConditionVariable; // A condition variable for signaling. int Product; // A count of unused calls to produce() int Waiting; // A count of waiting threads. int Signaled; // A count of signaled threads. public: ProductionGateway(); // The constructor and destructor handle ~ProductionGateway(); // creating and destroying the semi. void produce(); // Produce "releases" the semi. void consume(); // Consume "waits" if needed. }; #endif // End ProductionGateway //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // The ThreadManager class provides a global thread management tool. All Thread // objects register themselves with the Threads object upon construction and // remove themselves from the registry upon destruction. The Threads object can // produce a status report for all of the known threads on the system and can // temporarily lock the existing thread so that it can be contacted reliably. // locking and unlocking the ThreadManager is intended only for short messages // that set flags in the thread or pass some small data packet. The lock only // prevents the thread from being destroyed before the message can be sent so // that the thread that owns the threadlock will not make any calls to a dead // pointer. Most apps should be designed so that the threadlock mechanism is // not required. class ThreadManager { // Central manager for threads. friend class Thread; // Threads are friends. private: Mutex MyMutex; // Protect our data with this. std::set KnownThreads; // Keep track of all threads. void rememberThread(Thread* T); // Threads register themselves. void forgetThread(Thread* T); // Threads remove themselves. Thread* LockedThread; // Pointer to locked thread if any. public: ThreadManager():LockedThread(0){} // Initialize nice and clean. ThreadStatusReport StatusReport(); // Get a status report. bool lockExistingThread(Thread* T); // Locks ThreadManager if T exists. void unlockExistingThread(Thread* T); // Unlocks ThreadManager if T locked. }; class ScopeThreadLock { // This is like a ScopeMutex for private: // the ThreadManager. Thread* MyLockedThread; // It needs to know it's Thread. public: ScopeThreadLock(Thread* T); // Locks T in ThreadManager if it can. ~ScopeThreadLock(); // Unlocks T in ThreadManager if locked. bool isGood(); // True if T was locked. bool isBad(); // False if T was not locked. }; // End Thread Manager //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // A ProductionQueue is a templated, thread safe mechanism for implementing // a producer/consumer relationship. The objects in the queue should be simple // data so that they can be created, destroyed, and copied without trouble. Put // another way - the objects in the ProductionQueue should be lightweight // handles for other things. Those things should be created and destroyed // elsewhere. template // Templatized class ProductionQueue { // Production Queue Class private: Mutex myMutex; // Contains a mutex and volatile unsigned int LatestSize; // a volatile (blinking light) size ProductionGateway myGateway; // integrated with a production std::queue myQueue; // gateway and a queue. public: ProductionQueue() : LatestSize(0) {} // The size always starts at zero. T take() { // To consume a queued object myGateway.consume(); // we wait on the production gateway ScopeMutex OneAtATimePlease(myMutex); // and when we get through we lock T O = myQueue.front(); // the mutext, take the object on the myQueue.pop(); // front of the queue, pop it out, LatestSize = myQueue.size(); // and rest our size (blinking light). return O; // Then return the object we got. } void give(T O) { // To produce a queued object ScopeMutex OneAtATimePlease(myMutex); // we wait on the mutex. When we myQueue.push(O); // get through we push our object LatestSize = myQueue.size(); // into the queue, reset our size myGateway.produce(); // indicator and tell the gateway. } // When we're done it can be grabbed. unsigned int size() { // To check the size we look at return LatestSize; // the blinking light. } }; // End Production Queue //////////////////////////////////////////////////////////////////////////////// } // End namespace codedweller