// btU64 is the unsigned long long type this file uses for microsecond timestamps.
17 #define WIN32_LEAN_AND_MEAN 24 typedef unsigned long long btU64;
// Cache line size in bytes; used as the job-buffer alignment and as the size of padding members.
25 static const int kCacheLineSize = 64;
35 struct WorkerThreadStatus
// Directive storage: one char slot per thread, kMaxThreadCount slots in total.
51 char m_threadDirs[kMaxThreadCount];
// Constructor: sets every directive slot to 0.
61 WorkerThreadDirectives()
63 for (
int i = 0; i < kMaxThreadCount; ++i )
65 m_threadDirs[ i ] = 0;
// Returns the directive stored for threadId, converted from its char slot back to Type.
// NOTE(review): only the upper bound is asserted; a negative threadId is not checked here — confirm callers never pass one.
69 Type getDirective(
int threadId)
71 btAssert(threadId < kMaxThreadCount);
72 return static_cast<Type
>(m_threadDirs[threadId]);
// Writes dir into the slots of every thread index i with threadBegin <= i < threadEnd.
75 void setDirectiveByRange(
int threadBegin,
int threadEnd, Type dir)
// The end of the range may equal, but not exceed, the size of the slot array.
78 btAssert( threadEnd <= kMaxThreadCount );
// Slots are chars, so the directive is converted once before the loop.
79 char dirChar =
static_cast<char>(dir);
80 for (
int i = threadBegin; i < threadEnd; ++i )
82 m_threadDirs[ i ] = dirChar;
// Current worker state; WorkerThreadFunc sets it to kWorking, kWaitingForWork and kSleeping.
92 WorkerThreadStatus::Type m_status;
// Number of jobs this thread has executed; incremented by the worker and read under m_mutex by the scheduler.
93 int m_numJobsFinished;
// Directive table the worker polls through getDirective(threadId).
96 WorkerThreadDirectives * m_directive;
// Threshold compared against elapsed microseconds in the worker's idle loop.
99 unsigned int m_cooldownTime;
// Runs the job. threadId is the id of the executing thread: WorkerThreadFunc passes its own id,
// and the scheduler's own job-draining loop passes 0.
105 virtual void executeJob(
int threadId) = 0;
// Job that runs one slice of a parallel-for loop body.
// NOTE(review): members and constructor are not visible in this excerpt.
108 class ParallelForJob :
public IJob
// Forwards the stored m_begin/m_end range to the loop body.
126 m_body->
forLoop( m_begin, m_end );
// Job that adds a partial sum to the accumulator of the thread that executes it.
131 class ParallelSumJob :
public IJob
// Base of the per-thread storage array; executeJob indexes it by thread id.
134 ThreadLocalStorage* m_threadLocalStoreArray;
// Stores tls as the per-thread storage array.
// NOTE(review): what is done with iBegin, iEnd and body is not visible in this excerpt.
139 ParallelSumJob(
int iBegin,
int iEnd,
const btIParallelSumBody& body, ThreadLocalStorage* tls )
142 m_threadLocalStoreArray = tls;
// When BT_PARALLEL_SUM_DETERMINISTISM is enabled, val is rounded to the nearest multiple of 1/2^19
// before it is accumulated.
// NOTE(review): the statement that produces val is not visible here — presumably the body's sumLoop; confirm.
146 virtual void executeJob(
int threadId )
BT_OVERRIDE 152 #if BT_PARALLEL_SUM_DETERMINISTISM 154 const float TRUNC_SCALE = float(1<<19);
155 val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE;
// Accumulate into the slot indexed by the executing thread's id.
157 m_threadLocalStoreArray[threadId].m_sumResult += val;
// kCacheLineSize bytes of padding.
// NOTE(review): presumably keeps neighbouring members on separate cache lines — confirm against the full member layout.
177 char m_cachePadding[kCacheLineSize];
// Grows the job buffer to newSize bytes when it is currently smaller; a smaller or equal request does nothing.
188 void resizeJobMem(
int newSize)
190 if (newSize > m_jobMemSize)
// NOTE(review): release of the previous buffer is not visible in this excerpt — confirm it is freed before this allocation.
// The new buffer is aligned to kCacheLineSize.
193 m_jobMem =
static_cast<char*
>(
btAlignedAlloc(newSize, kCacheLineSize));
194 m_jobMemSize = newSize;
204 m_threadSupport = NULL;
208 m_useSpinMutex =
false;
217 if (m_queueLock && m_threadSupport)
227 m_threadSupport = threadSup;
232 setupJobStealing(contextArray, contextArray->
size());
// Builds m_neighborContexts, the list of other queues this queue may take jobs from.
// NOTE(review): the function header and the declarations of contexts, selfIndex and numActiveContexts
// are not visible here; calls in this file use setupJobStealing(contextArray, numActiveContexts).
// Locate this queue inside the context array by address comparison.
238 for (
int i = 0; i < contexts.
size(); ++i)
240 if (
this == &contexts[ i ] )
// At most two neighbours, and never more than the number of other contexts in the array.
246 int numNeighbors =
btMin(2, contexts.
size() - 1);
// Candidate offsets are tried in this order, nearest indices first.
247 int neighborOffsets[ ] = {-1, 1, -2, 2, -3, 3};
248 int numOffsets =
sizeof(neighborOffsets)/
sizeof(neighborOffsets[0]);
249 m_neighborContexts.
reserve( numNeighbors );
// Stop once enough neighbours are collected or the offset table is exhausted.
251 for (
int i = 0; i < numOffsets && m_neighborContexts.
size() < numNeighbors; i++)
253 int neighborIndex = selfIndex + neighborOffsets[i];
// Only indices inside the active range [0, numActiveContexts) are accepted.
254 if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
256 m_neighborContexts.
push_back( &contexts[ neighborIndex ] );
// Returns the m_queueIsEmpty flag as-is; no lock is taken in this accessor.
261 bool isQueueEmpty()
const {
return m_queueIsEmpty;}
264 if ( m_useSpinMutex )
275 if ( m_useSpinMutex )
// Resets the queue to empty and makes sure it can hold jobCount jobs of jobSize bytes each.
// NOTE(review): any reset of head/tail indices or m_allocSize is not visible in this excerpt.
284 void clearQueue(
int jobCount,
int jobSize)
290 m_queueIsEmpty =
true;
// Total bytes needed for all jobs. NOTE(review): plain int multiplication — confirm it cannot overflow for expected sizes.
291 int jobBufSize = jobSize * jobCount;
// Grow the job buffer only when it is too small.
293 if ( jobBufSize > m_jobMemSize )
295 resizeJobMem( jobBufSize );
// Likewise grow the queue's capacity only when needed.
298 if ( jobCount > m_jobQueue.
capacity() )
300 m_jobQueue.
reserve( jobCount );
// Carves jobSize bytes out of the job buffer at the current offset and advances the offset.
// NOTE(review): the return statement is not visible here — presumably jobMem is returned; confirm.
305 void* allocJobMem(
int jobSize)
// The buffer must already be large enough; this function does not grow it.
307 btAssert(m_jobMemSize >= (m_allocSize + jobSize));
308 void* jobMem = &m_jobMem[m_allocSize];
309 m_allocSize += jobSize;
// Submits a job and marks the queue as non-empty.
// NOTE(review): the statement that stores the job in the queue is not visible in this excerpt.
312 void submitJob( IJob* job )
// The job object must lie inside the already-allocated part of this queue's job buffer.
314 btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
318 m_queueIsEmpty =
false;
// Takes the next job from this queue, if any.
// NOTE(review): locking and the return statements are not visible in this excerpt.
321 IJob* consumeJobFromOwnQueue()
// First check of the empty flag.
323 if ( m_queueIsEmpty )
// Second check of the same flag. NOTE(review): presumably repeated after taking the queue lock — confirm.
330 if ( !m_queueIsEmpty )
// Pop from the head of the queue.
332 job = m_jobQueue[ m_headIndex++ ];
// The job object must lie inside the allocated part of this queue's job buffer.
333 btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
// Head catching up with tail means the last job was just taken.
334 if ( m_headIndex == m_tailIndex )
336 m_queueIsEmpty =
true;
// Job lookup order: this queue first, then each neighbour queue in m_neighborContexts order.
// NOTE(review): the enclosing function header and its return statements are not visible in this excerpt.
344 if (IJob* job = consumeJobFromOwnQueue())
349 for (
int i = 0; i < m_neighborContexts.
size(); ++i)
351 JobQueue* otherContext = m_neighborContexts[ i ];
// Take a job from the neighbour's queue.
352 if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
// Worker thread entry point. userPtr is the thread's ThreadLocalStorage record.
// NOTE(review): several statements inside the idle loop (e.g. where shouldSleep is set) are not visible in this excerpt.
362 static void WorkerThreadFunc(
void* userPtr )
365 ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
366 JobQueue* jobQueue = localStorage->m_queue;
368 bool shouldSleep =
false;
369 int threadId = localStorage->m_threadId;
// Outer loop: alternate between draining jobs and idling until shouldSleep becomes true.
370 while (! shouldSleep)
// m_mutex is held while jobs run; the scheduler takes the same mutex to read m_numJobsFinished.
373 localStorage->m_mutex.lock();
// Execute jobs until consumeJob() returns null.
374 while ( IJob* job = jobQueue->consumeJob() )
376 localStorage->m_status = WorkerThreadStatus::kWorking;
377 job->executeJob( threadId );
378 localStorage->m_numJobsFinished++;
380 localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
381 localStorage->m_mutex.unlock();
// Start timing the idle period.
382 btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
// Idle loop: runs while this thread's queue reports empty.
384 while (jobQueue->isQueueEmpty())
// Directive check: kGoToSleep.
388 if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
// Directive check: kScanForJobs restarts the idle timer.
394 if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
396 clockStart = localStorage->m_clock->getTimeMicroseconds();
// Bounded inner loop of 50 iterations. NOTE(review): loop body is not visible — presumably a spin/yield; confirm.
400 for (
int i = 0; i < 50; ++i )
// Condition: directive is kScanForJobs or the queue is no longer empty.
406 if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
// Compare idle time in microseconds against this thread's cooldown threshold.
412 btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
413 if (timeElapsed > localStorage->m_cooldownTime)
// Publish the sleeping state under the mutex.
424 localStorage->m_mutex.lock();
425 localStorage->m_status = WorkerThreadStatus::kSleeping;
426 localStorage->m_mutex.unlock();
// Directive table handed to every thread's storage record; allocated with btAlignedAlloc in this class.
434 WorkerThreadDirectives* m_workerDirective;
// Set by setNumThreads to m_numThreads - 1.
441 int m_numWorkerThreads;
// Number of job queues currently in use; loops over m_jobQueues for clearing and draining stop at this count.
442 int m_numActiveJobQueues;
// Thread ids below this value are not workers; worker loops in this class start at this id.
445 static const int kFirstWorkerThreadId = 1;
450 m_threadSupport = NULL;
451 m_workerDirective = NULL;
// Destructor: puts workers to sleep, shuts down every job queue, then releases the thread support
// object and the directive table.
454 virtual ~btTaskSchedulerDefault()
// Workers are told to sleep before any queue is torn down.
456 waitForWorkersToSleep();
458 for (
int i = 0; i < m_jobQueues.
size(); ++i )
460 m_jobQueues[i].exit();
465 delete m_threadSupport;
466 m_threadSupport = NULL;
// NOTE(review): the statement that frees m_workerDirective is not visible in this excerpt — confirm it
// matches the btAlignedAlloc used to create it.
468 if (m_workerDirective)
471 m_workerDirective = NULL;
// Scheduler initialisation: directive table, job queues, thread-to-queue mapping and per-thread records.
// NOTE(review): the enclosing function header and the creation of m_threadSupport, m_maxNumThreads and
// numThreadsPerQueue are not visible in this excerpt.
// Raw aligned allocation of the directive table. NOTE(review): no constructor call is visible, and the
// literal 64 is used instead of kCacheLineSize — confirm both are intended.
479 m_workerDirective =
static_cast<WorkerThreadDirectives*
>(
btAlignedAlloc(
sizeof(*m_workerDirective), 64));
483 m_numThreads = m_maxNumThreads;
// One queue per worker when each queue serves a single thread; otherwise threads are grouped per queue.
486 int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue);
487 m_jobQueues.
resize(numJobQueues);
488 m_numActiveJobQueues = numJobQueues;
489 for (
int i = 0; i < m_jobQueues.
size(); ++i )
491 m_jobQueues[i].
init( m_threadSupport, &m_jobQueues );
// Map every thread id to the queue it uses.
493 m_perThreadJobQueues.
resize(m_numThreads);
494 for (
int i = 0; i < m_numThreads; i++ )
500 if (numThreadsPerQueue == 1)
// NOTE(review): for i == 0 this index would be -1; any guard for thread 0 is not visible here — confirm.
503 jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
508 jq = &m_jobQueues[ i / numThreadsPerQueue ];
511 m_perThreadJobQueues[i] = jq;
// Fill in one storage record per thread; every thread starts in the kSleeping state.
513 m_threadLocalStorage.
resize(m_numThreads);
514 for (
int i = 0; i < m_numThreads; i++ )
516 ThreadLocalStorage& storage = m_threadLocalStorage[i];
517 storage.m_threadId = i;
518 storage.m_directive = m_workerDirective;
519 storage.m_status = WorkerThreadStatus::kSleeping;
// Compared against elapsed microseconds in the worker's idle loop.
520 storage.m_cooldownTime = 100;
521 storage.m_clock = &m_clock;
522 storage.m_queue = m_perThreadJobQueues[i];
// Worker directive slots are set to kGoToSleep; slot 0 is not written by this call.
524 setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
// Applies dir to thread ids kFirstWorkerThreadId up to, but not including, m_numThreads.
528 void setWorkerDirectives(WorkerThreadDirectives::Type dir)
530 m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
535 return m_maxNumThreads;
// Sets the number of threads in use, clamped to the range [1, m_maxNumThreads], and updates the
// dependent worker count, active queue count, neighbour lists and directives.
543 virtual void setNumThreads(
int numThreads )
BT_OVERRIDE 545 m_numThreads =
btMax(
btMin(numThreads,
int(m_maxNumThreads)), 1 );
546 m_numWorkerThreads = m_numThreads - 1;
547 m_numActiveJobQueues = 0;
549 if ( m_numWorkerThreads > 0 )
// The active queue count is derived from the index of the queue used by the highest active thread id.
552 JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
553 int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
554 m_numActiveJobQueues = iLastActiveContext + 1;
// Every queue, active or not, rebuilds its neighbour list against the new active count.
555 for (
int i = 0; i < m_jobQueues.size(); ++i )
557 m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
// Thread ids from m_numThreads up to BT_MAX_THREAD_COUNT are told to sleep.
560 m_workerDirective->setDirectiveByRange(m_numThreads,
BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
// Drains remaining jobs on the calling thread, then compares the finished-job total against m_numJobs.
// NOTE(review): the enclosing function header, the surrounding wait loop and the definition of
// timeElapsed are not visible in this excerpt.
567 int numMainThreadJobsFinished = 0;
// Run whatever is still queued in the active queues, executing as thread id 0.
568 for (
int i = 0; i < m_numActiveJobQueues; ++i )
570 while ( IJob* job = m_jobQueues[i].consumeJob() )
572 job->executeJob( 0 );
573 numMainThreadJobsFinished++;
578 setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
// Sum the per-worker counters, reading each one under that worker's mutex.
584 int numWorkerJobsFinished = 0;
585 for (
int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
587 ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
588 storage->m_mutex.lock();
589 numWorkerJobsFinished += storage->m_numJobsFinished;
590 storage->m_mutex.unlock();
// Condition: worker and caller totals together equal m_numJobs.
592 if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
// NOTE(review): 100000 is presumably a timeout in microseconds; the action taken is not visible — confirm.
598 if (timeElapsed > 100000)
// Starts sleeping workers until the number of active workers reaches min(numWorkersToWake, m_numWorkerThreads).
// NOTE(review): the statements that increment numActiveWorkers are not visible in this excerpt.
606 void wakeWorkers(
int numWorkersToWake)
// Expects the directive of thread id 1 to already be kScanForJobs (literal 1, not kFirstWorkerThreadId).
609 btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
610 int numDesiredWorkers =
btMin(numWorkersToWake, m_numWorkerThreads);
611 int numActiveWorkers = 0;
// First pass: look at workers that are not in the kSleeping state.
612 for (
int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
616 ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
617 if (storage.m_status != WorkerThreadStatus::kSleeping)
// Second pass: start sleeping workers while more are wanted.
622 for (
int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
624 ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
625 if (storage.m_status == WorkerThreadStatus::kSleeping)
// The task index is the zero-based worker index, while the storage record is indexed by thread id.
627 m_threadSupport->
runTask( iWorker, &storage );
// Tells all workers to sleep and then expects each worker's status to be kSleeping.
// NOTE(review): the wait between issuing the directive and the asserts is not visible in this excerpt.
633 void waitForWorkersToSleep()
636 setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
638 for (
int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
640 ThreadLocalStorage& storage = m_threadLocalStorage[i];
641 btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
649 setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
// Resets every worker's finished-job counter, then switches the workers to kScanForJobs.
652 void prepareWorkerThreads()
654 for (
int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
656 ThreadLocalStorage& storage = m_threadLocalStorage[i];
// The counter is reset under the same mutex the worker holds while it runs jobs.
657 storage.m_mutex.lock();
658 storage.m_numJobsFinished = 0;
659 storage.m_mutex.unlock();
661 setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
// Parallel-for path: splits [iBegin, iEnd) into grainSize-sized jobs spread over the worker queues,
// or runs the body directly on the calling thread.
// NOTE(review): the enclosing function header, the iThread increment and the wait for job completion
// are not visible in this excerpt.
669 int iterationCount = iEnd - iBegin;
// Go parallel only when there is more than one grain of work, at least one worker exists, and the
// anti-nesting lock can be taken.
670 if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.
tryLock() )
672 typedef ParallelForJob JobType;
// Ceiling division: the last job may cover fewer than grainSize iterations.
673 int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
674 m_numJobs = jobCount;
676 int jobSize =
sizeof( JobType );
// Each active queue is sized for the full job count.
678 for (
int i = 0; i < m_numActiveJobQueues; ++i)
680 m_jobQueues[i].clearQueue( jobCount, jobSize );
683 prepareWorkerThreads();
// Jobs are handed to the queues of worker threads, starting with the first worker id.
686 int iThread = kFirstWorkerThreadId;
687 for (
int i = iBegin; i < iEnd; i += grainSize )
// Clamp the end of the final slice to iEnd.
690 int iE =
btMin( i + grainSize, iEnd );
691 JobQueue* jq = m_perThreadJobQueues[ iThread ];
693 btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
// The job object is constructed in place inside the queue's own job buffer.
694 void* jobMem = jq->allocJobMem(jobSize);
695 JobType* job =
new ( jobMem ) ParallelForJob( i, iE, body );
696 jq->submitJob( job );
// Wrap back to the first worker after the last thread id.
699 if ( iThread >= m_numThreads )
701 iThread = kFirstWorkerThreadId;
// Request one worker fewer than the number of jobs.
704 wakeWorkers( jobCount - 1 );
708 m_antiNestingLock.
unlock();
// Direct call of the body over the whole range on the calling thread.
// NOTE(review): presumably the else branch of the condition above — confirm.
714 body.forLoop( iBegin, iEnd );
// Parallel-sum path: splits [iBegin, iEnd) into grainSize-sized jobs, lets each thread accumulate into
// its own m_sumResult, then adds the per-thread results together; otherwise sums on the calling thread.
// NOTE(review): the enclosing function header, the iThread increment, the wait for job completion, and
// the declaration and return of sum are not visible in this excerpt.
722 int iterationCount = iEnd - iBegin;
// Go parallel only when there is more than one grain of work, at least one worker exists, and the
// anti-nesting lock can be taken.
723 if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.
tryLock() )
725 typedef ParallelSumJob JobType;
// Ceiling division: the last job may cover fewer than grainSize iterations.
726 int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
727 m_numJobs = jobCount;
729 int jobSize =
sizeof( JobType );
730 for (
int i = 0; i < m_numActiveJobQueues; ++i)
732 m_jobQueues[i].clearQueue( jobCount, jobSize );
// Zero every thread's accumulator, including thread 0.
736 for (
int iThread = 0; iThread < m_numThreads; ++iThread )
738 m_threadLocalStorage[iThread].m_sumResult =
btScalar(0);
742 prepareWorkerThreads();
// Jobs are handed to the queues of worker threads, starting with the first worker id.
745 int iThread = kFirstWorkerThreadId;
746 for (
int i = iBegin; i < iEnd; i += grainSize )
// Clamp the end of the final slice to iEnd.
749 int iE =
btMin( i + grainSize, iEnd );
750 JobQueue* jq = m_perThreadJobQueues[ iThread ];
752 btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
// The job is constructed in place and given the base of the per-thread storage array.
753 void* jobMem = jq->allocJobMem(jobSize);
754 JobType* job =
new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] );
755 jq->submitJob( job );
// Wrap back to the first worker after the last thread id.
758 if ( iThread >= m_numThreads )
760 iThread = kFirstWorkerThreadId;
// Request one worker fewer than the number of jobs.
763 wakeWorkers( jobCount - 1 );
// Combine the per-thread partial sums in thread-id order.
770 for (
int iThread = 0; iThread < m_numThreads; ++iThread )
772 sum += m_threadLocalStorage[ iThread ].m_sumResult;
774 m_antiNestingLock.
unlock();
// Direct evaluation over the whole range on the calling thread.
// NOTE(review): presumably the else branch of the condition above — confirm.
781 return body.sumLoop( iBegin, iEnd );
790 btTaskSchedulerDefault* ts =
new btTaskSchedulerDefault();
795 #else // #if BT_THREADSAFE 802 #endif // #else // #if BT_THREADSAFE static T sum(const btAlignedObjectArray< T > &items)
virtual int getLogicalToPhysicalCoreRatio() const =0
void push_back(const T &_Val)
virtual void forLoop(int iBegin, int iEnd) const =0
The btAlignedObjectArray template class uses a subset of the stl::vector interface for its methods It...
virtual void runTask(int threadIndex, void *userData)=0
static btThreadSupportInterface * create(const ConstructionInfo &info)
void resizeNoInitialize(int newsize)
resize changes the number of elements in the array.
The btClock is a portable basic clock that measures accurate time in seconds, use for profiling...
btSpinMutex – lightweight spin-mutex implemented with atomic ops, never puts a thread to sleep becau...
virtual void deleteCriticalSection(btCriticalSection *criticalSection)=0
const unsigned int BT_MAX_THREAD_COUNT
int size() const
return the number of elements in the array
virtual void waitForAllTasks()=0
#define btAlignedFree(ptr)
unsigned long long int getTimeMicroseconds()
Returns the time in us since the last call to reset or since the Clock was created.
virtual btScalar sumLoop(int iBegin, int iEnd) const =0
int capacity() const
return the pre-allocated (reserved) elements, this is at least as large as the total number of elemen...
virtual int getCacheFriendlyNumThreads() const =0
btITaskScheduler * btCreateDefaultTaskScheduler()
void resize(int newsize, const T &fillData=T())
const T & btMax(const T &a, const T &b)
#define btAlignedAlloc(size, alignment)
virtual btCriticalSection * createCriticalSection()=0
virtual int getNumWorkerThreads() const =0
const T & btMin(const T &a, const T &b)
float btScalar
The btScalar type abstracts floating point numbers, to easily switch between double and single floati...
#define ATTRIBUTE_ALIGNED64(a)