Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_cache_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_cache_impl_H
18 #define __TBB__flow_graph_cache_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
25 
26 namespace internal {
27 
29 template< typename T, typename M=spin_mutex >
30 class node_cache {
31  public:
32 
33  typedef size_t size_type;
34 
35  bool empty() {
36  typename mutex_type::scoped_lock lock( my_mutex );
37  return internal_empty();
38  }
39 
40  void add( T &n ) {
41  typename mutex_type::scoped_lock lock( my_mutex );
42  internal_push(n);
43  }
44 
45  void remove( T &n ) {
46  typename mutex_type::scoped_lock lock( my_mutex );
47  for ( size_t i = internal_size(); i != 0; --i ) {
48  T &s = internal_pop();
49  if ( &s == &n ) return; // only remove one predecessor per request
51  }
52  }
53 
54  void clear() {
55  while( !my_q.empty()) (void)my_q.pop();
56 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
57  my_built_predecessors.clear();
58 #endif
59  }
60 
61 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
62  typedef edge_container<T> built_predecessors_type;
63  built_predecessors_type &built_predecessors() { return my_built_predecessors; }
64 
65  typedef typename edge_container<T>::edge_list_type predecessor_list_type;
66  void internal_add_built_predecessor( T &n ) {
67  typename mutex_type::scoped_lock lock( my_mutex );
68  my_built_predecessors.add_edge(n);
69  }
70 
71  void internal_delete_built_predecessor( T &n ) {
72  typename mutex_type::scoped_lock lock( my_mutex );
73  my_built_predecessors.delete_edge(n);
74  }
75 
76  void copy_predecessors( predecessor_list_type &v) {
77  typename mutex_type::scoped_lock lock( my_mutex );
78  my_built_predecessors.copy_edges(v);
79  }
80 
81  size_t predecessor_count() {
82  typename mutex_type::scoped_lock lock(my_mutex);
83  return (size_t)(my_built_predecessors.edge_count());
84  }
85 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
86 
87 protected:
88 
89  typedef M mutex_type;
91  std::queue< T * > my_q;
92 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
93  built_predecessors_type my_built_predecessors;
94 #endif
95 
96  // Assumes lock is held
97  inline bool internal_empty( ) {
98  return my_q.empty();
99  }
100 
101  // Assumes lock is held
103  return my_q.size();
104  }
105 
106  // Assumes lock is held
107  inline void internal_push( T &n ) {
108  my_q.push(&n);
109  }
110 
111  // Assumes lock is held
112  inline T &internal_pop() {
113  T *v = my_q.front();
114  my_q.pop();
115  return *v;
116  }
117 
118 };
119 
121 template< typename T, typename M=spin_mutex >
122 #if __TBB_PREVIEW_ASYNC_MSG
123 // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
124 class predecessor_cache : public node_cache< untyped_sender, M > {
125 #else
126 class predecessor_cache : public node_cache< sender<T>, M > {
127 #endif // __TBB_PREVIEW_ASYNC_MSG
128 public:
129  typedef M mutex_type;
130  typedef T output_type;
131 #if __TBB_PREVIEW_ASYNC_MSG
132  typedef untyped_sender predecessor_type;
133  typedef untyped_receiver successor_type;
134 #else
135  typedef sender<output_type> predecessor_type;
136  typedef receiver<output_type> successor_type;
137 #endif // __TBB_PREVIEW_ASYNC_MSG
138 
139  predecessor_cache( ) : my_owner( NULL ) { }
140 
141  void set_owner( successor_type *owner ) { my_owner = owner; }
142 
143  bool get_item( output_type &v ) {
144 
145  bool msg = false;
146 
147  do {
148  predecessor_type *src;
149  {
150  typename mutex_type::scoped_lock lock(this->my_mutex);
151  if ( this->internal_empty() ) {
152  break;
153  }
154  src = &this->internal_pop();
155  }
156 
157  // Try to get from this sender
158  msg = src->try_get( v );
159 
160  if (msg == false) {
161  // Relinquish ownership of the edge
162  if (my_owner)
163  src->register_successor( *my_owner );
164  } else {
165  // Retain ownership of the edge
166  this->add(*src);
167  }
168  } while ( msg == false );
169  return msg;
170  }
171 
172  // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
173  void reset() {
174  if (my_owner) {
175  for(;;) {
176  predecessor_type *src;
177  {
178  if (this->internal_empty()) break;
179  src = &this->internal_pop();
180  }
181  src->register_successor( *my_owner );
182  }
183  }
184  }
185 
186 protected:
187 
188 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
190 #endif
192 };
193 
195 // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
196 template< typename T, typename M=spin_mutex >
198 public:
199  typedef M mutex_type;
200  typedef T output_type;
201 #if __TBB_PREVIEW_ASYNC_MSG
202  typedef untyped_sender predecessor_type;
203  typedef untyped_receiver successor_type;
204 #else
205  typedef sender<T> predecessor_type;
206  typedef receiver<T> successor_type;
207 #endif // __TBB_PREVIEW_ASYNC_MSG
208 
209  reservable_predecessor_cache( ) : reserved_src(NULL) { }
210 
211  bool
213  bool msg = false;
214 
215  do {
216  {
217  typename mutex_type::scoped_lock lock(this->my_mutex);
218  if ( reserved_src || this->internal_empty() )
219  return false;
220 
221  reserved_src = &this->internal_pop();
222  }
223 
224  // Try to get from this sender
225  msg = reserved_src->try_reserve( v );
226 
227  if (msg == false) {
228  typename mutex_type::scoped_lock lock(this->my_mutex);
229  // Relinquish ownership of the edge
230  reserved_src->register_successor( *this->my_owner );
231  reserved_src = NULL;
232  } else {
233  // Retain ownership of the edge
234  this->add( *reserved_src );
235  }
236  } while ( msg == false );
237 
238  return msg;
239  }
240 
241  bool
243  reserved_src->try_release( );
244  reserved_src = NULL;
245  return true;
246  }
247 
248  bool
250  reserved_src->try_consume( );
251  reserved_src = NULL;
252  return true;
253  }
254 
255  void reset( ) {
256  reserved_src = NULL;
258  }
259 
260  void clear() {
261  reserved_src = NULL;
263  }
264 
265 private:
267 };
268 
269 
271 // TODO: make successor_cache type T-independent when async_msg becomes regular feature
272 template<typename T, typename M=spin_rw_mutex >
274 protected:
275 
276  typedef M mutex_type;
278 
279 #if __TBB_PREVIEW_ASYNC_MSG
280  typedef untyped_receiver successor_type;
281  typedef untyped_receiver *pointer_type;
282  typedef untyped_sender owner_type;
283 #else
284  typedef receiver<T> successor_type;
285  typedef receiver<T> *pointer_type;
286  typedef sender<T> owner_type;
287 #endif // __TBB_PREVIEW_ASYNC_MSG
288  typedef std::list< pointer_type > successors_type;
289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
290  edge_container<successor_type> my_built_successors;
291 #endif
293 
294  owner_type *my_owner;
295 
296 public:
297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
298  typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
299 
300  edge_container<successor_type> &built_successors() { return my_built_successors; }
301 
302  void internal_add_built_successor( successor_type &r) {
303  typename mutex_type::scoped_lock l(my_mutex, true);
304  my_built_successors.add_edge( r );
305  }
306 
307  void internal_delete_built_successor( successor_type &r) {
308  typename mutex_type::scoped_lock l(my_mutex, true);
309  my_built_successors.delete_edge(r);
310  }
311 
312  void copy_successors( successor_list_type &v) {
313  typename mutex_type::scoped_lock l(my_mutex, false);
314  my_built_successors.copy_edges(v);
315  }
316 
317  size_t successor_count() {
318  typename mutex_type::scoped_lock l(my_mutex,false);
319  return my_built_successors.edge_count();
320  }
321 
322 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
323 
324  successor_cache( ) : my_owner(NULL) {}
325 
326  void set_owner( owner_type *owner ) { my_owner = owner; }
327 
328  virtual ~successor_cache() {}
329 
331  typename mutex_type::scoped_lock l(my_mutex, true);
332  my_successors.push_back( &r );
333  }
334 
336  typename mutex_type::scoped_lock l(my_mutex, true);
337  for ( typename successors_type::iterator i = my_successors.begin();
338  i != my_successors.end(); ++i ) {
339  if ( *i == & r ) {
340  my_successors.erase(i);
341  break;
342  }
343  }
344  }
345 
346  bool empty() {
347  typename mutex_type::scoped_lock l(my_mutex, false);
348  return my_successors.empty();
349  }
350 
351  void clear() {
352  my_successors.clear();
353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
354  my_built_successors.clear();
355 #endif
356  }
357 
358 #if !__TBB_PREVIEW_ASYNC_MSG
359  virtual task * try_put_task( const T &t ) = 0;
360 #endif // __TBB_PREVIEW_ASYNC_MSG
361  }; // successor_cache<T>
362 
364 template<typename M>
365 class successor_cache< continue_msg, M > : tbb::internal::no_copy {
366 protected:
367 
368  typedef M mutex_type;
370 
371 #if __TBB_PREVIEW_ASYNC_MSG
372  typedef untyped_receiver successor_type;
373  typedef untyped_receiver *pointer_type;
374 #else
375  typedef receiver<continue_msg> successor_type;
376  typedef receiver<continue_msg> *pointer_type;
377 #endif // __TBB_PREVIEW_ASYNC_MSG
378  typedef std::list< pointer_type > successors_type;
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381  edge_container<successor_type> my_built_successors;
382  typedef edge_container<successor_type>::edge_list_type successor_list_type;
383 #endif
384 
385  sender<continue_msg> *my_owner;
386 
387 public:
388 
389 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
390 
391  edge_container<successor_type> &built_successors() { return my_built_successors; }
392 
393  void internal_add_built_successor( successor_type &r) {
394  typename mutex_type::scoped_lock l(my_mutex, true);
395  my_built_successors.add_edge( r );
396  }
397 
398  void internal_delete_built_successor( successor_type &r) {
399  typename mutex_type::scoped_lock l(my_mutex, true);
400  my_built_successors.delete_edge(r);
401  }
402 
403  void copy_successors( successor_list_type &v) {
404  typename mutex_type::scoped_lock l(my_mutex, false);
405  my_built_successors.copy_edges(v);
406  }
407 
408  size_t successor_count() {
409  typename mutex_type::scoped_lock l(my_mutex,false);
410  return my_built_successors.edge_count();
411  }
412 
413 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
414 
415  successor_cache( ) : my_owner(NULL) {}
416 
417  void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
418 
419  virtual ~successor_cache() {}
420 
422  typename mutex_type::scoped_lock l(my_mutex, true);
423  my_successors.push_back( &r );
424  if ( my_owner && r.is_continue_receiver() ) {
425  r.register_predecessor( *my_owner );
426  }
427  }
428 
430  typename mutex_type::scoped_lock l(my_mutex, true);
431  for ( successors_type::iterator i = my_successors.begin();
432  i != my_successors.end(); ++i ) {
433  if ( *i == & r ) {
434  // TODO: Check if we need to test for continue_receiver before
435  // removing from r.
436  if ( my_owner )
437  r.remove_predecessor( *my_owner );
438  my_successors.erase(i);
439  break;
440  }
441  }
442  }
443 
444  bool empty() {
445  typename mutex_type::scoped_lock l(my_mutex, false);
446  return my_successors.empty();
447  }
448 
449  void clear() {
450  my_successors.clear();
451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
452  my_built_successors.clear();
453 #endif
454  }
455 
456 #if !__TBB_PREVIEW_ASYNC_MSG
457  virtual task * try_put_task( const continue_msg &t ) = 0;
458 #endif // __TBB_PREVIEW_ASYNC_MSG
459 
460 }; // successor_cache< continue_msg >
461 
463 // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
464 template<typename T, typename M=spin_rw_mutex>
465 class broadcast_cache : public successor_cache<T, M> {
466  typedef M mutex_type;
468 
469 public:
470 
472 
473  // as above, but call try_put_task instead, and return the last task we received (if any)
474 #if __TBB_PREVIEW_ASYNC_MSG
475  template<typename X>
476  task * try_put_task( const X &t ) {
477 #else
478  task * try_put_task( const T &t ) __TBB_override {
479 #endif // __TBB_PREVIEW_ASYNC_MSG
480  task * last_task = NULL;
481  bool upgraded = true;
482  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
483  typename successors_type::iterator i = this->my_successors.begin();
484  while ( i != this->my_successors.end() ) {
485  task *new_task = (*i)->try_put_task(t);
486  // workaround for icc bug
487  graph& graph_ref = (*i)->graph_reference();
488  last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
489  if(new_task) {
490  ++i;
491  }
492  else { // failed
493  if ( (*i)->register_predecessor(*this->my_owner) ) {
494  if (!upgraded) {
495  l.upgrade_to_writer();
496  upgraded = true;
497  }
498  i = this->my_successors.erase(i);
499  } else {
500  ++i;
501  }
502  }
503  }
504  return last_task;
505  }
506 
507  // call try_put_task and return list of received tasks
508 #if __TBB_PREVIEW_ASYNC_MSG
509  template<typename X>
510  bool gather_successful_try_puts( const X &t, task_list &tasks ) {
511 #else
512  bool gather_successful_try_puts( const T &t, task_list &tasks ) {
513 #endif // __TBB_PREVIEW_ASYNC_MSG
514  bool upgraded = true;
515  bool is_at_least_one_put_successful = false;
516  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
517  typename successors_type::iterator i = this->my_successors.begin();
518  while ( i != this->my_successors.end() ) {
519  task * new_task = (*i)->try_put_task(t);
520  if(new_task) {
521  ++i;
522  if(new_task != SUCCESSFULLY_ENQUEUED) {
523  tasks.push_back(*new_task);
524  }
525  is_at_least_one_put_successful = true;
526  }
527  else { // failed
528  if ( (*i)->register_predecessor(*this->my_owner) ) {
529  if (!upgraded) {
530  l.upgrade_to_writer();
531  upgraded = true;
532  }
533  i = this->my_successors.erase(i);
534  } else {
535  ++i;
536  }
537  }
538  }
539  return is_at_least_one_put_successful;
540  }
541 };
542 
544 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
545 template<typename T, typename M=spin_rw_mutex >
546 class round_robin_cache : public successor_cache<T, M> {
547  typedef size_t size_type;
548  typedef M mutex_type;
550 
551 public:
552 
554 
556  typename mutex_type::scoped_lock l(this->my_mutex, false);
557  return this->my_successors.size();
558  }
559 
560 #if __TBB_PREVIEW_ASYNC_MSG
561  template<typename X>
562  task * try_put_task( const X &t ) {
563 #else
565 #endif // __TBB_PREVIEW_ASYNC_MSG
566  bool upgraded = true;
567  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
568  typename successors_type::iterator i = this->my_successors.begin();
569  while ( i != this->my_successors.end() ) {
570  task *new_task = (*i)->try_put_task(t);
571  if ( new_task ) {
572  return new_task;
573  } else {
574  if ( (*i)->register_predecessor(*this->my_owner) ) {
575  if (!upgraded) {
576  l.upgrade_to_writer();
577  upgraded = true;
578  }
579  i = this->my_successors.erase(i);
580  }
581  else {
582  ++i;
583  }
584  }
585  }
586  return NULL;
587  }
588 };
589 
590 } // namespace internal
591 
592 #endif // __TBB__flow_graph_cache_impl_H
A cache of successors that are put in a round-robin fashion.
bool gather_successful_try_puts(const T &t, task_list &tasks)
sender< output_type > predecessor_type
successor_cache< T, M >::successors_type successors_type
receiver< output_type > successor_type
task * try_put_task(const T &t) __TBB_override
An abstract cache of successors.
A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
void remove_successor(successor_type &r)
A cache of successors that are broadcast to.
task * try_put_task(const T &t) __TBB_override
static tbb::task *const SUCCESSFULLY_ENQUEUED
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void register_successor(successor_type &r)
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
successor_cache< T, M >::successors_type successors_type
std::list< pointer_type > successors_type
#define __TBB_override
Definition: tbb_stddef.h:240
void const char const char int ITT_FORMAT __itt_group_sync s
A cache of predecessors that supports requests and reservations.
void set_owner(owner_type *owner)
void set_owner(successor_type *owner)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
A cache of predecessors that only supports try_get.
void set_owner(sender< continue_msg > *owner)

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.