30 #if __TBB_TASK_PRIORITY    32     arena *&next = my_priority_levels[a.my_top_priority].next_arena;
    38     if ( arenas.
size() == 1 )
    39         next = &*arenas.
begin();
    43 #if __TBB_TASK_PRIORITY    45     arena *&next = my_priority_levels[a.my_top_priority].next_arena;
    53         if ( ++it == arenas.
end() && arenas.
size() > 1 )
    64 market::market ( 
unsigned workers_soft_limit, 
unsigned workers_hard_limit, 
size_t stack_size )
    65     : my_num_workers_hard_limit(workers_hard_limit)
    66     , my_num_workers_soft_limit(workers_soft_limit)
    68     , my_global_top_priority(normalized_normal_priority)
    69     , my_global_bottom_priority(normalized_normal_priority)
    72     , my_stack_size(stack_size)
    73     , my_workers_soft_limit_to_report(workers_soft_limit)
    75 #if __TBB_TASK_PRIORITY    88         workers_soft_limit = soft_limit-1;
    91     if( workers_soft_limit >= workers_hard_limit )
    92         workers_soft_limit = workers_hard_limit-1;
    93     return workers_soft_limit;
   103         if( old_public_count==0 )
   109                           "skip_soft_limit_warning must be larger than any valid workers_requested" );
   111             if( soft_limit_to_report < workers_requested ) {
   113                                  "The request for %u workers is ignored. Further requests for more workers "   114                                  "will be silently ignored until the limit changes.\n",
   115                                  soft_limit_to_report, workers_requested );
   125                              "The request for larger stack (%u) cannot be satisfied.\n",
   130         if( stack_size == 0 )
   144 #if __TBB_TASK_GROUP_CONTEXT   146                       "my_workers must be the last data field of the market class");
   151         memset( storage, 0, 
size );
   153         m = 
new (storage) 
market( workers_soft_limit, workers_hard_limit, stack_size );
   159             runtime_warning( 
"RML might limit the number of workers to %u while %u is requested.\n"   160                     , m->
my_server->default_concurrency(), workers_soft_limit );
   166 #if __TBB_COUNT_TASK_NODES   167     if ( my_task_node_count )
   168         runtime_warning( 
"Leaked %ld task objects\n", (
long)my_task_node_count );
   170     this->market::~market(); 
   177     bool do_release = 
false;
   180         if ( blocking_terminate ) {
   181             __TBB_ASSERT( is_public, 
"Only an object with a public reference can request the blocking terminate" );
   212         return blocking_terminate;
   220 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   221     if (my_mandatory_num_requested > 0) {
   226 #if __TBB_TASK_PRIORITY   254 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   255 #if __TBB_TASK_PRIORITY   256 #define FOR_EACH_PRIORITY_LEVEL_BEGIN {                                                         \   257         for (int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p) {       \   258             priority_level_info& pl = m->my_priority_levels[p];                                 \   259             arena_list_type& arenas = pl.arenas;   261 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { {                                                       \   263             tbb::internal::suppress_unused_warning(p);                                          \   264             arena_list_type& arenas = m->my_arenas;   266 #define FOR_EACH_PRIORITY_LEVEL_END } }   269             FOR_EACH_PRIORITY_LEVEL_BEGIN
   271                     if (it->my_global_concurrency_mode)
   272                         m->disable_mandatory_concurrency_impl(&*it);
   273             FOR_EACH_PRIORITY_LEVEL_END
   282 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   284             FOR_EACH_PRIORITY_LEVEL_BEGIN
   286                     if (!it->my_task_stream.empty(
p))
   287                         m->enable_mandatory_concurrency_impl(&*it);
   289             FOR_EACH_PRIORITY_LEVEL_END
   291 #undef FOR_EACH_PRIORITY_LEVEL_BEGIN   292 #undef FOR_EACH_PRIORITY_LEVEL_END   299         m->
my_server->adjust_job_count_estimate( delta );
   305     return ((
const market&)client).must_join_workers();
   325     if (a.my_global_concurrency_mode)
   326         disable_mandatory_concurrency_impl(&a);
   341 #if __TBB_TASK_PRIORITY   345         priority_level_info &pl = my_priority_levels[
p];
   351                 if ( it->my_aba_epoch == aba_epoch ) {
   367 #if __TBB_TASK_PRIORITY   375     if ( arenas.
empty() )
   381         if ( ++it == arenas.
end() )
   387     } 
while ( it != hint );
   393     max_workers = 
