// threading.cpp // // (C) 2006 - 2009 MicroNeil Research Corporation. // // This program is part of the MicroNeil Research Open Library Project. For // more information go to http://www.microneil.com/OpenLibrary/index.html // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the // Free Software Foundation; either version 2 of the License, or (at your // option) any later version. // // This program is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for // more details. // // You should have received a copy of the GNU General Public License along with // this program; if not, write to the Free Software Foundation, Inc., 59 Temple // Place, Suite 330, Boston, MA 02111-1307 USA // For details on the Threading module and development history see threading.hpp #include "threading.hpp" using namespace std; // Introduce std namespace. ThreadManager Threads; // Master thread manager. void ThreadManager::rememberThread(Thread* T) { // Threads register themselves. ScopeMutex ThereCanBeOnlyOne(MyMutex); // Protect the known pool. KnownThreads.insert(T); // Add the new thread pointer. } void ThreadManager::forgetThread(Thread* T) { // Threads remove themselves. ScopeMutex ThereCanBeOnlyOne(MyMutex); // Protect the known pool. KnownThreads.erase(T); // Add the new thread pointer. } ThreadStatusReport ThreadManager::StatusReport() { // Get a status report, All Threads. ScopeMutex ThereCanBeOnlyOne(MyMutex); // Protect our set -- a moment in time. ThreadStatusReport Answer; // Create our vector to hold the report. for( // Loop through all of the Threads. set::iterator iT = KnownThreads.begin(); iT != KnownThreads.end(); iT++ ) { // Grab each Threads' report. Thread& X = *(*iT); // Handy reference to the Thread. Answer.push_back(X.StatusReport()); // Push back each Thread's report. } return Answer; // Return the finished report. } bool ThreadManager::lockExistingThread(Thread* T) { // Locks ThreadManager if T exists. MyMutex.lock(); // Lock the mutex for everyone. if(KnownThreads.end() == KnownThreads.find(T)) { // If we do not find T in our set MyMutex.unlock(); // then unlock the mutex and return return false; // false. } // If we did find it then LockedThread = T; // set our locked thread and return true; // return true; } const RuntimeCheck ThreadingCheck1("ThreadManager::unlockExistingThread():ThreadingCheck1(0 != LockedThread)"); const RuntimeCheck ThreadingCheck2("ThreadManager::unlockExistingThread():ThreadingCheck2(T == LockedThread)"); void ThreadManager::unlockExistingThread(Thread* T) { // Unlocks ThreadManager if T locked. ThreadingCheck1(0 != LockedThread); // We had better have a locked thread. ThreadingCheck2(T == LockedThread); // The locked thread had better match. LockedThread = 0; // Clear the locked thread. MyMutex.unlock(); // Unlock the mutex. } //// Scope Thread Lock allows for a safe way to lock threads through the Threads //// object for delivering short messages. Just like a ScopeMutex, when the object //// goes away the lock is released. ScopeThreadLock::ScopeThreadLock(Thread* T) : // Construct a scope lock on a Thread. MyLockedThread(0) { // To star with we have no lock. if(Threads.lockExistingThread(T)) { // If we achieve a lock then we MyLockedThread = T; // remember it. Our destructor will } // unlock it if we were successful. } ScopeThreadLock::~ScopeThreadLock() { // Destruct a scope lock on a Thread. if(0 != MyLockedThread) { // If we were successfully constructed Threads.unlockExistingThread(MyLockedThread); // we can unlock the thread and MyLockedThread = 0; // forget about it before we go away. } } bool ScopeThreadLock::isGood() { // If we have successfully locked T return (0 != MyLockedThread) ? true:false; // it will NOT be 0, so return true. } bool ScopeThreadLock::isBad() { // If we did not successfully lock T return (0 == MyLockedThread) ? false:true; // it will be 0, so return false. } //////////////////////////////////////////////////////////////////////////////// // Thread const ThreadType Thread::Type("Generic Thread"); const ThreadState Thread::ThreadInitialized("Thread Initialized"); const ThreadState Thread::ThreadStarted("Thread Started"); const ThreadState Thread::ThreadFailed("Thread Failed"); const ThreadState Thread::ThreadStopped("Thread Stopped"); const ThreadState Thread::ThreadDestroyed("Thread Destroyed"); bool Thread::isRunning() { return RunningFlag; } // Return RunningFlag state. bool Thread::isBad() { return BadFlag; } // Return BadFlag state. const string Thread::MyFault() { return BadWhat; } // Return exception Bad fault if any. const string Thread::MyName() { return MyThreadName; } // Return the instance name if any. const ThreadType& Thread::MyType() { return MyThreadType; } // Return the instance Thread Type. const ThreadState& Thread::MyState() { return (*MyThreadState); } // Thread state for this instance. void Thread::CurrentThreadState(const ThreadState& TS) { // Set Current Thread State. MyThreadState = const_cast(&TS); } const ThreadState& Thread::CurrentThreadState() { return (*MyThreadState); } // Get Current Thread State. ThreadStatusRecord Thread::StatusReport() { // Get a status report from this thread. return ThreadStatusRecord( // Status record. this, const_cast(MyThreadType), *MyThreadState, RunningFlag, BadFlag, BadWhat, MyThreadName ); } // launchTask() calls and monitors myTask for exceptions and set's the correct // states for the isBad and isRunning flags. void Thread::launchTask() { // Launch and watch myTask() try { // Do this safely. RunningFlag = true; // Now we are running. CurrentThreadState(ThreadStarted); // Set the running state. myTask(); // myTask() is called. } // myTask() should handle exceptions. catch(exception& e) { // Unhandled exceptions are informative: BadFlag = true; // They mean the thread went bad but BadWhat = e.what(); // we have an idea what went wrong. } // We shouldn't get other kinds of catch(...) { // exceptions because if things go BadFlag = true; // wrong and one gets through this BadWhat = "Unkown Exception(...)"; // is all we can say about it. } RunningFlag = false; // When we're done, we're done. if(BadFlag) CurrentThreadState(ThreadFailed); // If we're bad we failed. else CurrentThreadState(ThreadStopped); // If we're not bad we stopped. } // getMyThread() returns the local thread primative. thread_primative Thread::getMyThread() { return MyThread; } // Return my thread primative. // runThreadTask() is a helper function to start threads. It is the function // that is acutally launched as a new thread. It's whole job is to call the // myTask() method on the object passed to it as it is launched. // The run() method creates a new thread with ThreadRunner() as the main // function, having passed it's object. // WIN32 and POSIX have different versions of both the main thread function // and the way to launch it. #ifdef WIN32 Thread::Thread() : // When constructing a WIN32 thread MyThreadType(Thread::Type), // Use generic Thread Type. MyThreadName("UnNamed Thread"), // Use a generic Thread Name. MyThread(NULL), // Null the thread handle. RunningFlag(false), // Couldn't be running yet. BadFlag(false) { // Couldn't be bad yet. Threads.rememberThread(this); // Remember this thread. CurrentThreadState(ThreadInitialized); // Set our initialized state. } Thread::Thread(const ThreadType& T, const string N) : // Construct with specific Type/Name MyThreadType(T), // Use generic Thread Type. MyThreadName(N), // Use a generic Thread Name. MyThread(NULL), // Null the thread handle. RunningFlag(false), // Couldn't be running yet. BadFlag(false) { // Couldn't be bad yet. Threads.rememberThread(this); // Remember this thread. CurrentThreadState(ThreadInitialized); // Set our initialized state. } Thread::~Thread() { // In WIN32 land when we destroy the if(NULL != MyThread) { // thread object check for a valid CloseHandle(MyThread); // thread handle and destroy it if } // it exists. RunningFlag = false; // The thread is not running. Threads.forgetThread(this); // Forget this thread. CurrentThreadState(ThreadDestroyed); // The Thread has left the building. } unsigned __stdcall runThreadTask(void* thread_object) { // The WIN32 version has this form. ((Thread*)thread_object)->launchTask(); // Run the task. _endthreadex(0); // Signal the thread is finished. return 0; // Satisfy the unsigned return. } void Thread::run() { // Run a WIN32 thread... unsigned tid; // Thread id to toss. Only need Handle. MyThread = (HANDLE) _beginthreadex(NULL,0,runThreadTask,this,0,&tid); // Create a thread calling ThreadRunner if(NULL == MyThread) BadFlag = true; // and test that the resutl was valid. } void Thread::join() { // To join in WIN32 WaitForSingleObject(MyThread, INFINITE); // Wait for the thread by handle. } #else Thread::Thread() : // POSIX Thread constructor. MyThreadType(Thread::Type), // Use a generic Thread Type. MyThreadName("UnNamed Thread"), // Use a generic Thread Name. RunningFlag(false), // Can't be running yet. BadFlag(false) { // Can't be bad yet. Threads.rememberThread(this); // Remember this thread. CurrentThreadState(ThreadInitialized); // Set our initialized state. } Thread::Thread(const ThreadType& T, const string N) : // POSIX Specific Thread Constructor. MyThreadType(T), // Use a generic Thread Type. MyThreadName(N), // Use a generic Thread Name. RunningFlag(false), // Can't be running yet. BadFlag(false) { // Can't be bad yet. Threads.rememberThread(this); // Remember this thread. CurrentThreadState(ThreadInitialized); // Set our initialized state. } Thread::~Thread() { // POSIX destructor. RunningFlag = false; // Not running now for sure. Threads.forgetThread(this); // Forget this thread. CurrentThreadState(ThreadDestroyed); // The Thread has left the building. } void* runThreadTask(void* thread_object) { // The POSIX version has this form. ((Thread*)thread_object)->launchTask(); return NULL; } void Thread::run() { // Run a POSIX thread... int result = pthread_create(&MyThread, NULL, runThreadTask, this); // Create a thread calling ThreadRunner if(0 != result) BadFlag = true; // and test that there was no error. } void Thread::join() { // To join in POSIX pthread_join(MyThread, NULL); // call pthread_join with MyThread. } #endif // End Thread //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // Mutex #ifdef WIN32 // WIN32 Mutex Implementation ////////////////////////////////////////////////// // The original design of the WIN32 Mutex used critical sections. However after // additional research it was determined that the use of a Semaphore with an // initial count of 1 would work better overall on multiple Winx platforms - // especially SMP systems. const RuntimeCheck ThreadingCheck3("Mutex::Mutex():ThreadingCheck3(NULL != MyMutex)"); Mutex::Mutex() : // Creating a WIN32 Mutex means IAmLocked(false) { // Setting IAmLocked to false and MyMutex = CreateSemaphore(NULL, 1, 1, NULL); // create a semaphore object with ThreadingCheck3(NULL != MyMutex); // a count of 1. } const ExitCheck ThreadingCheck4("Mutex::~Mutex():"); Mutex::~Mutex() { // Destroying a WIN32 Mutex means ThreadingCheck4(false == IAmLocked); // Make sure we're not in use and CloseHandle(MyMutex); // destroy the semaphore object. } bool Mutex::tryLock() { // Trying to lock WIN32 Mutex means bool DoIHaveIt = false; // Start with a pessimistic assumption if( false == IAmLocked && // If we have a shot at this and WAIT_OBJECT_0 == WaitForSingleObject(MyMutex, 0) // we actually get hold of the semaphore ) { // then we can set our flags... IAmLocked = true; // Set IAmLocked, because we are and DoIHaveIt = true; // set our result to true. } return DoIHaveIt; // Return true if we got it (see above). } const RuntimeCheck ThreadingCheck5("Mutex::lock():ThreadingCheck5(WAIT_OBJECT_0 == WaitForSingleObject(MyMutex, INFINITE))"); void Mutex::lock() { // Locking the WIN32 Mutex means ThreadingCheck5(WAIT_OBJECT_0 == WaitForSingleObject(MyMutex, INFINITE)); // Wait on the semaphore - only 1 will IAmLocked = true; // get through or we have a big problem. } const LogicCheck ThreadingCheck6("Mutex::unlock():ThreadingCheck6(true == IAmLocked)"); void Mutex::unlock() { // Unlocking the WIN32 Mutex means ThreadingCheck6(true == IAmLocked); // making sure we're really locked then IAmLocked = false; // reset the IAmLocked flag and ReleaseSemaphore(MyMutex, 1, NULL); // release the semaphore. } bool Mutex::isLocked() { return IAmLocked; } // Return the IAmLocked flag. #else // POSIX Mutex Implementation ////////////////////////////////////////////////// const RuntimeCheck ThreadingCheck7("Mutex::Mutex():ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL))"); Mutex::Mutex() : // Constructing a POSIX mutex means IAmLocked(false) { // setting the IAmLocked flag to false and ThreadingCheck7(0 == pthread_mutex_init(&MyMutex,NULL)); // initializing the mutex_t object. } const ExitCheck ThreadingCheck8("Mutex::~Mutex():ThreadingCheck8(false == IAmLocked)"); const ExitCheck ThreadingCheck9("Mutex::~Mutex():ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex))"); Mutex::~Mutex() { // Before we destroy our mutex we check ThreadingCheck8(false == IAmLocked); // to see that it is not locked and ThreadingCheck9(0 == pthread_mutex_destroy(&MyMutex)); // destroy the primative. } const RuntimeCheck ThreadingCheck10("Mutex::lock():ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex));"); void Mutex::lock() { // Locking a POSIX mutex means ThreadingCheck10(0 == pthread_mutex_lock(&MyMutex)); // asserting our lock was successful and IAmLocked = true; // setting the IAmLocked flag. } const LogicCheck ThreadingCheck11("Mutex::unlock():ThreadingCheck11(true == IAmLocked)"); const RuntimeCheck ThreadingCheck12("Mutex::unlock():ThreadingCheck12(0 == pthread_mutex_unlock(&MyMutex))"); void Mutex::unlock() { // Unlocking a POSIX mutex means ThreadingCheck11(true == IAmLocked); // asserting that we are locked, IAmLocked = false; // clearing the IAmLocked flag, and ThreadingCheck12(0 == pthread_mutex_unlock(&MyMutex)); // unlocking the actual mutex. } bool Mutex::tryLock() { // Trying to lock a POSIX mutex means bool DoIHaveIt = false; // starting off pessimistically. if(false == IAmLocked) { // If we are not locked yet then we if(0 == pthread_mutex_trylock(&MyMutex)) { // try to lock the mutex. If we succeed IAmLocked = true; // we set our IAmLocked flag and our DoIHaveIt = true; // DoIHaveIt flag to true; } } return DoIHaveIt; // In any case we return the result. } bool Mutex::isLocked() { return IAmLocked; } // Return the IAmLocked flag. #endif // End Mutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // ScopeMutex ScopeMutex::ScopeMutex(Mutex& M) : // When constructing a ScopeMutex, MyMutex(M) { // Initialize MyMutex with what we are given MyMutex.lock(); // and then immediately lock it. } ScopeMutex::~ScopeMutex() { // When a ScopeMutex is destroyed, MyMutex.unlock(); // it first unlocks it's mutex. } // End ScopeMutex //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// // Production Gateway #ifdef WIN32 // Win32 Implementation //////////////////////////////////////////////////////// const RuntimeCheck ThreadingCheck13("ProductionGateway::ProductionGateway():ThreadingCheck13(NULL != MySemaphore)"); ProductionGateway::ProductionGateway() { // Construct in Windows like this: const int HUGENUMBER = 0x7fffffL; // Work without any real limits. MySemaphore = CreateSemaphore(NULL, 0, HUGENUMBER, NULL); // Create a Semaphore for signalling. ThreadingCheck13(NULL != MySemaphore); // That should always work. } ProductionGateway::~ProductionGateway() { // Be sure to close it when we're done. CloseHandle(MySemaphore); } void ProductionGateway::produce() { // To produce() in WIN32 we ReleaseSemaphore(MySemaphore, 1, NULL); // release 1 count into the semaphore. } void ProductionGateway::consume() { // To consume() in WIN32 we WaitForSingleObject(MySemaphore, INFINITE); // wait for a count in the semaphore. } #else // POSIX Implementation //////////////////////////////////////////////////////// const RuntimeCheck ThreadingCheck14("ProductionGateway::ProductionGateway():ThreadingCheck14(0 == pthread_mutex_init(&MyMutex, NULL));"); const RuntimeCheck ThreadingCheck15("ProductionGateway::ProductionGateway():ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL))"); ProductionGateway::ProductionGateway() : // Construct in POSIX like this: Product(0), // All of our counts start at zero. Waiting(0), Signaled(0) { ThreadingCheck14(0 == pthread_mutex_init(&MyMutex, NULL)); // Initialize our mutex. ThreadingCheck15(0 == pthread_cond_init(&MyConditionVariable, NULL)); // Initialize our condition variable. } const ExitCheck ThreadingCheck16("ProductionGateway::~ProductionGateway():ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex))"); const ExitCheck ThreadingCheck17("ProductionGateway::~ProductionGateway():ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable))"); ProductionGateway::~ProductionGateway() { // When we're done we must destroy ThreadingCheck16(0 == pthread_mutex_destroy(&MyMutex)); // our local mutex and ThreadingCheck17(0 == pthread_cond_destroy(&MyConditionVariable)); // our condition variable. } const RuntimeCheck ThreadingCheck18("ProductionGateway::produce():ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex))"); const RuntimeCheck ThreadingCheck19("ProductionGateway::produce():ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable))"); const RuntimeCheck ThreadingCheck20("ProductionGateway::produce():ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex))"); void ProductionGateway::produce() { // To produce in POSIX ThreadingCheck18(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex. ++Product; // Add an item to our product count. if(Signaled < Waiting) { // If anybody is waiting that has not ThreadingCheck19(0 == pthread_cond_signal(&MyConditionVariable)); // yet been signaled then signal them ++Signaled; // and keep track. They will count this } // down as they awaken. ThreadingCheck20(0 == pthread_mutex_unlock(&MyMutex)); // At the end unlock our mutex so } // waiting threads can fly free :-) const RuntimeCheck ThreadingCheck21("ProductionGateway::consume():ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex))"); const RuntimeCheck ThreadingCheck22("ProductionGateway::consume():ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex))"); const RuntimeCheck ThreadingCheck23("ProductionGateway::consume():ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex))"); void ProductionGateway::consume() { // To consume in POSIX ThreadingCheck21(0 == pthread_mutex_lock(&MyMutex)); // Lock our mutex. while(0 >= Product) { // Until we have something to consume, ++Waiting; // wait for a signal from ThreadingCheck22(0 == pthread_cond_wait(&MyConditionVariable, &MyMutex)); // our producer. When we have a signal --Waiting; // we are done waiting and we have --Signaled; // been signaled. Of course, somebody } // may have beaten us to it so check. --Product; // If we have product then take it. ThreadingCheck23(0 == pthread_mutex_unlock(&MyMutex)); // At the end unlock our mutex so } #endif // End Production Gateway ////////////////////////////////////////////////////////////////////////////////