38 #include "blocxx/BLOCXX_config.h" 65 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0) 66 #define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0) 67 #define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0) 68 #define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0) 69 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0) 87 class FixedSizePoolImpl;
89 class FixedSizePoolWorkerThread :
public Thread
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
99 virtual void doShutdown()
107 virtual void doCooperativeCancel()
115 virtual void doDefinitiveCancel()
130 FixedSizePoolWorkerThread(
const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(
const FixedSizePoolWorkerThread&);
134 class CommonPoolImpl :
public ThreadPoolImpl
137 CommonPoolImpl(UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
146 virtual ~CommonPoolImpl()
151 virtual bool queueIsFull()
const 157 bool queueClosed()
const 174 if (finishWorkInQueue)
176 TimeoutTimer timer(timeout);
179 if (timer.infinite())
187 if (!
m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
199 virtual void waitForEmptyQueue()
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
223 if (!shutdownTimer.infinite())
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
237 m_threads[
i]->timedWait(absoluteShutdownTimeout);
247 if (!dTimer.infinite())
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
256 if (!
m_threads[
i]->definitiveCancel(absoluteDefinitiveTimeout))
261 catch (CancellationDeniedException& e)
311 incrementWorkerCount();
328 virtual void incrementWorkerCount()
332 virtual void decrementWorkerCount()
351 class FixedSizePoolImpl :
public CommonPoolImpl
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
359 for (UInt32
i = 0;
i < numThreads; ++
i)
363 for (UInt32
i = 0;
i < numThreads; ++
i)
369 catch (ThreadException& e)
381 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
428 virtual ~FixedSizePoolImpl()
446 friend class FixedSizePoolWorkerThread;
455 catch (ThreadCancelledException&)
459 catch (Exception& ex)
462 std::clog <<
"!!! Exception: " << ex.type() <<
" caught in ThreadPool worker: " << ex << std::endl;
465 BLOCXX_LOG_ERROR(logger, Format(
"!!! Exception caught in ThreadPool worker: %1", ex));
467 catch(std::exception& ex)
470 std::clog <<
"!!! std::exception what = \"" << ex.what() <<
"\" caught in ThreadPool worker" << std::endl;
473 BLOCXX_LOG_ERROR(logger, Format(
"!!! std::exception caught in ThreadPool worker: %1", ex.what()));
478 std::clog <<
"!!! Unknown Exception caught in ThreadPool worker" << std::endl;
481 BLOCXX_LOG_ERROR(logger,
"!!! Unknown Exception caught in ThreadPool worker.");
484 Int32 FixedSizePoolWorkerThread::run()
507 class DynamicSizePoolImpl;
509 class DynamicSizePoolWorkerThread :
public Thread
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
519 virtual void doShutdown()
527 virtual void doCooperativeCancel()
535 virtual void doDefinitiveCancel()
550 DynamicSizePoolWorkerThread(
const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(
const DynamicSizePoolWorkerThread&);
554 class DynamicSizePoolImpl :
public CommonPoolImpl
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
563 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
635 ThreadRef theThread(
new DynamicSizePoolWorkerThread(
this));
642 catch (ThreadException& e)
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
658 virtual ~DynamicSizePoolImpl()
677 UInt32 getMaxThreads()
const 685 friend class DynamicSizePoolWorkerThread;
687 Int32 DynamicSizePoolWorkerThread::run()
713 class DynamicSizeNoQueuePoolImpl :
public DynamicSizePoolImpl
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads,
const Logger& logger,
const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
722 virtual ~DynamicSizeNoQueuePoolImpl()
726 virtual void incrementWorkerCount()
731 virtual void decrementWorkerCount()
740 virtual bool queueIsFull()
const 745 return (freeThreads <=
m_queue.size());
762 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
765 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
768 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
778 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
781 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
784 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
801 return m_impl->addWork(work, timeout);
811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
821 m_impl->waitForEmptyQueue();
bool tryAddWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
int AtomicGet(Atomic_t const &v)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg)
virtual void waitForEmptyQueue()=0
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
void shutdown(EShutdownQueueFlag finishWorkInQueue=E_FINISH_WORK_IN_QUEUE, const Timeout &timeout=Timeout::infinite)
Instruct all threads to exit and stop working.
This String class is an abstract data type that represents as NULL terminated string of characters.
std::deque< RunnableRef > m_queue
LazyGlobal< String, char const *const > GlobalString
NonRecursiveMutex m_queueLock
#define BLOCXX_POOL_LOG_DEBUG(logger, arg)
This logger just discards all log messages.
bool addWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
Condition m_queueNotEmpty
static Timeout relative(float seconds)
ThreadPool & operator=(const ThreadPool &x)
The ThreadPool class is used to coordinate a group of threads.
#define BLOCXX_POOL_LOG_ERROR(logger, arg)
GlobalString COMPONENT_NAME
RunnableRef m_currentRunnable
Array< ThreadRef > m_threads
virtual ~ThreadPoolImpl()
A timeout can be absolute, which means that it will happen at the specified DateTime.
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg)
IntrusiveReference< ThreadPoolImpl > m_impl
virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout &shutdownTimeout, const Timeout &definitiveCancelTimeout)=0
#define BLOCXX_GLOBAL_STRING_INIT(str)
virtual bool addWork(const RunnableRef &work, const Timeout &timeout)=0
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg)
class BLOCXX_COMMON_API Logger
static void yield()
Voluntarily yield to the processor giving the next thread in the chain the opportunity to run.
FixedSizePoolImpl * m_thePool
#define BLOCXX_DEFINE_EXCEPTION(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
IntrusiveReference< Runnable > RunnableRef
ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger &logger, const String &poolName="")
Constructor.
IntrusiveReference< Thread > ThreadRef
void waitForEmptyQueue()
Wait for the queue to empty out.