|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- // threading.hpp
- //
- // (C) 2006 - 2009 MicroNeil Research Corporation.
- //
- // This program is part of the MicroNeil Research Open Library Project. For
- // more information go to http://www.microneil.com/OpenLibrary/index.html
- //
- // This program is free software; you can redistribute it and/or modify it
- // under the terms of the GNU General Public License as published by the
- // Free Software Foundation; either version 2 of the License, or (at your
- // option) any later version.
- //
- // This program is distributed in the hope that it will be useful, but WITHOUT
- // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- // more details.
- //
- // You should have received a copy of the GNU General Public License along with
- // this program; if not, write to the Free Software Foundation, Inc., 59 Temple
- // Place, Suite 330, Boston, MA 02111-1307 USA
-
- // The "Threading" module is a basic, cross-platform, multi-threading tool kit.
- // The differences between posix compatible systems and win32 based systems are
- // abstracted. On win32 systems, native win32 primatives are used to construct.
- // efficient, lightweight objects.
-
- // On others we assume we can use pthreads. In either case the objects we have
- // here are designed to cover all of the basics efficiently while hiding the
- // required under-cover work.
-
- // A lot of this module is coded here in the header with the inline keyword
- // because it is likely that the more basic objects can be efficiently compiled
- // as inline abstractions to native calls. Really basic systems won't need
- // anything beyond what is in this file.
-
- // 20070202.1601 _M Further research has suggested that using a Semaphore in
- // WIN32 environments in place of a CRITICAL_SECTION may provide the best
- // performance and stability on all platforms. Specifically, SMP platforms may
- // race and waste resources with CRITICAL_SECTIONs and in those cases it is
- // recommended that the CRITICAL_SECTIONs may be "throttled" using Semaphores
- // to limit the number of threads that may contend for a critical section. It
- // is also suggested that if the Semaphore has an initialization value of 1
- // the CRITICAL_SECTION is redundant. So this code has been modified to do
- // precisely that!
- //
- // This new version also includes a ProductionGateway object that simplifies
- // the producer/consumer model. The object keeps track of the number of calls
- // to produce() and consume() and ensures that threads will block on consume()
- // until a sufficient number of calls to produce() are made. That is, for every
- // one call to produce(), a call to consume() will be allowed to proceed. The
- // object also allows for the potentially asynchronous nature of these calls.
-
- // 20070530.1751 _M Added top level exception handling in threads along with
- // isRunning() and isBad() methods.
-
- // 20060528.1647 _M All of the basics are complete and tested on both WIN32 and
- // RHEL4 single and multiple processors.
-
- // Include MNR_threading Once Only =============================================
-
- #ifndef MNR_threading
- #define MNR_threading
-
- #include <set>
- #include <vector>
- #include <string>
- #include <queue>
- #include "faults.hpp"
-
- namespace CodeDweller {
-
- class ThreadManager; // ThreadManager does exist.
- extern ThreadManager Threads; // Master thread manager.
-
- ////////////////////////////////////////////////////////////////////////////////
- // Thread Status & Type
- //
- // ThreadState objects are constant static objects defined for each Thread class
- // so that the thread can update it's state by changing a pointer. The state
- // can then be compared between threads of the same type and can be read-out
- // as text for debugging purposes.
-
- class ThreadState { // Thread State Object.
- public:
- const std::string Name; // Text name of thread descriptor.
- ThreadState(std::string N) : Name(N) {} // Constructor requires text name.
- };
-
- // ThreadType objects are constant static objects defined for each Thread class
- // so that classes can be identified by type using a pointer to the constant.
-
- class ThreadType {
- public:
- const std::string Name;
- ThreadType(std::string N) : Name(N) {}
- };
-
- class Thread; // There is such thing as a Thread.
-
- class ThreadStatusRecord { // Describes a Thread's condition.
- private:
- Thread* Pointer; // A pointer to the thread.
- ThreadType* Type; // A descriptor of it's type.
- ThreadState* State; // A descriptor of it's state.
- std::string Name; // Name of the thread if any.
- bool isRunning; // True if the thread is running.
- bool isBad; // True if the thread is bad.
- std::string Fault; // Bad Thread's Fault if any.
-
- public:
- ThreadStatusRecord( // Initialize all items.
- Thread* P,
- ThreadType& T,
- ThreadState& S,
- bool R,
- bool B,
- std::string F,
- std::string N
- ) :
- Pointer(P),
- Type(&T),
- State(&S),
- Name(N),
- isRunning(R),
- isBad(B),
- Fault(F)
- {}
-
- ThreadStatusRecord& operator=(const ThreadStatusRecord& Right) { // Minimal Assignment Operator
- Pointer = Right.Pointer;
- Type = Right.Type;
- State = Right.State;
- isRunning = Right.isRunning;
- isBad = Right.isBad;
- Fault = Right.Fault;
- Name = Right.Name;
- return *this;
- }
-
- bool operator<(const ThreadStatusRecord& Right) { // Minimal Comparison Operator.
- return (Pointer < Right.Pointer);
- }
-
- // How to get the details of the report.
-
- const Thread* getPointer() { return Pointer; }
- const ThreadType& getType() { return *Type; }
- const ThreadState& getState() { return *State; }
- bool getRunning() { return isRunning; }
- bool getBad() { return isBad; }
- std::string getFault() { return Fault; }
- std::string getName() { return Name; }
- };
-
- typedef std::vector<ThreadStatusRecord> ThreadStatusReport; // Status report type.
-
- // End ThreadDescriptor
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // Win32 / POSIX abstractions
-
- #ifdef WIN32
-
- // When in WIN32 land...
- // Remember to compile (on GNU anyway) with -mthreads
-
- #include <windows.h>
- #include <process.h>
-
- typedef HANDLE thread_primative; // The WIN32 thread primative abstracts
- // HANDLE
-
- typedef HANDLE mutex_primative; // The WIN32 mutex primative abstracts
- // a HANDLE to a Semaphore.
-
- inline void threading_yield() { // When we want to yield time in WIN32
- SwitchToThread(); // we call SwitchToThread();
- }
-
- #else
-
- // When in POSIX land...
- // Remember to compile (on GMU anyway) with -pthread
-
- #include <pthread.h>
- #include <sched.h>
-
- typedef pthread_t thread_primative; // The POSIX thread primative abstracts
- // pthread_t
-
- typedef pthread_mutex_t mutex_primative; // The POSIX mutex primative abstracts
- // pthread_mutex_t
-
- inline void threading_yield() { // When we want to yield time in POSIX
- sched_yield(); // we call sched_yield();
- }
-
- #endif
-
- // End Win32 / POSIX abstractions
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // The Thread class gets extended to do any specific work. The pure virtual
- // function MyTask is overloaded by the derived class to define that work. It
- // is expected that the class will be initialized with any parameters that
- // will be used by the thread and that the thread will make available any
- // results through public interfaces either during and/or after the thread
- // has finished running.
-
- class Thread {
-
- private:
-
- ThreadState* MyThreadState; // Track current thread state.
-
- protected:
-
- const ThreadType& MyThreadType; // Identify thread type.
- const std::string MyThreadName; // Name string of this instance.
-
- thread_primative MyThread; // Abstracted thread.
- bool RunningFlag; // True when thread is in myTask()
- bool BadFlag; // True when myTask() throws!
- std::string BadWhat; // Bad exception what() if any.
- void CurrentThreadState(const ThreadState& TS); // Set thread state.
-
- public:
-
- Thread(); // Constructor (just in case)
- Thread(const ThreadType& T, std::string N); // Construct with specific Type/Name
- virtual ~Thread(); // Destructor (just in case)
-
- void run(); // Method to launch this thread.
- void join(); // Method to Join this thread.
- void launchTask(); // Launch and watch myTask().
-
- virtual void myTask() = 0; // The actual task must be overloaded.
-
- thread_primative getMyThread(); // Inspect my thread primative.
-
- bool isRunning(); // Return the Running flag state.
- bool isBad(); // Return the Bad flag state.
- const std::string MyFault(); // Return exception Bad fault if any.
-
- const std::string MyName() const; // The thread's name.
- const ThreadType& MyType(); // Thread type for this thread.
- const ThreadState& MyState(); // Returns the current thread state.
- const ThreadState& CurrentThreadState(); // Returns the current thread state.
-
- ThreadStatusRecord StatusReport(); // Return's the thread's status reprt.
-
- // Constants for Thread...
-
- const static ThreadType Type; // The thread's type.
-
- const static ThreadState ThreadInitialized; // Constructed successfully.
- const static ThreadState ThreadStarted; // Started.
- const static ThreadState ThreadFailed; // Failed by unhandled exception.
- const static ThreadState ThreadStopped; // Stopped normally.
- const static ThreadState ThreadDestroyed; // Safety value for destructed Threads.
-
- };
-
- // End Thread
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // The Mutex class abstracts a lightweight, very basic mutex object.
- // As with the Thread object, more ellaborate forms can be built up from
- // this basic mechanism. An important design constraint for this basic
- // mutex object is that it work even if the thread that's running was not
- // created with the Thread object... that ensures that it can be used in
- // code that is destined to function in other applications.
-
- class Mutex {
-
- private:
-
- mutex_primative MyMutex; // Here is our primative mutex.
- volatile bool IAmLocked; // Here is our Lock Count.
-
- public:
-
- Mutex(); // Construct the mutex.
- ~Mutex(); // Destroy the mutex.
-
- void lock(); // Lock it.
- void unlock(); // Unlock it.
- bool tryLock(); // Try to lock it.
- bool isLocked(); // Check to see if it's locked.
-
- };
-
- // End of Mutex
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // ScopeMutex
- // A ScopeMutex is a nifty trick for locking a mutex during some segment of
- // code. On construction, it locks the Mutex that it is given and keeps it
- // locked until it is destroyed. Of course this also means that it will unlock
- // the mutex when it goes out of scope - which is precisely the point :-)
- //
- // The right way to use a ScopeMutex is to create it just before you need to
- // have control and then forget about it. From a design perspective, you might
- // want to make sure that whatever happens after the ScopeMutex has been
- // created is as short as possible and if it is not then you may want to
- // use the Mutex directly.
- //
- // The best place to use a ScopeMutex is where you might leave the controling
- // bit of code through a number of logical paths such as a logic tree or even
- // due to some exceptions. In this context it saves you having to track down
- // all of the possible cases and unlock the mutex in each of them.
-
- class ScopeMutex {
-
- private:
-
- Mutex& MyMutex; // ScopeMutex has an ordinary Mutex to use.
-
- public:
-
- ScopeMutex(Mutex& M); // Constructing a ScopeMutex requires a Mutex
- ~ScopeMutex(); // We do have special code for descrution.
-
- };
-
- // End ScopeMutex
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // ProductionGateway
- // A ProductionGateway encapsulates the thread synchronization required for a
- // producer / consumer relationship. For each call to the produce() method one
- // call to the consume() method can proceed. The object takes into account that
- // these methods may be called out of sequence and that, for example, produce()
- // might be called several times before any calls to consume.
-
- #ifdef WIN32
-
- // Win32 Implementation ////////////////////////////////////////////////////////
-
- class ProductionGateway {
-
- private:
-
- HANDLE MySemaphore; // WIN32 makes this one easy w/ a 0 semi.
-
- public:
-
- ProductionGateway(); // The constructor and destructor handle
- ~ProductionGateway(); // creating and destroying the semi.
-
- void produce(); // Produce "releases" the semi.
- void consume(); // Consume "waits" if needed.
-
- };
-
- #else
-
- // POSIX Implementation ////////////////////////////////////////////////////////
-
- class ProductionGateway { // Posix needs a few pieces for this.
-
- private:
-
- mutex_primative MyMutex; // Mutex to protect the data.
- pthread_cond_t MyConditionVariable; // A condition variable for signaling.
-
- int Product; // A count of unused calls to produce()
- int Waiting; // A count of waiting threads.
- int Signaled; // A count of signaled threads.
-
- public:
-
- ProductionGateway(); // The constructor and destructor handle
- ~ProductionGateway(); // creating and destroying the semi.
-
- void produce(); // Produce "releases" the semi.
- void consume(); // Consume "waits" if needed.
-
- };
-
- #endif
-
- // End ProductionGateway
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // The ThreadManager class provides a global thread management tool. All Thread
- // objects register themselves with the Threads object upon construction and
- // remove themselves from the registry upon destruction. The Threads object can
- // produce a status report for all of the known threads on the system and can
- // temporarily lock the existing thread so that it can be contacted reliably.
- // locking and unlocking the ThreadManager is intended only for short messages
- // that set flags in the thread or pass some small data packet. The lock only
- // prevents the thread from being destroyed before the message can be sent so
- // that the thread that owns the threadlock will not make any calls to a dead
- // pointer. Most apps should be designed so that the threadlock mechanism is
- // not required.
-
- class ThreadManager { // Central manager for threads.
- friend class Thread; // Threads are friends.
- private:
-
- Mutex MyMutex; // Protect our data with this.
- std::set<Thread*> KnownThreads; // Keep track of all threads.
-
- void rememberThread(Thread* T); // Threads register themselves.
- void forgetThread(Thread* T); // Threads remove themselves.
-
- Thread* LockedThread; // Pointer to locked thread if any.
-
- public:
-
- ThreadManager():LockedThread(0){} // Initialize nice and clean.
-
- ThreadStatusReport StatusReport(); // Get a status report.
- bool lockExistingThread(Thread* T); // Locks ThreadManager if T exists.
- void unlockExistingThread(Thread* T); // Unlocks ThreadManager if T locked.
-
- };
-
- class ScopeThreadLock { // This is like a ScopeMutex for
- private: // the ThreadManager.
- Thread* MyLockedThread; // It needs to know it's Thread.
-
- public:
- ScopeThreadLock(Thread* T); // Locks T in ThreadManager if it can.
- ~ScopeThreadLock(); // Unlocks T in ThreadManager if locked.
- bool isGood(); // True if T was locked.
- bool isBad(); // False if T was not locked.
- };
-
- // End Thread Manager
- ////////////////////////////////////////////////////////////////////////////////
-
- ////////////////////////////////////////////////////////////////////////////////
- // A ProductionQueue is a templated, thread safe mechanism for implementing
- // a producer/consumer relationship. The objects in the queue should be simple
- // data so that they can be created, destroyed, and copied without trouble. Put
- // another way - the objects in the ProductionQueue should be lightweight
- // handles for other things. Those things should be created and destroyed
- // elsewhere.
-
- template<typename T> // Templatized
- class ProductionQueue { // Production Queue Class
- private:
- Mutex myMutex; // Contains a mutex and
- volatile unsigned int LatestSize; // a volatile (blinking light) size
- ProductionGateway myGateway; // integrated with a production
- std::queue<T> myQueue; // gateway and a queue.
-
- public:
- ProductionQueue() : LatestSize(0) {} // The size always starts at zero.
-
- T take() { // To consume a queued object
- myGateway.consume(); // we wait on the production gateway
- ScopeMutex OneAtATimePlease(myMutex); // and when we get through we lock
- T O = myQueue.front(); // the mutext, take the object on the
- myQueue.pop(); // front of the queue, pop it out,
- LatestSize = myQueue.size(); // and rest our size (blinking light).
- return O; // Then return the object we got.
- }
-
- void give(T O) { // To produce a queued object
- ScopeMutex OneAtATimePlease(myMutex); // we wait on the mutex. When we
- myQueue.push(O); // get through we push our object
- LatestSize = myQueue.size(); // into the queue, reset our size
- myGateway.produce(); // indicator and tell the gateway.
- } // When we're done it can be grabbed.
-
- unsigned int size() { // To check the size we look at
- return LatestSize; // the blinking light.
- }
- };
-
- // End Production Queue
- ////////////////////////////////////////////////////////////////////////////////
- }
- #endif
-
- // End Of Include MNR_threading Once Only ======================================
|