Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream_extended.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef _TBB_task_stream_extended_H
18 #define _TBB_task_stream_extended_H
19 
26 
27 
28 #if _TBB_task_stream_H
29 #error Either task_stream.h or this file can be included at the same time.
30 #endif
31 
32 #if !__TBB_CPF_BUILD
33 #error This code bears a preview status until it proves its usefulness/performance suitability.
34 #endif
35 
36 #include "tbb/tbb_stddef.h"
37 #include <deque>
38 #include <climits>
39 #include "tbb/atomic.h" // for __TBB_Atomic*
40 #include "tbb/spin_mutex.h"
41 #include "tbb/tbb_allocator.h"
42 #include "scheduler_common.h"
43 #include "tbb_misc.h" // for FastRandom
44 
45 namespace tbb {
46 namespace internal {
47 
49 
51 template< typename T, typename mutex_t >
52 struct queue_and_mutex {
53  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
54 
57 
60 };
61 
62 typedef uintptr_t population_t;
63 const population_t one = 1;
64 
65 inline void set_one_bit( population_t& dest, int pos ) {
66  __TBB_ASSERT( pos>=0, NULL );
67  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
68  __TBB_AtomicOR( &dest, one<<pos );
69 }
70 
71 inline void clear_one_bit( population_t& dest, int pos ) {
72  __TBB_ASSERT( pos>=0, NULL );
73  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
74  __TBB_AtomicAND( &dest, ~(one<<pos) );
75 }
76 
77 inline bool is_bit_set( population_t val, int pos ) {
78  __TBB_ASSERT( pos>=0, NULL );
79  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
80  return (val & (one<<pos)) != 0;
81 }
82 
84 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
85  no_assign
86 #else
87  no_copy
88 #endif
89 {
90  random_lane_selector( FastRandom& random ) : my_random( random ) {}
91  unsigned operator()( unsigned out_of ) const {
92  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
93  return my_random.get() & (out_of-1);
94  }
95 private:
97 };
98 
100 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
101  no_assign
102 #else
103  no_copy
104 #endif
105 {
106  unsigned& my_previous;
107  lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
108 };
109 
111  subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
112  unsigned operator()( unsigned out_of ) const {
113  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
114  return (++my_previous &= out_of-1);
115  }
116 };
117 
119  preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
120  unsigned operator()( unsigned out_of ) const {
121  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
122  return (--my_previous &= (out_of-1));
123  }
124 };
125 
127 protected:
129 };
130 
132 
135 template<task_stream_accessor_type accessor>
137 protected:
140  task* result = queue.front();
141  queue.pop_front();
142  return result;
143  }
144 };
145 
146 template<>
148 protected:
150  task* result = NULL;
151  do {
152  result = queue.back();
153  queue.pop_back();
154  } while( !result && !queue.empty() );
155  return result;
156  }
157 };
158 
160 template<int Levels, task_stream_accessor_type accessor>
161 class task_stream : public task_stream_accessor< accessor > {
163  population_t population[Levels];
164  padded<lane_t>* lanes[Levels];
165  unsigned N;
166 
167 public:
168  task_stream() : N() {
169  for(int level = 0; level < Levels; level++) {
170  population[level] = 0;
171  lanes[level] = NULL;
172  }
173  }
174 
175  void initialize( unsigned n_lanes ) {
176  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
177 
178  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
179  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
180  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
181  for(int level = 0; level < Levels; level++) {
182  lanes[level] = new padded<lane_t>[N];
183  __TBB_ASSERT( !population[level], NULL );
184  }
185  }
186 
188  for(int level = 0; level < Levels; level++)
189  if (lanes[level]) delete[] lanes[level];
190  }
191 
193  bool try_push( task* source, int level, unsigned lane_idx ) {
194  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
196  if( lock.try_acquire( lanes[level][lane_idx].my_mutex ) ) {
197  lanes[level][lane_idx].my_queue.push_back( source );
198  set_one_bit( population[level], lane_idx ); // TODO: avoid atomic op if the bit is already set
199  return true;
200  }
201  return false;
202  }
203 
205  template<typename lane_selector_t>
206  void push( task* source, int level, const lane_selector_t& next_lane ) {
207  bool succeed = false;
208  unsigned lane = 0;
209  do {
210  lane = next_lane( /*out_of=*/N );
211  __TBB_ASSERT( lane < N, "Incorrect lane index." );
212  } while( ! (succeed = try_push( source, level, lane )) );
213  }
214 
216  task* try_pop( int level, unsigned lane_idx ) {
217  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
218  if( !is_bit_set( population[level], lane_idx ) )
219  return NULL;
220  task* result = NULL;
221  lane_t& lane = lanes[level][lane_idx];
223  if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
224  result = this->get_item( lane.my_queue );
225  if( lane.my_queue.empty() )
226  clear_one_bit( population[level], lane_idx );
227  }
228  return result;
229  }
230 
233  template<typename lane_selector_t>
234  task* pop( int level, const lane_selector_t& next_lane ) {
235  task* popped = NULL;
236  unsigned lane = 0;
237  do {
238  lane = next_lane( /*out_of=*/N );
239  __TBB_ASSERT( lane < N, "Incorrect lane index." );
240  } while( !empty( level ) && !(popped = try_pop( level, lane )) );
241  return popped;
242  }
243 
244  // TODO: unify '*_specific' logic with 'pop' methods above
246  __TBB_ASSERT( !queue.empty(), NULL );
247  // TODO: add a worst-case performance test and consider an alternative container with better
248  // performance for isolation search.
249  typename lane_t::queue_base_t::iterator curr = queue.end();
250  do {
251  // TODO: consider logic from get_task to simplify the code.
252  task* result = *--curr;
253  if( result __TBB_ISOLATION_EXPR( && result->prefix().isolation == isolation ) ) {
254  if( queue.end() - curr == 1 )
255  queue.pop_back(); // a little of housekeeping along the way
256  else
257  *curr = 0; // grabbing task with the same isolation
258  // TODO: move one of the container's ends instead if the task has been found there
259  return result;
260  }
261  } while( curr != queue.begin() );
262  return NULL;
263  }
264 
266  task* pop_specific( int level, __TBB_ISOLATION_ARG(unsigned& last_used_lane, isolation_tag isolation) ) {
267  task* result = NULL;
268  // Lane selection is round-robin in backward direction.
269  unsigned idx = last_used_lane & (N-1);
270  do {
271  if( is_bit_set( population[level], idx ) ) {
272  lane_t& lane = lanes[level][idx];
274  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
275  result = look_specific( __TBB_ISOLATION_ARG(lane.my_queue, isolation) );
276  if( lane.my_queue.empty() )
277  clear_one_bit( population[level], idx );
278  if( result )
279  break;
280  }
281  }
282  idx=(idx-1)&(N-1);
283  } while( !empty(level) && idx != last_used_lane );
284  last_used_lane = idx;
285  return result;
286  }
287 
289  bool empty(int level) {
290  return !population[level];
291  }
292 
294 
296  intptr_t drain() {
297  intptr_t result = 0;
298  for(int level = 0; level < Levels; level++)
299  for(unsigned i=0; i<N; ++i) {
300  lane_t& lane = lanes[level][i];
302  for(typename lane_t::queue_base_t::iterator it=lane.my_queue.begin();
303  it!=lane.my_queue.end(); ++it, ++result)
304  {
305  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
306  task* t = *it;
307  tbb::task::destroy(*t);
308  }
309  lane.my_queue.clear();
310  clear_one_bit( population[level], i );
311  }
312  return result;
313  }
314 }; // task_stream
315 
316 } // namespace internal
317 } // namespace tbb
318 
319 #endif /* _TBB_task_stream_extended_H */
const population_t one
Definition: task_stream.h:47
task_stream_accessor< accessor >::lane_t lane_t
A fast random number generator.
Definition: tbb_misc.h:135
isolation_tag isolation
The tag used for task isolation.
Definition: task.h:220
CRITICAL_SECTION mutex_t
#define __TBB_ISOLATION_EXPR(isolation)
internal::task_prefix & prefix(internal::version_tag *=NULL) const
Get reference to corresponding task_prefix.
Definition: task.h:1002
unsigned operator()(unsigned out_of) const
Base class for user-defined tasks.
Definition: task.h:615
bool try_push(task *source, int level, unsigned lane_idx)
Returns true on successful push, otherwise - false.
task * try_pop(int level, unsigned lane_idx)
Returns pointer to task on successful pop, otherwise - NULL.
task * pop(int level, const lane_selector_t &next_lane)
task * get_item(lane_t::queue_base_t &queue)
The container for "fairness-oriented" aka "enqueued" tasks.
Definition: task_stream.h:69
task * look_specific(__TBB_ISOLATION_ARG(task_stream_base::lane_t::queue_base_t &queue, isolation_tag isolation))
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:49
unsigned short get()
Get a random number.
Definition: tbb_misc.h:146
void __TBB_AtomicOR(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:878
void __TBB_AtomicAND(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:888
intptr_t isolation_tag
A tag for task isolation.
Definition: task.h:143
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition: task_stream.h:36
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:61
Pads type T to fill out to a multiple of cache line size.
Definition: tbb_stddef.h:261
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:72
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
unsigned operator()(unsigned out_of) const
#define __TBB_ISOLATION_ARG(arg1, isolation)
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:55
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:860
unsigned operator()(unsigned out_of) const
population_t population[Levels]
Definition: task_stream.h:71
void initialize(unsigned n_lanes)
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
queue_and_mutex< task *, spin_mutex > lane_t
void push(task *source, int level, const lane_selector_t &next_lane)
Push a task into a lane. Lane selection is performed by passed functor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
std::deque< T, tbb_allocator< T > > queue_base_t
task * pop_specific(int level, __TBB_ISOLATION_ARG(unsigned &last_used_lane, isolation_tag isolation))
Try finding and popping a related task.
uintptr_t population_t
Definition: task_stream.h:46
The graph class.

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.