min(workers_demand, max_workers);
   403 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   405             __TBB_ASSERT(max_workers == 0 || max_workers == 1, NULL);
   406             allotted = a.my_global_concurrency_mode && assigned < max_workers ? 1 : 0;
   411             allotted = tmp / workers_demand;
   412             carry = tmp % workers_demand;
   417         assigned += allotted;
   419     __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, NULL );
   433 #if __TBB_TASK_PRIORITY   434 inline void market::update_global_top_priority ( intptr_t newPriority ) {
   436     my_global_top_priority = newPriority;
   437     my_priority_levels[newPriority].workers_available =
   438 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   442     advance_global_reload_epoch();
   445 inline void market::reset_global_priority () {
   446     my_global_bottom_priority = normalized_normal_priority;
   447     update_global_top_priority(normalized_normal_priority);
   455     int p = my_global_top_priority;
   463     while ( !a && 
p >= my_global_bottom_priority ) {
   464         priority_level_info &pl = my_priority_levels[
p--];
   477     intptr_t i = highest_affected_priority;
   478     int available = my_priority_levels[i].workers_available;
   479     for ( ; i >= my_global_bottom_priority; --i ) {
   480         priority_level_info &pl = my_priority_levels[i];
   481         pl.workers_available = available;
   482         if ( pl.workers_requested ) {
   483             available -= 
update_allotment( pl.arenas, pl.workers_requested, available );
   484             if ( available <= 0 ) { 
   490     __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
   491     for ( --i; i >= my_global_bottom_priority; --i ) {
   492         priority_level_info &pl = my_priority_levels[i];
   493         pl.workers_available = 0;
   495         for ( ; it != pl.arenas.end(); ++it ) {
   496             __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
   497             it->my_num_workers_allotted = 0;
   503 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY   504 void market::enable_mandatory_concurrency_impl ( arena *a ) {
   508     a->my_global_concurrency_mode = 
true;
   509     my_mandatory_num_requested++;
   512 void market::enable_mandatory_concurrency ( arena *a ) {
   519         enable_mandatory_concurrency_impl(a);
   524         my_server->adjust_job_count_estimate(delta);
   527 void market::disable_mandatory_concurrency_impl(arena* a) {
   531     a->my_global_concurrency_mode = 
false;
   532     my_mandatory_num_requested--;
   535 void market::mandatory_concurrency_disable ( arena *a ) {
   539         if (!a->my_global_concurrency_mode)
   544         if (a->has_enqueued_tasks())
   548         disable_mandatory_concurrency_impl(a);
   553         my_server->adjust_job_count_estimate(delta);
   566         if ( prev_req <= 0 ) {
   572     else if ( prev_req < 0 ) {
   577     if (my_mandatory_num_requested > 0) {
   579         effective_soft_limit = 1;
   581 #if !__TBB_TASK_PRIORITY   584     intptr_t 
p = a.my_top_priority;
   585     priority_level_info &pl = my_priority_levels[
p];
   586     pl.workers_requested += delta;
   589         if ( a.my_top_priority != normalized_normal_priority ) {
   591             update_arena_top_priority( a, normalized_normal_priority );
   593         a.my_bottom_priority = normalized_normal_priority;
   595     if ( 
p == my_global_top_priority ) {
   596         if ( !pl.workers_requested ) {
   597             while ( --
p >= my_global_bottom_priority && !my_priority_levels[
p].workers_requested )
   599             if ( 
p < my_global_bottom_priority )
   600                 reset_global_priority();
   602                 update_global_top_priority(
p);
   604         my_priority_levels[my_global_top_priority].workers_available = effective_soft_limit;
   607     else if ( 
p > my_global_top_priority ) {
   611         update_global_top_priority(
p);
   616     else if ( 
p == my_global_bottom_priority ) {
   617         if ( !pl.workers_requested ) {
   618             while ( ++
p <= my_global_top_priority && !my_priority_levels[
p].workers_requested )
   620             if ( 
p > my_global_top_priority )
   621                 reset_global_priority();
   623                 my_global_bottom_priority = 
p;
   628     else if ( 
p < my_global_bottom_priority ) {
   629         int prev_bottom = my_global_bottom_priority;
   630         my_global_bottom_priority = 
p;
   634         __TBB_ASSERT( my_global_bottom_priority < 
p && 
p < my_global_top_priority, NULL );
   655     my_server->adjust_job_count_estimate( delta );
   665     for (
int i = 0; i < 2; ++i) {
   705 #if __TBB_TASK_GROUP_CONTEXT   708     my_workers[index - 1] = 
s;
   713 #if __TBB_TASK_PRIORITY   714 void market::update_arena_top_priority ( 
arena& a, intptr_t new_priority ) {
   716     __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
   717     priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
   718                         &new_level = my_priority_levels[new_priority];
   720     a.my_top_priority = new_priority;
   725     __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
   728 bool market::lower_arena_priority ( 
arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
   731     if ( a.my_reload_epoch != old_reload_epoch ) {
   736     __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
   738     intptr_t 
p = a.my_top_priority;
   739     update_arena_top_priority( a, new_priority );
   741         if ( my_global_bottom_priority > new_priority ) {
   742             my_global_bottom_priority = new_priority;
   744         if ( 
p == my_global_top_priority && !my_priority_levels[
p].workers_requested ) {
   746             for ( --
p; 
p>my_global_bottom_priority && !my_priority_levels[
p].workers_requested; --
p ) 
continue;
   747             update_global_top_priority(
p);
   752     __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
   757 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
   761     tbb::internal::assert_priority_valid(new_priority);
   762     __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
   764     if ( a.my_top_priority == new_priority ) {
   767     else if ( a.my_top_priority > new_priority ) {
   768         if ( a.my_bottom_priority > new_priority )
   769             a.my_bottom_priority = new_priority;
   772     else if ( a.my_num_workers_requested <= 0 ) {
   776     __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
   778     intptr_t 
p = a.my_top_priority;
   779     intptr_t highest_affected_level = 
max(
p, new_priority);
   780     update_arena_top_priority( a, new_priority );
   782     if ( my_global_top_priority < new_priority ) {
   783         update_global_top_priority(new_priority);
   785     else if ( my_global_top_priority == new_priority ) {
   786         advance_global_reload_epoch();
   789         __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
   790         __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
   791         if ( 
p == my_global_top_priority && !my_priority_levels[
p].workers_requested ) {
   794             for ( --
p; !my_priority_levels[
p].workers_requested; --
p ) 
continue;
   796             update_global_top_priority(
p);
   797             highest_affected_level = 
p;
   800     if ( 
p == my_global_bottom_priority ) {
   803         __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
   804         while ( my_global_bottom_priority < my_global_top_priority
   805                 && !my_priority_levels[my_global_bottom_priority].workers_requested )
   806             ++my_global_bottom_priority;
   807         __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
   808         __TBB_ASSERT( my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
   812     __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
 #define GATHER_STATISTIC(x)
 
Work stealing task scheduler.
 
void insert_arena_into_list(arena &a)
 
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
 
unsigned num_workers_active() const
The number of workers active in the arena.
 
static const intptr_t num_priority_levels
 
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
 
void update_allotment(unsigned effective_soft_limit)
Recalculates the number of workers assigned to each arena in the list.
 
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
 
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
 
unsigned my_ref_count
Reference count controlling market object lifetime.
 
void assert_market_valid() const
 
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
 
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
 
void detach_arena(arena &)
Removes the arena from the market's list.
 
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
 
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
 
static void set_active_num_workers(unsigned w)
Set number of active workers.
 
uintptr_t my_aba_epoch
ABA prevention marker.
 
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
 
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
 
static generic_scheduler * create_worker(market &m, size_t index, bool geniune)
Initialize a scheduler for a worker thread.
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
 
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
 
static unsigned default_num_threads()
 
void cleanup(job &j) __TBB_override
 
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
 
static size_t active_value(parameter p)
 
void adjust_demand(arena &, int delta)
Request that arena's need in workers should be adjusted.
 
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas,...
 
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
 
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
 
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
 
void destroy()
Destroys and deallocates market object created by market::create()
 
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
 
static const unsigned ref_worker
 
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena.
 
int my_total_demand
Number of workers that were requested by all arenas.
 
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
 
bool my_join_workers
Shutdown mode.
 
arena_list_type my_arenas
List of registered arenas.
 
void lock()
Acquire writer lock.
 
iterator_impl< arena > iterator
 
atomic< unsigned > my_references
Reference counter for the arena.
 
arena * arena_in_need(arena *prev_arena)
Returns next arena that needs more workers, or NULL.
 
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
 
void const char const char int ITT_FORMAT __itt_group_sync p
 
void process(job &j) __TBB_override
 
#define ITT_THREAD_SET_NAME(name)
 
bool is_arena_in_list(arena_list_type &arenas, arena *a)
 
#define __TBB_TASK_PRIORITY
 
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
 
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
 
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
 
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
 
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers())
 
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
 
bool is_worker() const
True if running on a worker thread, false otherwise.
 
int update_workers_request()
Recalculates the number of workers requested from RML and updates the allotment.
 
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market's list.
 
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
 
static rml::tbb_server * create_rml_server(rml::tbb_client &)
 
static generic_scheduler * local_scheduler_if_initialized()
 
void acknowledge_close_connection() __TBB_override
 
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
 
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
 
job * create_one_job() __TBB_override
 
bool release(bool is_public, bool blocking_terminate)
Decrements market's refcount and destroys it in the end.
 
void free_arena()
Completes arena shutdown, destructs and deallocates it.
 
The scoped locking pattern.
 
void const char const char int ITT_FORMAT __itt_group_sync s
 
unsigned my_public_ref_count
Count of master threads attached.
 
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
 
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
 
atomic< T > & as_atomic(T &t)
 
int my_num_workers_requested
Number of workers currently requested from RML.
 
arenas_list_mutex_type my_arenas_list_mutex
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
 
size_t my_stack_size
Stack size of worker threads.
 
T __TBB_load_with_acquire(const volatile T &location)
 
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
 
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
 
void remove_arena_from_list(arena &a)
 
static bool UsePrivateRML
 
static unsigned app_parallelism_limit()
Reports active parallelism level according to user's settings.
 
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
 
void unlock()
Release lock.
 
static market * theMarket
Currently active global market.
 
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.