Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef _TBB_task_stream_H
18 #define _TBB_task_stream_H
19 
20 #include "tbb/tbb_stddef.h"
21 #include <deque>
22 #include <climits>
23 #include "tbb/atomic.h" // for __TBB_Atomic*
24 #include "tbb/spin_mutex.h"
25 #include "tbb/tbb_allocator.h"
26 #include "scheduler_common.h"
27 #include "tbb_misc.h" // for FastRandom
28 
29 namespace tbb {
30 namespace internal {
31 
33 
// Fragment of a class template parameterized on the element type T and a
// mutex type mutex_t.
// NOTE(review): the struct head and the member declarations (listing lines
// 36 and 39-43) are not present in this listing; later code accesses members
// named my_queue and my_mutex on objects of this type — confirm against the
// real header.
35 template< typename T, typename mutex_t >
// Queue type used by the lane: a std::deque of T backed by tbb_allocator.
37  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
38 
41 
44 };
45 
// Bit mask with one bit per lane: push() sets a lane's bit after enqueuing
// into it, pop() clears the bit once the lane's queue becomes empty.
46 typedef uintptr_t population_t;
// The value 1 typed as population_t, so that `one<<pos` is evaluated in the
// mask type rather than in int.
47 const population_t one = 1;
48 
49 inline void set_one_bit( population_t& dest, int pos ) {
50  __TBB_ASSERT( pos>=0, NULL );
51  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
52  __TBB_AtomicOR( &dest, one<<pos );
53 }
54 
55 inline void clear_one_bit( population_t& dest, int pos ) {
56  __TBB_ASSERT( pos>=0, NULL );
57  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
58  __TBB_AtomicAND( &dest, ~(one<<pos) );
59 }
60 
61 inline bool is_bit_set( population_t val, int pos ) {
62  __TBB_ASSERT( pos>=0, NULL );
63  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
64  return (val & (one<<pos)) != 0;
65 }
66 
// Start of the task_stream class template; Levels is the number of
// independent levels, each with its own population mask and lane array.
// NOTE(review): the class head and the declarations of `lane_t`,
// `population` and `lanes` (listing lines 69-72) are not present in this
// listing — confirm against the real header.
68 template<int Levels>
// Number of lanes per level; assigned in initialize() and used there as the
// size of each per-level lane array.
73  unsigned N;
74 
75 public:
76  task_stream() : N() {
77  for(int level = 0; level < Levels; level++) {
78  population[level] = 0;
79  lanes[level] = NULL;
80  }
81  }
82 
83  void initialize( unsigned n_lanes ) {
84  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
85 
86  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
87  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
88  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
89  for(int level = 0; level < Levels; level++) {
90  lanes[level] = new padded<lane_t>[N];
91  __TBB_ASSERT( !population[level], NULL );
92  }
93  }
94 
// NOTE(review): the signature line of this function (listing line 95) is not
// present in this listing; presumably this is the task_stream destructor —
// confirm against the real header.
// Releases, for every level, the lane array held in lanes[level] with
// delete[]; levels whose pointer is null are skipped.
96  for(int level = 0; level < Levels; level++)
97  if (lanes[level]) delete[] lanes[level];
98  }
99 
//! Push a task into a lane.
/** Appends `source` to the queue of a randomly chosen lane of `level` and
    marks that lane as populated. `random` supplies the lane choice. The
    function keeps picking lanes until it acquires one, so it only returns
    after the task has been enqueued. */
101  void push( task* source, int level, FastRandom& random ) {
102  // Lane selection is random. Each thread should keep a separate seed value.
103  unsigned idx;
104  for( ; ; ) {
// Masking with N-1 maps the random value onto a lane index in [0, N).
105  idx = random.get() & (N-1);
// NOTE(review): `lock` is not declared in the visible lines (listing line 106
// is absent); presumably a scoped lock object local to this loop — confirm.
// A failed try_acquire means another lane is picked on the next iteration.
107  if( lock.try_acquire(lanes[level][idx].my_mutex) ) {
108  lanes[level][idx].my_queue.push_back(source);
// Publish that this lane now holds at least one task.
109  set_one_bit( population[level], idx ); //TODO: avoid atomic op if the bit is already set
110  break;
111  }
112  }
113  }
114 
//! Try finding and popping a task.
/** Scans the lanes of `level`, starting with the lane after
    `last_used_lane`, for as long as the level's population mask is non-zero.
    Returns the task taken from the front of the first lane that could be
    acquired and was non-empty, or NULL if the mask became zero before a task
    was taken. `last_used_lane` is overwritten with the index at which the
    scan stopped, whether or not a task was found. */
