Bullet Collision Detection & Physics Library
btTaskScheduler.cpp

#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>


#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined( _WIN32 )

#define WIN32_LEAN_AND_MEAN

#include <windows.h>

#endif


typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined( _WIN32 )
    YieldProcessor();
#endif
}


struct WorkerThreadStatus
{
    enum Type
    {
        kInvalid,
        kWaitingForWork,
        kWorking,
        kSleeping,
    };
};

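// WorkerThreadDirectives packs one directive byte per worker thread into a single
// cache-line-aligned object, so the main thread can cheaply broadcast
// "sleep / stay idle / scan for jobs" commands to all workers without false sharing.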
ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
{
    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
    // directives for all worker threads packed into a single cacheline
    char m_threadDirs[kMaxThreadCount];

public:
    enum Type
    {
        kInvalid,
        kGoToSleep,         // go to sleep
        kStayAwakeButIdle,  // stay awake, but don't scan the job queue
        kScanForJobs,       // actively scan the job queue for jobs
    };
    WorkerThreadDirectives()
    {
        for ( int i = 0; i < kMaxThreadCount; ++i )
        {
            m_threadDirs[ i ] = 0;
        }
    }

    Type getDirective(int threadId)
    {
        btAssert(threadId < kMaxThreadCount);
        return static_cast<Type>(m_threadDirs[threadId]);
    }

    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
    {
        btAssert( threadBegin < threadEnd );
        btAssert( threadEnd <= kMaxThreadCount );
        char dirChar = static_cast<char>(dir);
        for ( int i = threadBegin; i < threadEnd; ++i )
        {
            m_threadDirs[ i ] = dirChar;
        }
    }
};

class JobQueue;

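// Per-thread state, aligned to a cache line to avoid false sharing between workers.
// Each worker reads its directive here, accumulates partial sums into m_sumResult,
// and reports progress through m_numJobsFinished under m_mutex.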
ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
{
    int m_threadId;
    WorkerThreadStatus::Type m_status;
    int m_numJobsFinished;
    btSpinMutex m_mutex;
    btScalar m_sumResult;
    WorkerThreadDirectives * m_directive;
    JobQueue* m_queue;
    btClock* m_clock;
    unsigned int m_cooldownTime;
};

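// IJob is the unit of work placed on a JobQueue; ParallelForJob simply forwards a
// [begin, end) index range to the user-supplied btIParallelForBody functor.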
struct IJob
{
    virtual void executeJob(int threadId) = 0;
};

class ParallelForJob : public IJob
{
    const btIParallelForBody* m_body;
    int m_begin;
    int m_end;

public:
    ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
    {
        m_body = &body;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE( "executeJob" );

        // call the functor body to do the work
        m_body->forLoop( m_begin, m_end );
    }
};

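// ParallelSumJob runs one sub-range of a parallel reduction and accumulates its partial
// result into the executing thread's ThreadLocalStorage slot, so no locking is needed
// until the main thread adds the per-thread sums together at the end.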
class ParallelSumJob : public IJob
{
    const btIParallelSumBody* m_body;
    ThreadLocalStorage* m_threadLocalStoreArray;
    int m_begin;
    int m_end;

public:
    ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
    {
        m_body = &body;
        m_threadLocalStoreArray = tls;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob( int threadId ) BT_OVERRIDE
    {
        BT_PROFILE( "executeJob" );

        // call the functor body to do the work
        btScalar val = m_body->sumLoop( m_begin, m_end );
#if BT_PARALLEL_SUM_DETERMINISTISM
        // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
        const float TRUNC_SCALE = float(1<<19);
        val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE;  // truncate some bits
#endif
        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
};

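// JobQueue owns a contiguous block of job memory plus an array of job pointers.
// Jobs are consumed from the head under a lock (spin mutex or OS critical section),
// and an idle worker may steal work from a couple of neighboring queues.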
ATTRIBUTE_ALIGNED64(class) JobQueue
{
    btThreadSupportInterface* m_threadSupport;
    btCriticalSection* m_queueLock;
    btSpinMutex m_mutex;

    btAlignedObjectArray<IJob*> m_jobQueue;
    char* m_jobMem;
    int m_jobMemSize;
    bool m_queueIsEmpty;
    int m_tailIndex;
    int m_headIndex;
    int m_allocSize;
    bool m_useSpinMutex;
    btAlignedObjectArray<JobQueue*> m_neighborContexts;
    char m_cachePadding[kCacheLineSize];  // prevent false sharing

    void freeJobMem()
    {
        if ( m_jobMem )
        {
            // free old
            btAlignedFree(m_jobMem);
            m_jobMem = NULL;
        }
    }
    void resizeJobMem(int newSize)
    {
        if (newSize > m_jobMemSize)
        {
            freeJobMem();
            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
            m_jobMemSize = newSize;
        }
    }

public:

    JobQueue()
    {
        m_jobMem = NULL;
        m_jobMemSize = 0;
        m_threadSupport = NULL;
        m_queueLock = NULL;
        m_headIndex = 0;
        m_tailIndex = 0;
        m_useSpinMutex = false;
    }
    ~JobQueue()
    {
        exit();
    }
    void exit()
    {
        freeJobMem();
        if (m_queueLock && m_threadSupport)
        {
            m_threadSupport->deleteCriticalSection(m_queueLock);
            m_queueLock = NULL;
            m_threadSupport = 0;
        }
    }

    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
    {
        m_threadSupport = threadSup;
        if (threadSup)
        {
            m_queueLock = m_threadSupport->createCriticalSection();
        }
        setupJobStealing(contextArray, contextArray->size());
    }
    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
    {
        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
        int selfIndex = 0;
        for (int i = 0; i < contexts.size(); ++i)
        {
            if ( this == &contexts[ i ] )
            {
                selfIndex = i;
                break;
            }
        }
        int numNeighbors = btMin(2, contexts.size() - 1);
        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
        int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]);
        m_neighborContexts.reserve( numNeighbors );
        m_neighborContexts.resizeNoInitialize(0);
        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
        {
            int neighborIndex = selfIndex + neighborOffsets[i];
            if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
            {
                m_neighborContexts.push_back( &contexts[ neighborIndex ] );
            }
        }
    }

    bool isQueueEmpty() const { return m_queueIsEmpty; }
    void lockQueue()
    {
        if ( m_useSpinMutex )
        {
            m_mutex.lock();
        }
        else
        {
            m_queueLock->lock();
        }
    }
    void unlockQueue()
    {
        if ( m_useSpinMutex )
        {
            m_mutex.unlock();
        }
        else
        {
            m_queueLock->unlock();
        }
    }
    void clearQueue(int jobCount, int jobSize)
    {
        lockQueue();
        m_headIndex = 0;
        m_tailIndex = 0;
        m_allocSize = 0;
        m_queueIsEmpty = true;
        int jobBufSize = jobSize * jobCount;
        // make sure we have enough memory allocated to store jobs
        if ( jobBufSize > m_jobMemSize )
        {
            resizeJobMem( jobBufSize );
        }
        // make sure job queue is big enough
        if ( jobCount > m_jobQueue.capacity() )
        {
            m_jobQueue.reserve( jobCount );
        }
        unlockQueue();
        m_jobQueue.resizeNoInitialize( 0 );
    }
    void* allocJobMem(int jobSize)
    {
        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
        void* jobMem = &m_jobMem[m_allocSize];
        m_allocSize += jobSize;
        return jobMem;
    }
    void submitJob( IJob* job )
    {
        btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
        m_jobQueue.push_back( job );
        lockQueue();
        m_tailIndex++;
        m_queueIsEmpty = false;
        unlockQueue();
    }
    IJob* consumeJobFromOwnQueue()
    {
        if ( m_queueIsEmpty )
        {
            // lock-free path: even if this flag is read stale, it isn't harmful
            return NULL;
        }
        IJob* job = NULL;
        lockQueue();
        if ( !m_queueIsEmpty )
        {
            job = m_jobQueue[ m_headIndex++ ];
            btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
            if ( m_headIndex == m_tailIndex )
            {
                m_queueIsEmpty = true;
            }
        }
        unlockQueue();
        return job;
    }
    IJob* consumeJob()
    {
        if (IJob* job = consumeJobFromOwnQueue())
        {
            return job;
        }
        // own queue is empty, try to steal from a neighbor
        for (int i = 0; i < m_neighborContexts.size(); ++i)
        {
            JobQueue* otherContext = m_neighborContexts[ i ];
            if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
            {
                return job;
            }
        }
        return NULL;
    }
};

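// Entry point for each worker thread: drain the job queue, spin-wait while the queue
// stays empty, and finally go to sleep when directed to (or after the cooldown time
// elapses with no new work).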
static void WorkerThreadFunc( void* userPtr )
{
    BT_PROFILE( "WorkerThreadFunc" );
    ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
    JobQueue* jobQueue = localStorage->m_queue;

    bool shouldSleep = false;
    int threadId = localStorage->m_threadId;
    while (! shouldSleep)
    {
        // do work
        localStorage->m_mutex.lock();
        while ( IJob* job = jobQueue->consumeJob() )
        {
            localStorage->m_status = WorkerThreadStatus::kWorking;
            job->executeJob( threadId );
            localStorage->m_numJobsFinished++;
        }
        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
        localStorage->m_mutex.unlock();
        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
        // while queue is empty,
        while (jobQueue->isQueueEmpty())
        {
            // todo: spin wait a bit to avoid hammering the empty queue
            btSpinPause();
            if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
            {
                shouldSleep = true;
                break;
            }
            // if jobs are incoming,
            if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
            {
                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
            }
            else
            {
                for ( int i = 0; i < 50; ++i )
                {
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
                    {
                        break;
                    }
                }
                // if no jobs incoming and queue has been empty for the cooldown time, sleep
                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
                if (timeElapsed > localStorage->m_cooldownTime)
                {
                    shouldSleep = true;
                    break;
                }
            }
        }
    }
    {
        BT_PROFILE("sleep");
        // go sleep
        localStorage->m_mutex.lock();
        localStorage->m_status = WorkerThreadStatus::kSleeping;
        localStorage->m_mutex.unlock();
    }
}

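// btTaskSchedulerDefault implements btITaskScheduler on top of btThreadSupportInterface.
// The calling (main) thread is thread 0 and has no queue of its own; worker threads are
// assigned job queues (one per worker, or one shared queue per physical core when the
// logical-to-physical core ratio is greater than 1), and jobs are handed out round-robin.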
class btTaskSchedulerDefault : public btITaskScheduler
{
    btThreadSupportInterface* m_threadSupport;
    WorkerThreadDirectives* m_workerDirective;
    btAlignedObjectArray<JobQueue> m_jobQueues;
    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
    btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
    btClock m_clock;
    int m_numThreads;
    int m_numWorkerThreads;
    int m_numActiveJobQueues;
    int m_maxNumThreads;
    int m_numJobs;
    static const int kFirstWorkerThreadId = 1;

public:

    btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
    {
        m_threadSupport = NULL;
        m_workerDirective = NULL;
    }

    virtual ~btTaskSchedulerDefault()
    {
        waitForWorkersToSleep();

        for ( int i = 0; i < m_jobQueues.size(); ++i )
        {
            m_jobQueues[i].exit();
        }

        if (m_threadSupport)
        {
            delete m_threadSupport;
            m_threadSupport = NULL;
        }
        if (m_workerDirective)
        {
            btAlignedFree(m_workerDirective);
            m_workerDirective = NULL;
        }
    }

    void init()
    {
        btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
        m_threadSupport = btThreadSupportInterface::create( constructionInfo );
        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));

        m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
        m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
        m_numThreads = m_maxNumThreads;
        // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
        m_jobQueues.resize(numJobQueues);
        m_numActiveJobQueues = numJobQueues;
        for ( int i = 0; i < m_jobQueues.size(); ++i )
        {
            m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
        }
        m_perThreadJobQueues.resize(m_numThreads);
        for ( int i = 0; i < m_numThreads; i++ )
        {
            JobQueue* jq = NULL;
            // only worker threads get a job queue
            if (i > 0)
            {
                if (numThreadsPerQueue == 1)
                {
                    // one queue per worker thread
                    jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
                }
                else
                {
                    // 2 threads share each queue
                    jq = &m_jobQueues[ i / numThreadsPerQueue ];
                }
            }
            m_perThreadJobQueues[i] = jq;
        }
        m_threadLocalStorage.resize(m_numThreads);
        for ( int i = 0; i < m_numThreads; i++ )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_threadId = i;
            storage.m_directive = m_workerDirective;
            storage.m_status = WorkerThreadStatus::kSleeping;
            storage.m_cooldownTime = 100;  // 100 microseconds, threads go to sleep after this long if they have nothing to do
            storage.m_clock = &m_clock;
            storage.m_queue = m_perThreadJobQueues[i];
        }
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );  // no work for them yet
        setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
    }

    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
    {
        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
    }

    virtual int getMaxNumThreads() const BT_OVERRIDE
    {
        return m_maxNumThreads;
    }

    virtual int getNumThreads() const BT_OVERRIDE
    {
        return m_numThreads;
    }

    virtual void setNumThreads( int numThreads ) BT_OVERRIDE
    {
        m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
        m_numWorkerThreads = m_numThreads - 1;
        m_numActiveJobQueues = 0;
        // if there is at least 1 worker,
        if ( m_numWorkerThreads > 0 )
        {
            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
            JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
            m_numActiveJobQueues = iLastActiveContext + 1;
            for ( int i = 0; i < m_jobQueues.size(); ++i )
            {
                m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
            }
        }
        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
    }

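    // waitJobs: the main thread drains the active job queues itself, then tells the
    // workers to idle and busy-waits until the combined count of finished jobs matches
    // m_numJobs (with a timeout as a safety net).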
    void waitJobs()
    {
        BT_PROFILE( "waitJobs" );
        // have the main thread work until the job queues are empty
        int numMainThreadJobsFinished = 0;
        for ( int i = 0; i < m_numActiveJobQueues; ++i )
        {
            while ( IJob* job = m_jobQueues[i].consumeJob() )
            {
                job->executeJob( 0 );
                numMainThreadJobsFinished++;
            }
        }

        // done with jobs for now, tell workers to rest (but not sleep)
        setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );

        btU64 clockStart = m_clock.getTimeMicroseconds();
        // wait for workers to finish any jobs in progress
        while ( true )
        {
            int numWorkerJobsFinished = 0;
            for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
            {
                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                storage->m_mutex.lock();
                numWorkerJobsFinished += storage->m_numJobsFinished;
                storage->m_mutex.unlock();
            }
            if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
            {
                break;
            }
            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
            btAssert(timeElapsed < 1000);
            if (timeElapsed > 100000)
            {
                break;
            }
            btSpinPause();
        }
    }

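    // wakeWorkers: launch up to numWorkersToWake sleeping worker threads. The status
    // check is racy by design; a worker that is just about to sleep may be missed,
    // which is rare and only costs some parallelism for that dispatch.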
    void wakeWorkers(int numWorkersToWake)
    {
        BT_PROFILE( "wakeWorkers" );
        btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
        int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
        int numActiveWorkers = 0;
        for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
        {
            // note this count of active workers is not necessarily totally reliable, because a worker thread could be
            // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
            if (storage.m_status != WorkerThreadStatus::kSleeping)
            {
                numActiveWorkers++;
            }
        }
        for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
            if (storage.m_status == WorkerThreadStatus::kSleeping)
            {
                m_threadSupport->runTask( iWorker, &storage );
                numActiveWorkers++;
            }
        }
    }

    void waitForWorkersToSleep()
    {
        BT_PROFILE( "waitForWorkersToSleep" );
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
        m_threadSupport->waitForAllTasks();
        for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
        }
    }

    virtual void sleepWorkerThreadsHint() BT_OVERRIDE
    {
        BT_PROFILE( "sleepWorkerThreadsHint" );
        // hint the task scheduler that we may not be using these threads for a little while
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
    }

    void prepareWorkerThreads()
    {
        for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_mutex.lock();
            storage.m_numJobsFinished = 0;
            storage.m_mutex.unlock();
        }
        setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
    }

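    // parallelFor / parallelSum: split [iBegin, iEnd) into jobs of at most grainSize
    // iterations, distribute them round-robin across the worker job queues, wake the
    // workers, and have the main thread help drain the queues. Falls back to a serial
    // loop when the range is small, there are no workers, or a parallel dispatch is
    // already in progress (m_antiNestingLock).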
    virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
    {
        BT_PROFILE( "parallelFor_ThreadSupport" );
        btAssert( iEnd >= iBegin );
        btAssert( grainSize >= 1 );
        int iterationCount = iEnd - iBegin;
        if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
        {
            typedef ParallelForJob JobType;
            int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
            m_numJobs = jobCount;
            btAssert( jobCount >= 2 );  // need more than one job for multithreading
            int jobSize = sizeof( JobType );

            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue( jobCount, jobSize );
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for ( int i = iBegin; i < iEnd; i += grainSize )
            {
                btAssert( iJob < jobCount );
                int iE = btMin( i + grainSize, iEnd );
                JobQueue* jq = m_perThreadJobQueues[ iThread ];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new ( jobMem ) ParallelForJob( i, iE, body );  // placement new
                jq->submitJob( job );
                iJob++;
                iThread++;
                if ( iThread >= m_numThreads )
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers( jobCount - 1 );

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            m_antiNestingLock.unlock();
        }
        else
        {
            BT_PROFILE( "parallelFor_mainThread" );
            // just run on main thread
            body.forLoop( iBegin, iEnd );
        }
    }

    virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
    {
        BT_PROFILE( "parallelSum_ThreadSupport" );
        btAssert( iEnd >= iBegin );
        btAssert( grainSize >= 1 );
        int iterationCount = iEnd - iBegin;
        if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
        {
            typedef ParallelSumJob JobType;
            int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
            m_numJobs = jobCount;
            btAssert( jobCount >= 2 );  // need more than one job for multithreading
            int jobSize = sizeof( JobType );
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue( jobCount, jobSize );
            }

            // initialize summation
            for ( int iThread = 0; iThread < m_numThreads; ++iThread )
            {
                m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
            }

            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for ( int i = iBegin; i < iEnd; i += grainSize )
            {
                btAssert( iJob < jobCount );
                int iE = btMin( i + grainSize, iEnd );
                JobQueue* jq = m_perThreadJobQueues[ iThread ];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] );  // placement new
                jq->submitJob( job );
                iJob++;
                iThread++;
                if ( iThread >= m_numThreads )
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers( jobCount - 1 );

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();

            // add up all the thread sums
            btScalar sum = btScalar(0);
            for ( int iThread = 0; iThread < m_numThreads; ++iThread )
            {
                sum += m_threadLocalStorage[ iThread ].m_sumResult;
            }
            m_antiNestingLock.unlock();
            return sum;
        }
        else
        {
            BT_PROFILE( "parallelSum_mainThread" );
            // just run on main thread
            return body.sumLoop( iBegin, iEnd );
        }
    }
};


btITaskScheduler* btCreateDefaultTaskScheduler()
{
    btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
    ts->init();
    return ts;
}

#else // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    return NULL;
}

#endif // #else // #if BT_THREADSAFE
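
// ----------------------------------------------------------------------------------------
// Illustrative usage sketch (not part of btTaskScheduler.cpp): how an application might
// install the default scheduler and run a loop through it. btSetTaskScheduler, btParallelFor,
// and btIParallelForBody are declared in "LinearMath/btThreads.h"; the ScaleBody functor and
// scaleInParallel helper below are hypothetical names used only for illustration.

#include "LinearMath/btThreads.h"

struct ScaleBody : public btIParallelForBody
{
    float* m_data;
    float m_factor;
    virtual void forLoop( int iBegin, int iEnd ) const BT_OVERRIDE
    {
        // each job receives a disjoint [iBegin, iEnd) slice of the full index range
        for ( int i = iBegin; i < iEnd; ++i )
        {
            m_data[i] *= m_factor;
        }
    }
};

void scaleInParallel( float* data, int count, float factor )
{
    // install the multithreaded scheduler once at startup (returns NULL unless built with BT_THREADSAFE)
    btSetTaskScheduler( btCreateDefaultTaskScheduler() );
    ScaleBody body;
    body.m_data = data;
    body.m_factor = factor;
    // split the range into jobs of at most 256 iterations each (grainSize is tunable)
    btParallelFor( 0, count, 256, body );
}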