The CGrowableThreadPool Class

On-demand thread creation and thread pools both have some appealing characteristics and some disadvantages. We can build a sort of hybrid type of thread pool class that allows you to benefit from the advantages provided by both on-demand thread creation and the use of fixed population thread pools, while minimizing the downsides of each approach. The approach taken here is to implement a thread pool that has the following attributes:

Such a growable thread pool is initially populated with a fixed number of idle threads when the pool is created. As calls are made to the DispatchThread class member function, idle threads currently in the pool are dispatched to handle the new job. This is similar to the CFixedThreadPool class approach. However, if no threads are currently idling in the pool when the DispatchThread function is called, and if the maximum number of active threads has not yet been reached, a new thread is created and dispatched to handle the new job. Finally, any thread that has been sitting in the pool longer than the time decay value without being dispatched to a new job will automatically terminate itself if the number of active threads managed by the pool hasn’t already reached the initial thread count for the pool. Example 10-6 shows the declaration of the CGrowableThreadPool class.

Example 10-6. CGrowableThreadPool.h, the CGrowableThreadPool class declaration

//

// FILE: CGrowableThreadPool.h

//

// Copyright (c) 1997 by Aaron Michael Cohen and Mike Woodring

//

/////////////////////////////////////////////////////////////////////////

#ifndef __CGrowableThreadPool_H__

#define __CGrowableThreadPool_H__

#include <CMclGlobal.h>

#include <CMclAutoLock.h>

#include <CMclLinkedLists.h>

#include <CMclThread.h>

#include <CMclCritSec.h>

#include <CMclSemaphore.h>

#include <CMclEvent.h>

#include <CMclWaitableCollection.h>

#include <CMclAutoPtr.h>

#include "CThreadPool.h"

// CLinkedListRemovable

//

// This class provides a Remove() operation in addition to the

// regular operations provided by CMclLinkedList. To use

// this class, objects of type T must provide an equality (==)

// operator.

//

template <class T>

class CLinkedListRemovable : public CMclLinkedList<T>

{

public:

BOOL Remove( T & rData )

{

BOOL fFoundAndRemoved = FALSE;

// decrease the semaphore count...

// if the wait fails, there are no jobs in the queue...

if (CMclWaitTimeout(m_csNotEmpty.Wait(0)))

{

return fFoundAndRemoved;

}

CMclAutoLock Lock(m_cCritSec);

CMclLinkedListNode *pTargetNode = 0;

CMclLinkedListNode *pNode = m_MasterNode.m_pNext;

T NodeData;

// Search for a node with matchin data.

//

while( !fFoundAndRemoved && (pNode != &m_MasterNode) )

{

pNode->GetData(NodeData);

if( NodeData == rData )

{

// Found it! Make a note and terminate the

// search.

//

fFoundAndRemoved = TRUE;

pTargetNode = pNode;

}

else

{

pNode = pNode->m_pNext;

}

}

if( fFoundAndRemoved )

{

// Link the node before the target node to the node

// following the target node.

//

pTargetNode->m_pPrev->m_pNext = pTargetNode->m_pNext;

pTargetNode->m_pNext->m_pPrev = pTargetNode->m_pPrev;

// Toss the removed node back on the free list of nodes.

//

AddToFreeList(pTargetNode);

}

else

{

// no node to remove, we need to bump the semaphore

// count back up...

m_csNotEmpty.Release(1);

}

return(fFoundAndRemoved);

}

};

class CGrowableThreadPool : public CThreadPool, private CMclThreadHandler

{

public:

CGrowableThreadPool( long lMinThreads, long lMaxThreads, DWORD dwThreadLifetime );

virtual ~CGrowableThreadPool();

// CThreadPool implementation.

//

virtual BOOL DispatchThread( CMclThreadHandler *pHandler );

private:

// CMclThreadHandler implementation.

//

virtual unsigned ThreadHandlerProc( void );

private:

// CDispatchRecords are enqueued to signal a pending

// DispatchThread operation that needs to be serviced.

//

class CDispatchRecord

{

public:

CDispatchRecord();

CDispatchRecord( CMclThreadHandler *pThreadHandler );

unsigned Execute( void );

int operator == ( const CDispatchRecord& rhs );

private:

CMclThreadHandler *m_pUserThreadHandler;

};

// Because threads in this pool all execute the same

// thread handler in the CGrowableThreadPool class, and

// because threads may terminate themselves if they

// sit idle too long in the pool, the thread handler proce

// needs a way to recover the pointer to the CMclThread

// object that embodies the thread that is executing.

// To do this, we maintain a map between thread ids and

// CMclThread * pointers.

//

class CThreadIdToPtrMap

{

public:

CThreadIdToPtrMap( long lMaxThreads );

~CThreadIdToPtrMap();

void AddThread( DWORD dwThreadId, CMclThread *pThread );

CMclThread *GetThreadFromId( DWORD dwThreadId );

void FreeSlot( DWORD dwThreadId );

void FreeSlot( CMclThread *pThread );

private:

struct THREADINFO

{

BOOL fSlotUsed;

DWORD dwThreadId;

CMclThread *pThread;

};

THREADINFO *m_pThreadIdMap;

long m_lMaxThreads;

};

private:

// Thread pool synchronization and management.

//

CMclCritSec m_csPool;

CMclEvent m_ExitEvent;

DWORD m_dwThreadLifetime;

CThreadIdToPtrMap m_ThreadIdMap;

// Dispatch queue and thread pool.

//

CMclQueue<CDispatchRecord> m_DispatchQ;

CMclSemaphore m_JobsPending;

long m_lMinThreads; // Numbers of alive threads; not

long m_lMaxThreads; // counting who's busy and who's

long m_lCurThreads; // not busy.

// List of threads in the pool and a semaphore that

// indicates how many are actually free (idle).

//

CLinkedListRemovable<CMclThread *> m_ThreadsInPool;

CMclSemaphore m_ThreadsFree;

};

#endif // __CGrowableThreadPool_H__

The CGrowableThreadPool class implements the CThreadPool abstract base class by allowing the programmer to specify the number of threads, lMinThreads, that should initially populate the pool as a parameter to the constructor. That number of threads is created when the pool is created, and the number of threads in the pool is never allowed to drop below that count over the lifetime of the pool object. An additional class parameter, lMaxThreads, specifies the maximum number of threads to create as a result of DispatchThread being called. If DispatchThread is called when no existing threads are idle, the DispatchThread function will create a new thread on the fly as long as the number of threads in the pool wouldn’t go over the value specified by lMaxThreads. The third class parameter, dwThreadLifetime, specifies the maximum amount of time that an idle thread should spend asleep in the pool before terminating itself. This means that if a thread has been sleeping idle in the pool for this duration of time, it will check to see if the minimum thread count for the pool has already been reached and, if not, terminate itself. The implementation of CGrowableThreadPool is shown in Example 10-7.