116  task* pop( int level, unsigned& last_used_lane ) {
117  task* result = NULL;
118  // Lane selection is round-robin. Each thread should keep its last used lane.
119  unsigned idx = (last_used_lane+1)&(N-1);
// The loop condition re-reads the mask on every iteration, so the scan ends
// as soon as no lane of this level is marked populated.
120  for( ; population[level]; idx=(idx+1)&(N-1) ) {
// Lanes whose bit is clear are skipped without touching their mutex.
121  if( is_bit_set( population[level], idx ) ) {
122  lane_t& lane = lanes[level][idx];
// NOTE(review): `lock` is not declared in the visible lines (listing line 123
// is absent); presumably a scoped lock object local to this block — confirm.
// The queue is re-checked for emptiness after the acquire attempt succeeds.
124  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
125  result = lane.my_queue.front();
126  lane.my_queue.pop_front();
// Taking the last task empties the lane, so its population bit is cleared.
127  if( lane.my_queue.empty() )
128  clear_one_bit( population[level], idx );
129  break;
130  }
131  }
132  }
133  last_used_lane = idx;
134  return result;
135  }
136 
138  bool empty(int level) {
139  return !population[level];
140  }
141 
143 
//! Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
/** Visits every lane of every level, calls tbb::task::destroy on each queued
    task, then empties the lane's queue and clears its population bit. */
145  intptr_t drain() {
146  intptr_t result = 0;
147  for(int level = 0; level < Levels; level++)
148  for(unsigned i=0; i<N; ++i) {
149  lane_t& lane = lanes[level][i];
// NOTE(review): listing line 150 is absent here; the real header may have a
// statement at this point (possibly taking the lane's mutex) — confirm.
// `result` is incremented once per visited task, alongside the iterator.
151  for(lane_t::queue_base_t::iterator it=lane.my_queue.begin();
152  it!=lane.my_queue.end(); ++it, ++result)
153  {
// A lane that still holds tasks is expected to have its population bit set.
154  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
155  task* t = *it;
156  tbb::task::destroy(*t);
157  }
// The bit is cleared for every lane, including lanes that were already empty.
158  lane.my_queue.clear();
159  clear_one_bit( population[level], i );
160  }
161  return result;
162  }
163 }; // task_stream
164 
165 } // namespace internal
166 } // namespace tbb
167 
168 #endif /* _TBB_task_stream_H */
const population_t one
Definition: task_stream.h:47
A fast random number generator.
Definition: tbb_misc.h:135
CRITICAL_SECTION mutex_t
Base class for user-defined tasks.
Definition: task.h:615
The container for "fairness-oriented" aka "enqueued" tasks.
Definition: task_stream.h:69
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:49
void push(task *source, int level, FastRandom &random)
Push a task into a lane.
Definition: task_stream.h:101
unsigned short get()
Get a random number.
Definition: tbb_misc.h:146
task * pop(int level, unsigned &last_used_lane)
Try finding and popping a task.
Definition: task_stream.h:116
void __TBB_AtomicOR(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:878
void __TBB_AtomicAND(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:888
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition: task_stream.h:36
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:61
Pads type T to fill out to a multiple of cache line size.
Definition: tbb_stddef.h:261
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:72
queue_and_mutex< task *, spin_mutex > lane_t
Definition: task_stream.h:70
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Definition: task_stream.h:145
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:55
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:860
population_t population[Levels]
Definition: task_stream.h:71
void initialize(unsigned n_lanes)
Definition: task_stream.h:83
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
std::deque< T, tbb_allocator< T > > queue_base_t
Definition: task_stream.h:37
uintptr_t population_t
Definition: task_stream.h:46
The graph class.

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.