Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
market.cpp
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/global_control.h" // global_control::active_value
19 
20 #include "market.h"
21 #include "tbb_main.h"
22 #include "governor.h"
23 #include "scheduler.h"
24 #include "itt_notify.h"
25 
26 namespace tbb {
27 namespace internal {
28 
29 void market::insert_arena_into_list ( arena& a ) {
30 #if __TBB_TASK_PRIORITY
31  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
32  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
33 #else /* !__TBB_TASK_PRIORITY */
34  arena_list_type &arenas = my_arenas;
35  arena *&next = my_next_arena;
36 #endif /* !__TBB_TASK_PRIORITY */
37  arenas.push_front( a );
38  if ( arenas.size() == 1 )
39  next = &*arenas.begin();
40 }
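// Note: insert_arena_into_list places the arena at the front of the (per-priority) list;
// the round-robin cursor 'next' is redirected to it only when the list was previously empty.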
41 
42 void market::remove_arena_from_list ( arena& a ) {
43 #if __TBB_TASK_PRIORITY
44  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
45  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
46 #else /* !__TBB_TASK_PRIORITY */
47  arena_list_type &arenas = my_arenas;
48  arena *&next = my_next_arena;
49 #endif /* !__TBB_TASK_PRIORITY */
50  arena_list_type::iterator it = next;
51  __TBB_ASSERT( it != arenas.end(), NULL );
52  if ( next == &a ) {
53  if ( ++it == arenas.end() && arenas.size() > 1 )
54  it = arenas.begin();
55  next = &*it;
56  }
57  arenas.remove( a );
58 }
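// Note: remove_arena_from_list advances the round-robin cursor past the arena being removed
// (wrapping to the head of the list) so that 'next' never points at a removed arena.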
59 
60 //------------------------------------------------------------------------
61 // market
62 //------------------------------------------------------------------------
63 
64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
65  : my_num_workers_hard_limit(workers_hard_limit)
66  , my_num_workers_soft_limit(workers_soft_limit)
67 #if __TBB_TASK_PRIORITY
68  , my_global_top_priority(normalized_normal_priority)
69  , my_global_bottom_priority(normalized_normal_priority)
70 #endif /* __TBB_TASK_PRIORITY */
71  , my_ref_count(1)
72  , my_stack_size(stack_size)
73  , my_workers_soft_limit_to_report(workers_soft_limit)
74 {
75 #if __TBB_TASK_PRIORITY
76  __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
77  my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
78 #endif /* __TBB_TASK_PRIORITY */
79 
80  // Once created, the RML server starts initializing workers, which need the
81  // global market instance to obtain the worker stack size
82  my_server = governor::create_rml_server( *this );
83  __TBB_ASSERT( my_server, "Failed to create RML server" );
84 }
85 
86 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
87  if( int soft_limit = market::app_parallelism_limit() )
88  workers_soft_limit = soft_limit-1;
89  else // if user set no limits (yet), use market's parameter
90  workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
91  if( workers_soft_limit >= workers_hard_limit )
92  workers_soft_limit = workers_hard_limit-1;
93  return workers_soft_limit;
94 }
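// Illustrative example (not part of the original source): on a machine where
// governor::default_num_threads() == 8, with no global_control limit set and workers_soft_limit == 0,
// the result is max(8-1, 0) == 7 workers, clamped to workers_hard_limit-1 if it would exceed the hard limit.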
95 
96 market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
97  global_market_mutex_type::scoped_lock lock( theMarketMutex );
98  market *m = theMarket;
99  if( m ) {
100  ++m->my_ref_count;
101  const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
102  lock.release();
103  if( old_public_count==0 )
104  set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
105 
106  // do not warn if default number of workers is requested
107  if( workers_requested != governor::default_num_threads()-1 ) {
108  __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
109  "skip_soft_limit_warning must be larger than any valid workers_requested" );
110  unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
111  if( soft_limit_to_report < workers_requested ) {
112  runtime_warning( "The number of workers is currently limited to %u. "
113  "The request for %u workers is ignored. Further requests for more workers "
114  "will be silently ignored until the limit changes.\n",
115  soft_limit_to_report, workers_requested );
116  // The race is possible when multiple threads report warnings.
117  // We are OK with that, as there are just multiple warnings.
118  as_atomic(m->my_workers_soft_limit_to_report).
119  compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
120  }
121 
122  }
123  if( m->my_stack_size < stack_size )
124  runtime_warning( "Thread stack size has been already set to %u. "
125  "The request for larger stack (%u) cannot be satisfied.\n",
126  m->my_stack_size, stack_size );
127  }
128  else {
129  // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
130  if( stack_size == 0 )
131  stack_size = global_control::active_value(global_control::thread_stack_size);
132  // Expecting that 4P is suitable for most applications.
133  // Limit to 2P for large thread number.
134  // TODO: ask RML for max concurrency and possibly correct hard_limit
135  const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
136  // The requested number of threads is intentionally not considered in
137  // computation of the hard limit, in order to separate responsibilities
138  // and avoid complicated interactions between global_control and task_scheduler_init.
139  // The market guarantees that at least 256 threads might be created.
140  const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
141  const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
142  // Create the global market instance
143  size_t size = sizeof(market);
144 #if __TBB_TASK_GROUP_CONTEXT
145  __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
146  "my_workers must be the last data field of the market class");
147  size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
148 #endif /* __TBB_TASK_GROUP_CONTEXT */
149  __TBB_InitOnce::add_ref();
150  void* storage = NFS_Allocate(1, size, NULL);
151  memset( storage, 0, size );
152  // Initialize and publish global market
153  m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
154  if( is_public )
155  m->my_public_ref_count = 1;
156  theMarket = m;
157  // This check relies on the fact that for shared RML default_concurrency==max_concurrency
158  if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
159  runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
160  , m->my_server->default_concurrency(), workers_soft_limit );
161  }
162  return *m;
163 }
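// Note: global_market() always leaves the singleton with one extra reference held on behalf of
// the caller (a public one when is_public is true); it must be returned via market::release().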
164 
165 void market::destroy () {
166 #if __TBB_COUNT_TASK_NODES
167  if ( my_task_node_count )
168  runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
169 #endif /* __TBB_COUNT_TASK_NODES */
170  this->market::~market(); // qualified to suppress warning
171  NFS_Free( this );
172  __TBB_InitOnce::remove_ref();
173 }
174 
175 bool market::release ( bool is_public, bool blocking_terminate ) {
176  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
177  bool do_release = false;
178  {
179  global_market_mutex_type::scoped_lock lock( theMarketMutex );
180  if ( blocking_terminate ) {
181  __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
182  while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
183  lock.release();
184  // To guarantee that request_close_connection() is called by the last master, we need to wait till all
185  // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
186  // Theoretically, new private references to the market can be added during waiting making it potentially
187  // endless.
188  // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
189  // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
190  // see e.g. task_group_context::cancel_group_execution()
192  __TBB_Yield();
193  lock.acquire( theMarketMutex );
194  }
195  }
196  if ( is_public ) {
197  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
198  __TBB_ASSERT( my_public_ref_count, NULL );
199  --my_public_ref_count;
200  }
201  if ( --my_ref_count == 0 ) {
202  __TBB_ASSERT( !my_public_ref_count, NULL );
203  do_release = true;
204  theMarket = NULL;
205  }
206  }
207  if( do_release ) {
208  __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
209  // inform RML that blocking termination is required
210  my_join_workers = blocking_terminate;
211  my_server->request_close_connection();
212  return blocking_terminate;
213  }
214  return false;
215 }
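// Note: release() returns true only when this call dropped the last reference and blocking
// termination was requested, i.e. when RML is asked to close the connection and join the workers.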
216 
217 int market::update_workers_request() {
218  int old_request = my_num_workers_requested;
219  my_num_workers_requested = min(my_total_demand, (int)my_num_workers_soft_limit);
220 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
221  if (my_mandatory_num_requested > 0) {
222  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
223  my_num_workers_requested = 1;
224  }
225 #endif
226 #if __TBB_TASK_PRIORITY
227  my_priority_levels[my_global_top_priority].workers_available = my_num_workers_requested;
228  update_allotment(my_global_top_priority);
229 #else
230  update_allotment( my_num_workers_requested );
231 #endif
232  return my_num_workers_requested - old_request;
233 }
234 
235 void market::set_active_num_workers ( unsigned soft_limit ) {
236  market *m;
237 
238  {
239  global_market_mutex_type::scoped_lock lock( theMarketMutex );
240  if ( !theMarket )
241  return; // actual value will be used at market creation
242  m = theMarket;
243  if (m->my_num_workers_soft_limit == soft_limit)
244  return;
245  ++m->my_ref_count;
246  }
247  // have my_ref_count for market, use it safely
248 
249  int delta = 0;
250  {
251  arenas_list_mutex_type::scoped_lock lock(m->my_arenas_list_mutex);
252  __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
253 
254 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255 #if __TBB_TASK_PRIORITY
256 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { \
257  for (int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p) { \
258  priority_level_info& pl = m->my_priority_levels[p]; \
259  arena_list_type& arenas = pl.arenas;
260 #else
261 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { { \
262  const int p = 0; \
263  tbb::internal::suppress_unused_warning(p); \
264  arena_list_type& arenas = m->my_arenas;
265 #endif
266 #define FOR_EACH_PRIORITY_LEVEL_END } }
267 
268  if (m->my_num_workers_soft_limit == 0 && m->my_mandatory_num_requested > 0) {
269  FOR_EACH_PRIORITY_LEVEL_BEGIN
270  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it)
271  if (it->my_global_concurrency_mode)
272  m->disable_mandatory_concurrency_impl(&*it);
273  FOR_EACH_PRIORITY_LEVEL_END
274  }
275  __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
276 #endif
277 
278  as_atomic(m->my_num_workers_soft_limit) = soft_limit;
279  // report only once after new soft limit value is set
280  m->my_workers_soft_limit_to_report = soft_limit;
281 
282 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
283  if (m->my_num_workers_soft_limit == 0) {
284  FOR_EACH_PRIORITY_LEVEL_BEGIN
285  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
286  if (!it->my_task_stream.empty(p))
287  m->enable_mandatory_concurrency_impl(&*it);
288  }
289  FOR_EACH_PRIORITY_LEVEL_END
290  }
291 #undef FOR_EACH_PRIORITY_LEVEL_BEGIN
292 #undef FOR_EACH_PRIORITY_LEVEL_END
293 #endif
294 
295  delta = m->update_workers_request();
296  }
297  // adjust_job_count_estimate must be called outside of any locks
298  if( delta!=0 )
299  m->my_server->adjust_job_count_estimate( delta );
300  // release internal market reference to match ++m->my_ref_count above
301  m->release( /*is_public=*/false, /*blocking_terminate=*/false );
302 }
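// Note: set_active_num_workers() pins the market with a private reference, applies the new soft
// limit under my_arenas_list_mutex, and calls adjust_job_count_estimate() only after the lock is released.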
303 
304 bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
305  return ((const market&)client).must_join_workers();
306 }
307 
308 arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
309  __TBB_ASSERT( num_slots > 0, NULL );
310  __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
311  // Add public market reference for master thread/task_arena (that adds an internal reference in exchange).
312  market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
313 
314  arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
315  // Add newly created arena into the existing market's list.
316  arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
317  m.insert_arena_into_list(a);
318  return &a;
319 }
320 
322 void market::detach_arena ( arena& a ) {
323  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
324  __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
325  if (a.my_global_concurrency_mode)
326  disable_mandatory_concurrency_impl(&a);
327 
328  remove_arena_from_list(a);
329  if (a.my_aba_epoch == my_arenas_aba_epoch)
330  ++my_arenas_aba_epoch;
331 }
332 
333 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
334  bool locked = true;
335  __TBB_ASSERT( a, NULL );
336  // we hold a reference to the market, so it cannot be destroyed while this method runs
337  __TBB_ASSERT( this == theMarket, NULL );
338  __TBB_ASSERT( my_ref_count!=0, NULL );
339  my_arenas_list_mutex.lock();
341 #if __TBB_TASK_PRIORITY
342  // scan all priority levels, not only in [my_global_bottom_priority;my_global_top_priority]
343  // range, because arena to be destroyed can have no outstanding request for workers
344  for ( int p = num_priority_levels-1; p >= 0; --p ) {
345  priority_level_info &pl = my_priority_levels[p];
346  arena_list_type &my_arenas = pl.arenas;
347 #endif /* __TBB_TASK_PRIORITY */
348  arena_list_type::iterator it = my_arenas.begin();
349  for ( ; it != my_arenas.end(); ++it ) {
350  if ( a == &*it ) {
351  if ( it->my_aba_epoch == aba_epoch ) {
352  // Arena is alive
353  if ( !a->my_num_workers_requested && !a->my_references ) {
354  __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
355  // Arena is abandoned. Destroy it.
356  detach_arena( *a );
357  my_arenas_list_mutex.unlock();
358  locked = false;
359  a->free_arena();
360  }
361  }
362  if (locked)
363  my_arenas_list_mutex.unlock();
364  return;
365  }
366  }
367 #if __TBB_TASK_PRIORITY
368  }
369 #endif /* __TBB_TASK_PRIORITY */
370  my_arenas_list_mutex.unlock();
371 }
372 
374 arena* market::arena_in_need ( arena_list_type &arenas, arena *hint ) {
375  if ( arenas.empty() )
376  return NULL;
377  arena_list_type::iterator it = hint;
378  __TBB_ASSERT( it != arenas.end(), NULL );
379  do {
380  arena& a = *it;
381  if ( ++it == arenas.end() )
382  it = arenas.begin();
383  if ( a.num_workers_active() < a.my_num_workers_allotted ) {
384  a.my_references += arena::ref_worker;
385  return &a;
386  }
387  } while ( it != hint );
388  return NULL;
389 }
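// Note: this overload performs a round-robin scan that starts at 'hint' and returns the first
// arena whose number of active workers is still below its allotment, taking a worker reference on it.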
390 
391 int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
392  __TBB_ASSERT( workers_demand > 0, NULL );
393  max_workers = min(workers_demand, max_workers);
394  int assigned = 0;
395  int carry = 0;
396  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
397  arena& a = *it;
398  if (a.my_num_workers_requested <= 0) {
400  continue;
401  }
402  int allotted = 0;
403 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
404  if (my_num_workers_soft_limit == 0) {
405  __TBB_ASSERT(max_workers == 0 || max_workers == 1, NULL);
406  allotted = a.my_global_concurrency_mode && assigned < max_workers ? 1 : 0;
407  } else
408 #endif
409  {
410  int tmp = a.my_num_workers_requested * max_workers + carry;
411  allotted = tmp / workers_demand;
412  carry = tmp % workers_demand;
413  // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
414  allotted = min(allotted, (int)a.my_max_num_workers);
415  }
416  a.my_num_workers_allotted = allotted;
417  assigned += allotted;
418  }
419  __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, NULL );
420  return assigned;
421 }
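// Illustrative example (not part of the original source): with workers_demand == 5 and max_workers == 3,
// two arenas requesting 3 and 2 workers receive (3*3+0)/5 == 1 worker (carry 4) and (2*3+4)/5 == 2 workers
// respectively, i.e. the 3 available workers are split in proportion to each arena's request.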
422 
424 bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
425  if ( a ) {
426  for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
427  if ( a == &*it )
428  return true;
429  }
430  return false;
431 }
432 
433 #if __TBB_TASK_PRIORITY
434 inline void market::update_global_top_priority ( intptr_t newPriority ) {
435  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
436  my_global_top_priority = newPriority;
437  my_priority_levels[newPriority].workers_available =
438 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
439  my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
440 #endif
441  my_num_workers_soft_limit;
442  advance_global_reload_epoch();
443 }
444 
445 inline void market::reset_global_priority () {
446  my_global_bottom_priority = normalized_normal_priority;
447  update_global_top_priority(normalized_normal_priority);
448 }
449 
450 arena* market::arena_in_need ( arena* prev_arena ) {
451  if( as_atomic(my_total_demand) <= 0 )
452  return NULL;
453  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
454  assert_market_valid();
455  int p = my_global_top_priority;
456  arena *a = NULL;
457 
458  // Checks if arena is alive or not
459  if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
460  a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
461  }
462 
463  while ( !a && p >= my_global_bottom_priority ) {
464  priority_level_info &pl = my_priority_levels[p--];
465  a = arena_in_need( pl.arenas, pl.next_arena );
466  if ( a ) {
467  as_atomic(pl.next_arena) = a; // a subject for innocent data race under the reader lock
468  // TODO: rework global round robin policy to local or random to avoid this write
469  }
470  // TODO: When refactoring task priority code, take into consideration the
471  // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
472  }
473  return a;
474 }
475 
476 void market::update_allotment ( intptr_t highest_affected_priority ) {
477  intptr_t i = highest_affected_priority;
478  int available = my_priority_levels[i].workers_available;
479  for ( ; i >= my_global_bottom_priority; --i ) {
480  priority_level_info &pl = my_priority_levels[i];
481  pl.workers_available = available;
482  if ( pl.workers_requested ) {
483  available -= update_allotment( pl.arenas, pl.workers_requested, available );
484  if ( available <= 0 ) { // TODO: assertion?
485  available = 0;
486  break;
487  }
488  }
489  }
490  __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
491  for ( --i; i >= my_global_bottom_priority; --i ) {
492  priority_level_info &pl = my_priority_levels[i];
493  pl.workers_available = 0;
494  arena_list_type::iterator it = pl.arenas.begin();
495  for ( ; it != pl.arenas.end(); ++it ) {
496  __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
497  it->my_num_workers_allotted = 0;
498  }
499  }
500 }
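// Note: workers are distributed level by level from the highest affected priority downwards;
// once 'available' is exhausted, every remaining lower level gets a zero allotment.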
501 #endif /* __TBB_TASK_PRIORITY */
502 
503 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
504 void market::enable_mandatory_concurrency_impl ( arena *a ) {
505  __TBB_ASSERT(!a->my_global_concurrency_mode, NULL);
506  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
507 
508  a->my_global_concurrency_mode = true;
509  my_mandatory_num_requested++;
510 }
511 
512 void market::enable_mandatory_concurrency ( arena *a ) {
513  int delta = 0;
514  {
515  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
516  if (my_num_workers_soft_limit != 0 || a->my_global_concurrency_mode)
517  return;
518 
519  enable_mandatory_concurrency_impl(a);
520  delta = update_workers_request();
521  }
522 
523  if (delta != 0)
524  my_server->adjust_job_count_estimate(delta);
525 }
526 
527 void market::disable_mandatory_concurrency_impl(arena* a) {
528  __TBB_ASSERT(a->my_global_concurrency_mode, NULL);
529  __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);
530 
531  a->my_global_concurrency_mode = false;
532  my_mandatory_num_requested--;
533 }
534 
535 void market::mandatory_concurrency_disable ( arena *a ) {
536  int delta = 0;
537  {
538  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
539  if (!a->my_global_concurrency_mode)
540  return;
541  // There is a racy window in advertise_new_work between enabling mandatory concurrency and
542  // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
543  // Therefore, we double-check that there are no enqueued tasks.
544  if (a->has_enqueued_tasks())
545  return;
546 
547  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
548  disable_mandatory_concurrency_impl(a);
549 
550  delta = update_workers_request();
551  }
552  if (delta != 0)
553  my_server->adjust_job_count_estimate(delta);
554 }
555 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
556 
557 void market::adjust_demand ( arena& a, int delta ) {
558  __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
559  if ( !delta )
560  return;
561  my_arenas_list_mutex.lock();
562  int prev_req = a.my_num_workers_requested;
563  a.my_num_workers_requested += delta;
564  if ( a.my_num_workers_requested <= 0 ) {
565  a.my_num_workers_allotted = 0;
566  if ( prev_req <= 0 ) {
567  my_arenas_list_mutex.unlock();
568  return;
569  }
570  delta = -prev_req;
571  }
572  else if ( prev_req < 0 ) {
573  delta = a.my_num_workers_requested;
574  }
575  my_total_demand += delta;
576  unsigned effective_soft_limit = my_num_workers_soft_limit;
577  if (my_mandatory_num_requested > 0) {
578  __TBB_ASSERT(effective_soft_limit == 0, NULL);
579  effective_soft_limit = 1;
580  }
581 #if !__TBB_TASK_PRIORITY
582  update_allotment(effective_soft_limit);
583 #else /* !__TBB_TASK_PRIORITY */
584  intptr_t p = a.my_top_priority;
585  priority_level_info &pl = my_priority_levels[p];
586  pl.workers_requested += delta;
587  __TBB_ASSERT( pl.workers_requested >= 0, NULL );
588  if ( a.my_num_workers_requested <= 0 ) {
589  if ( a.my_top_priority != normalized_normal_priority ) {
590  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
591  update_arena_top_priority( a, normalized_normal_priority );
592  }
593  a.my_bottom_priority = normalized_normal_priority;
594  }
595  if ( p == my_global_top_priority ) {
596  if ( !pl.workers_requested ) {
597  while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
598  continue;
599  if ( p < my_global_bottom_priority )
600  reset_global_priority();
601  else
602  update_global_top_priority(p);
603  }
604  my_priority_levels[my_global_top_priority].workers_available = effective_soft_limit;
605  update_allotment( my_global_top_priority );
606  }
607  else if ( p > my_global_top_priority ) {
608  __TBB_ASSERT( pl.workers_requested > 0, NULL );
609  // TODO: investigate if the following invariant is always valid
610  __TBB_ASSERT( a.my_num_workers_requested >= 0, NULL );
611  update_global_top_priority(p);
612  a.my_num_workers_allotted = min( (int)effective_soft_limit, a.my_num_workers_requested );
613  my_priority_levels[p - 1].workers_available = effective_soft_limit - a.my_num_workers_allotted;
614  update_allotment( p - 1 );
615  }
616  else if ( p == my_global_bottom_priority ) {
617  if ( !pl.workers_requested ) {
618  while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
619  continue;
620  if ( p > my_global_top_priority )
621  reset_global_priority();
622  else
623  my_global_bottom_priority = p;
624  }
625  else
626  update_allotment( p );
627  }
628  else if ( p < my_global_bottom_priority ) {
629  int prev_bottom = my_global_bottom_priority;
630  my_global_bottom_priority = p;
631  update_allotment( prev_bottom );
632  }
633  else {
634  __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
635  update_allotment( p );
636  }
637  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
639 #endif /* !__TBB_TASK_PRIORITY */
640  if ( delta > 0 ) {
641  // cannot overflow soft_limit, but remember the values requested by arenas in
642  // my_total_demand so as not to release workers to RML prematurely
643  if ( my_num_workers_requested+delta > (int)effective_soft_limit)
644  delta = effective_soft_limit - my_num_workers_requested;
645  } else {
646  // the number of workers should not be decreased below my_total_demand
648  delta = min(my_total_demand, (int)effective_soft_limit) - my_num_workers_requested;
649  }
650  my_num_workers_requested += delta;
651  __TBB_ASSERT( my_num_workers_requested <= (int)effective_soft_limit, NULL );
652 
653  my_arenas_list_mutex.unlock();
654  // Must be called outside of any locks
655  my_server->adjust_job_count_estimate( delta );
657 }
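// Note: adjust_demand() clamps the delta passed to RML so that my_num_workers_requested never
// exceeds the effective soft limit and never drops below min(my_total_demand, effective soft limit).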
658 
659 void market::process( job& j ) {
660  generic_scheduler& s = static_cast<generic_scheduler&>(j);
661  // s.my_arena can be dead. Don't access it until arena_in_need is called
662  arena *a = s.my_arena;
663  __TBB_ASSERT( governor::is_set(&s), NULL );
664 
665  for (int i = 0; i < 2; ++i) {
666  while ( (a = arena_in_need(a)) ) {
667  a->process(s);
668  a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
669  }
670  // Workers leave the market because no arena needs them. This can happen before
671  // adjust_job_count_estimate() decreases my_slack, and RML may then put this thread to sleep.
672  // That could result in a busy loop that checks for my_slack<0 and calls this method immediately.
673  // The yield below mitigates this spinning.
674  if ( !i )
675  __TBB_Yield();
676  }
677 
678  GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
679 }
680 
681 void market::cleanup( job& j ) {
682  __TBB_ASSERT( theMarket != this, NULL );
683  generic_scheduler& s = static_cast<generic_scheduler&>(j);
684  generic_scheduler* mine = governor::local_scheduler_if_initialized();
685  __TBB_ASSERT( !mine || mine->is_worker(), NULL );
686  if( mine!=&s ) {
687  governor::assume_scheduler( &s );
688  generic_scheduler::cleanup_worker( &s, mine!=NULL );
689  governor::assume_scheduler( mine );
690  } else {
691  generic_scheduler::cleanup_worker( &s, true );
692  }
693 }
694 
695 void market::acknowledge_close_connection() {
696  destroy();
697 }
698 
699 ::rml::job* market::create_one_job() {
700  unsigned index = ++my_first_unused_worker_idx;
701  __TBB_ASSERT( index > 0, NULL );
702  ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
703  // index serves as a hint decreasing conflicts between workers when they migrate between arenas
704  generic_scheduler* s = generic_scheduler::create_worker( *this, index, /* genuine = */ true );
705 #if __TBB_TASK_GROUP_CONTEXT
706  __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
707  __TBB_ASSERT( !my_workers[index - 1], NULL );
708  my_workers[index - 1] = s;
709 #endif /* __TBB_TASK_GROUP_CONTEXT */
710  return s;
711 }
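// Note: worker indices returned by create_one_job() start at 1, and the worker with index i is
// recorded in my_workers[i-1] (when __TBB_TASK_GROUP_CONTEXT is enabled).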
712 
713 #if __TBB_TASK_PRIORITY
714 void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
715  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
716  __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
717  priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
718  &new_level = my_priority_levels[new_priority];
719  remove_arena_from_list(a);
720  a.my_top_priority = new_priority;
721  insert_arena_into_list(a);
722  as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synch with global reload epoch in order to optimize usage of local reload epoch
723  prev_level.workers_requested -= a.my_num_workers_requested;
724  new_level.workers_requested += a.my_num_workers_requested;
725  __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
726 }
727 
728 bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
729  // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
730  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
731  if ( a.my_reload_epoch != old_reload_epoch ) {
733  return false;
734  }
735  __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
736  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
737 
738  intptr_t p = a.my_top_priority;
739  update_arena_top_priority( a, new_priority );
740  if ( a.my_num_workers_requested > 0 ) {
741  if ( my_global_bottom_priority > new_priority ) {
742  my_global_bottom_priority = new_priority;
743  }
744  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
745  // Global top level became empty
746  for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
747  update_global_top_priority(p);
748  }
749  update_allotment( p );
750  }
751 
752  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
754  return true;
755 }
756 
757 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
758  // TODO: do not acquire this global lock while checking arena's state.
759  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
760 
761  tbb::internal::assert_priority_valid(new_priority);
762  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
764  if ( a.my_top_priority == new_priority ) {
765  return false;
766  }
767  else if ( a.my_top_priority > new_priority ) {
768  if ( a.my_bottom_priority > new_priority )
769  a.my_bottom_priority = new_priority;
770  return false;
771  }
772  else if ( a.my_num_workers_requested <= 0 ) {
773  return false;
774  }
775 
776  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
777 
778  intptr_t p = a.my_top_priority;
779  intptr_t highest_affected_level = max(p, new_priority);
780  update_arena_top_priority( a, new_priority );
781 
782  if ( my_global_top_priority < new_priority ) {
783  update_global_top_priority(new_priority);
784  }
785  else if ( my_global_top_priority == new_priority ) {
786  advance_global_reload_epoch();
787  }
788  else {
789  __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
790  __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
791  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
792  // Global top level became empty
793  __TBB_ASSERT( my_global_bottom_priority < p, NULL );
794  for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
795  __TBB_ASSERT( p >= new_priority, NULL );
796  update_global_top_priority(p);
797  highest_affected_level = p;
798  }
799  }
800  if ( p == my_global_bottom_priority ) {
801  // Arena priority was increased from the global bottom level.
802  __TBB_ASSERT( p < new_priority, NULL );
803  __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
804  while ( my_global_bottom_priority < my_global_top_priority
805  && !my_priority_levels[my_global_bottom_priority].workers_requested )
806  ++my_global_bottom_priority;
807  __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
808  __TBB_ASSERT( my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
809  }
810  update_allotment( highest_affected_level );
811 
812  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
814  return true;
815 }
816 #endif /* __TBB_TASK_PRIORITY */
817 
818 } // namespace internal
819 } // namespace tbb
#define GATHER_STATISTIC(x)
Work stealing task scheduler.
Definition: scheduler.h:137
void insert_arena_into_list(arena &a)
Definition: market.cpp:29
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
Definition: arena.h:195
unsigned num_workers_active() const
The number of workers active in the arena.
Definition: arena.h:334
static const intptr_t num_priority_levels
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
Definition: market.h:158
void update_allotment(unsigned effective_soft_limit)
Recalculates the number of workers assigned to each arena in the list.
Definition: market.h:214
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
Definition: market.h:86
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
unsigned my_ref_count
Reference count controlling market object lifetime.
Definition: market.h:146
arena_slot my_slots[1]
Definition: arena.h:390
void assert_market_valid() const
Definition: market.h:241
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
Definition: market.cpp:86
void detach_arena(arena &)
Removes the arena from the market's list.
Definition: market.cpp:322
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
Definition: market.h:143
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
Definition: market.h:70
static void set_active_num_workers(unsigned w)
Set number of active workers.
Definition: market.cpp:235
uintptr_t my_aba_epoch
ABA prevention marker.
Definition: arena.h:235
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
Definition: governor.cpp:116
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
Definition: arena.h:188
static generic_scheduler * create_worker(market &m, size_t index, bool geniune)
Initialize a scheduler for a worker thread.
Definition: scheduler.cpp:1273
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
Definition: arena.h:185
static unsigned default_num_threads()
Definition: governor.h:84
void cleanup(job &j) __TBB_override
Definition: market.cpp:681
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
Definition: arena.h:318
static size_t active_value(parameter p)
void adjust_demand(arena &, int delta)
Request that arena's need in workers should be adjusted.
Definition: market.cpp:557
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas,...
Definition: market.h:63
friend class arena
Definition: market.h:47
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
Definition: market.cpp:96
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
Definition: tbb_main.cpp:117
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
Definition: arena.cpp:146
void destroy()
Destroys and deallocates market object created by market::create()
Definition: market.cpp:165
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
Definition: market.h:74
static const unsigned ref_worker
Definition: arena.h:328
Release.
Definition: atomic.h:59
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena.
Definition: arena.h:147
int my_total_demand
Number of workers that were requested by all arenas.
Definition: market.h:89
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
Definition: governor.cpp:120
bool my_join_workers
Shutdown mode.
Definition: market.h:155
arena_list_type my_arenas
List of registered arenas.
Definition: market.h:135
void lock()
Acquire writer lock.
atomic< unsigned > my_references
Reference counter for the arena.
Definition: arena.h:153
arena * arena_in_need(arena *prev_arena)
Returns next arena that needs more workers, or NULL.
Definition: market.h:221
#define __TBB_Yield()
Definition: ibm_aix51.h:44
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
Definition: market.h:139
void process(job &j) __TBB_override
Definition: market.cpp:659
#define ITT_THREAD_SET_NAME(name)
Definition: itt_notify.h:113
bool is_arena_in_list(arena_list_type &arenas, arena *a)
Definition: market.cpp:424
#define __TBB_TASK_PRIORITY
Definition: tbb_config.h:571
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
Definition: market.cpp:64
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
Definition: market.cpp:304
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
Definition: itt_notify.h:59
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:266
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers())
Definition: market.h:78
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
Definition: tbb_misc.h:110
bool is_worker() const
True if running on a worker thread, false otherwise.
Definition: scheduler.h:673
int update_workers_request()
Recalculates the number of workers requested from RML and updates the allotment.
Definition: market.cpp:217
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market's list.
Definition: market.cpp:333
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
Definition: market.h:161
static rml::tbb_server * create_rml_server(rml::tbb_client &)
Definition: governor.cpp:92
static generic_scheduler * local_scheduler_if_initialized()
Definition: governor.h:139
void acknowledge_close_connection() __TBB_override
Definition: market.cpp:695
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
Definition: arena.cpp:285
job * create_one_job() __TBB_override
Definition: market.cpp:699
bool release(bool is_public, bool blocking_terminate)
Decrements market's refcount and destroys it in the end.
Definition: market.cpp:175
void free_arena()
Completes arena shutdown, destructs and deallocates it.
Definition: arena.cpp:296
The scoped locking pattern.
Definition: spin_rw_mutex.h:86
unsigned my_public_ref_count
Count of master threads attached.
Definition: market.h:149
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
Definition: tbb_misc.h:119
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
atomic< T > & as_atomic(T &t)
Definition: atomic.h:572
int my_num_workers_requested
Number of workers currently requested from RML.
Definition: market.h:81
arenas_list_mutex_type my_arenas_list_mutex
Definition: market.h:67
size_t my_stack_size
Stack size of worker threads.
Definition: market.h:152
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:709
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
Definition: scheduler.cpp:1331
void remove_arena_from_list(arena &a)
Definition: market.cpp:42
static bool UsePrivateRML
Definition: governor.h:64
static unsigned app_parallelism_limit()
Reports active parallelism level according to user's settings.
Definition: tbb_main.cpp:512
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
Definition: tbb_main.cpp:122
void unlock()
Release lock.
static market * theMarket
Currently active global market.
Definition: market.h:58
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
Definition: market.cpp:308
