// threading.hpp
//
// (C) 2006 - 2009 MicroNeil Research Corporation.
//
// This program is part of the MicroNeil Research Open Library Project. For
// more information go to http://www.microneil.com/OpenLibrary/index.html
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the
// Free Software Foundation; either version 2 of the License, or (at your
// option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program; if not, write to the Free Software Foundation, Inc., 59 Temple
// Place, Suite 330, Boston, MA 02111-1307 USA

// The "Threading" module is a basic, cross-platform, multi-threading tool kit.
// The differences between posix compatible systems and win32 based systems are
// abstracted. On win32 systems, native win32 primitives are used to construct
// efficient, lightweight objects.

// On others we assume we can use pthreads. In either case the objects we have
// here are designed to cover all of the basics efficiently while hiding the
// required under-cover work.

// A lot of this module is coded here in the header with the inline keyword
// because it is likely that the more basic objects can be efficiently compiled
// as inline abstractions to native calls. Really basic systems won't need
// anything beyond what is in this file.

// 20070202.1601 _M Further research has suggested that using a Semaphore in
// WIN32 environments in place of a CRITICAL_SECTION may provide the best
// performance and stability on all platforms.
Specifically, SMP platforms may // race and waste resources with CRITICAL_SECTIONs and in those cases it is // recommended that the CRITICAL_SECTIONs may be "throttled" using Semaphores // to limit the number of threads that may contend for a critical section. It // is also suggested that if the Semaphore has an initialization value of 1 // the CRITICAL_SECTION is redundant. So this code has been modified to do // precisely that! // // This new version also includes a ProductionGateway object that simplifies // the producer/consumer model. The object keeps track of the number of calls // to produce() and consume() and ensures that threads will block on consume() // until a sufficient number of calls to produce() are made. That is, for every // one call to produce(), a call to consume() will be allowed to proceed. The // object also allows for the potentially asynchronous nature of these calls. // 20070530.1751 _M Added top level exception handling in threads along with // isRunning() and isBad() methods. // 20060528.1647 _M All of the basics are complete and tested on both WIN32 and // RHEL4 single and multiple processors. // Include MNR_threading Once Only ============================================= #ifndef MNR_threading #define MNR_threading #include #include #include #include #include using namespace std; class ThreadManager; // ThreadManager does exist. extern ThreadManager Threads; // Master thread manager. //////////////////////////////////////////////////////////////////////////////// // Thread Status & Type // // ThreadState objects are constant static objects defined for each Thread class // so that the thread can update it's state by changing a pointer. The state // can then be compared between threads of the same type and can be read-out // as text for debugging purposes. class ThreadState { // Thread State Object. public: const string Name; // Text name of thread descriptor. ThreadState(string N) : Name(N) {} // Constructor requires text name. 
}; // ThreadType objects are constant static objects defined for each Thread class // so that classes can be identified by type using a pointer to the constant. class ThreadType { public: const string Name; ThreadType(string N) : Name(N) {} }; class Thread; // There is such thing as a Thread. class ThreadStatusRecord { // Describes a Thread's condition. private: Thread* Pointer; // A pointer to the thread. ThreadType* Type; // A descriptor of it's type. ThreadState* State; // A descriptor of it's state. string Name; // Name of the thread if any. bool isRunning; // True if the thread is running. bool isBad; // True if the thread is bad. string Fault; // Bad Thread's Fault if any. public: ThreadStatusRecord( // Initialize all items. Thread* P, ThreadType& T, ThreadState& S, bool R, bool B, string F, string N ) : Pointer(P), Type(&T), State(&S), Name(N), isRunning(R), isBad(B), Fault(F) {} ThreadStatusRecord& operator=(const ThreadStatusRecord& Right) { // Minimal Assignment Operator Pointer = Right.Pointer; Type = Right.Type; State = Right.State; isRunning = Right.isRunning; isBad = Right.isBad; Fault = Right.Fault; Name = Right.Name; return *this; } bool operator<(const ThreadStatusRecord& Right) { // Minimal Comparison Operator. return (Pointer < Right.Pointer); } // How to get the details of the report. const Thread* getPointer() { return Pointer; } const ThreadType& getType() { return *Type; } const ThreadState& getState() { return *State; } bool getRunning() { return isRunning; } bool getBad() { return isBad; } string getFault() { return Fault; } string getName() { return Name; } }; typedef vector ThreadStatusReport; // Status report type. // End ThreadDescriptor //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // Win32 / POSIX abstractions #ifdef WIN32 // When in WIN32 land... 
// Remember to compile (on GNU anyway) with -mthreads #include #include typedef HANDLE thread_primative; // The WIN32 thread primative abstracts // HANDLE typedef HANDLE mutex_primative; // The WIN32 mutex primative abstracts // a HANDLE to a Semaphore. inline void threading_yield() { // When we want to yield time in WIN32 SwitchToThread(); // we call SwitchToThread(); } #else // When in POSIX land... // Remember to compile (on GMU anyway) with -pthread #include #include typedef pthread_t thread_primative; // The POSIX thread primative abstracts // pthread_t typedef pthread_mutex_t mutex_primative; // The POSIX mutex primative abstracts // pthread_mutex_t inline void threading_yield() { // When we want to yield time in POSIX sched_yield(); // we call sched_yield(); } #endif // End Win32 / POSIX abstractions //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // The Thread class gets extended to do any specific work. The pure virtual // function MyTask is overloaded by the derived class to define that work. It // is expected that the class will be initialized with any parameters that // will be used by the thread and that the thread will make available any // results through public interfaces either during and/or after the thread // has finished running. class Thread { private: ThreadState* MyThreadState; // Track current thread state. protected: const ThreadType& MyThreadType; // Identify thread type. const string MyThreadName; // Name string of this instance. thread_primative MyThread; // Abstracted thread. bool RunningFlag; // True when thread is in myTask() bool BadFlag; // True when myTask() throws! string BadWhat; // Bad exception what() if any. void CurrentThreadState(const ThreadState& TS); // Set thread state. 
public: Thread(); // Constructor (just in case) Thread(const ThreadType& T, string N); // Construct with specific Type/Name virtual ~Thread(); // Destructor (just in case) void run(); // Method to launch this thread. void join(); // Method to Join this thread. void launchTask(); // Launch and watch myTask(). virtual void myTask() = 0; // The actual task must be overloaded. thread_primative getMyThread(); // Inspect my thread primative. bool isRunning(); // Return the Running flag state. bool isBad(); // Return the Bad flag state. const string MyFault(); // Return exception Bad fault if any. const string MyName(); // The thread's name. const ThreadType& MyType(); // Thread type for this thread. const ThreadState& MyState(); // Returns the current thread state. const ThreadState& CurrentThreadState(); // Returns the current thread state. ThreadStatusRecord StatusReport(); // Return's the thread's status reprt. // Constants for Thread... const static ThreadType Type; // The thread's type. const static ThreadState ThreadInitialized; // Constructed successfully. const static ThreadState ThreadStarted; // Started. const static ThreadState ThreadFailed; // Failed by unhandled exception. const static ThreadState ThreadStopped; // Stopped normally. const static ThreadState ThreadDestroyed; // Safety value for destructed Threads. }; // End Thread //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // The Mutex class abstracts a lightweight, very basic mutex object. // As with the Thread object, more ellaborate forms can be built up from // this basic mechanism. An important design constraint for this basic // mutex object is that it work even if the thread that's running was not // created with the Thread object... that ensures that it can be used in // code that is destined to function in other applications. 
class Mutex { private: mutex_primative MyMutex; // Here is our primative mutex. volatile bool IAmLocked; // Here is our Lock Count. public: Mutex(); // Construct the mutex. ~Mutex(); // Destroy the mutex. void lock(); // Lock it. void unlock(); // Unlock it. bool tryLock(); // Try to lock it. bool isLocked(); // Check to see if it's locked. }; // End of Mutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // ScopeMutex // A ScopeMutex is a nifty trick for locking a mutex during some segment of // code. On construction, it locks the Mutex that it is given and keeps it // locked until it is destroyed. Of course this also means that it will unlock // the mutex when it goes out of scope - which is precisely the point :-) // // The right way to use a ScopeMutex is to create it just before you need to // have control and then forget about it. From a design perspective, you might // want to make sure that whatever happens after the ScopeMutex has been // created is as short as possible and if it is not then you may want to // use the Mutex directly. // // The best place to use a ScopeMutex is where you might leave the controling // bit of code through a number of logical paths such as a logic tree or even // due to some exceptions. In this context it saves you having to track down // all of the possible cases and unlock the mutex in each of them. class ScopeMutex { private: Mutex& MyMutex; // ScopeMutex has an ordinary Mutex to use. public: ScopeMutex(Mutex& M); // Constructing a ScopeMutex requires a Mutex ~ScopeMutex(); // We do have special code for descrution. 
}; // End ScopeMutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // ProductionGateway // A ProductionGateway encapsulates the thread synchronization required for a // producer / consumer relationship. For each call to the produce() method one // call to the consume() method can proceed. The object takes into account that // these methods may be called out of sequence and that, for example, produce() // might be called several times before any calls to consume. #ifdef WIN32 // Win32 Implementation //////////////////////////////////////////////////////// class ProductionGateway { private: HANDLE MySemaphore; // WIN32 makes this one easy w/ a 0 semi. public: ProductionGateway(); // The constructor and destructor handle ~ProductionGateway(); // creating and destroying the semi. void produce(); // Produce "releases" the semi. void consume(); // Consume "waits" if needed. }; #else // POSIX Implementation //////////////////////////////////////////////////////// class ProductionGateway { // Posix needs a few pieces for this. private: mutex_primative MyMutex; // Mutex to protect the data. pthread_cond_t MyConditionVariable; // A condition variable for signaling. int Product; // A count of unused calls to produce() int Waiting; // A count of waiting threads. int Signaled; // A count of signaled threads. public: ProductionGateway(); // The constructor and destructor handle ~ProductionGateway(); // creating and destroying the semi. void produce(); // Produce "releases" the semi. void consume(); // Consume "waits" if needed. }; #endif // End ProductionGateway //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // The ThreadManager class provides a global thread management tool. 
All Thread // objects register themselves with the Threads object upon construction and // remove themselves from the registry upon destruction. The Threads object can // produce a status report for all of the known threads on the system and can // temporarily lock the existing thread so that it can be contacted reliably. // locking and unlocking the ThreadManager is intended only for short messages // that set flags in the thread or pass some small data packet. The lock only // prevents the thread from being destroyed before the message can be sent so // that the thread that owns the threadlock will not make any calls to a dead // pointer. Most apps should be designed so that the threadlock mechanism is // not required. class ThreadManager { // Central manager for threads. friend class Thread; // Threads are friends. private: Mutex MyMutex; // Protect our data with this. set KnownThreads; // Keep track of all threads. void rememberThread(Thread* T); // Threads register themselves. void forgetThread(Thread* T); // Threads remove themselves. Thread* LockedThread; // Pointer to locked thread if any. public: ThreadManager():LockedThread(0){} // Initialize nice and clean. ThreadStatusReport StatusReport(); // Get a status report. bool lockExistingThread(Thread* T); // Locks ThreadManager if T exists. void unlockExistingThread(Thread* T); // Unlocks ThreadManager if T locked. }; class ScopeThreadLock { // This is like a ScopeMutex for private: // the ThreadManager. Thread* MyLockedThread; // It needs to know it's Thread. public: ScopeThreadLock(Thread* T); // Locks T in ThreadManager if it can. ~ScopeThreadLock(); // Unlocks T in ThreadManager if locked. bool isGood(); // True if T was locked. bool isBad(); // False if T was not locked. 
}; // End Thread Manager //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // A ProductionQueue is a templated, thread safe mechanism for implementing // a producer/consumer relationship. The objects in the queue should be simple // data so that they can be created, destroyed, and copied without trouble. Put // another way - the objects in the ProductionQueue should be lightweight // handles for other things. Those things should be created and destroyed // elsewhere. template // Templatized class ProductionQueue { // Production Queue Class private: Mutex myMutex; // Contains a mutex and volatile int LatestSize; // a volatile (blinking light) size ProductionGateway myGateway; // integrated with a production queue myQueue; // gateway and a queue. public: ProductionQueue() : LatestSize(0) {} // The size always starts at zero. T take() { // To consume a queued object myGateway.consume(); // we wait on the production gateway ScopeMutex OneAtATimePlease(myMutex); // and when we get through we lock T O = myQueue.front(); // the mutext, take the object on the myQueue.pop(); // front of the queue, pop it out, LatestSize = myQueue.size(); // and rest our size (blinking light). return O; // Then return the object we got. } void give(T O) { // To produce a queued object ScopeMutex OneAtATimePlease(myMutex); // we wait on the mutex. When we myQueue.push(O); // get through we push our object LatestSize = myQueue.size(); // into the queue, reset our size myGateway.produce(); // indicator and tell the gateway. } // When we're done it can be grabbed. int size() { // To check the size we look at return LatestSize; // the blinking light. } }; // End Production Queue //////////////////////////////////////////////////////////////////////////////// #endif // End Of Include MNR_threading Once Only ======================================