Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
31 #include "tbb_exception.h"
32 #include "pipeline.h"
36 #include "tbb_profiling.h"
37 #include "task_arena.h"
38 
39 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
40  #if __INTEL_COMPILER
41  // Disabled warning "routine is both inline and noinline"
42  #pragma warning (push)
43  #pragma warning( disable: 2196 )
44  #endif
45  #define __TBB_NOINLINE_SYM __attribute__((noinline))
46 #else
47  #define __TBB_NOINLINE_SYM
48 #endif
49 
50 #if __TBB_PREVIEW_ASYNC_MSG
51 #include <vector> // std::vector in internal::async_storage
52 #include <memory> // std::shared_ptr in async_msg
53 #endif
54 
55 #if __TBB_PREVIEW_STREAMING_NODE
56 // For streaming_node
57 #include <array> // std::array
58 #include <unordered_map> // std::unordered_map
59 #include <type_traits> // std::decay, std::true_type, std::false_type
60 #endif // __TBB_PREVIEW_STREAMING_NODE
61 
62 #if TBB_DEPRECATED_FLOW_ENQUEUE
63 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
64 #else
65 #define FLOW_SPAWN(a) tbb::task::spawn((a))
66 #endif
67 
68 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
69 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
70 #else
71 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
72 #endif
73 
74 // use the VC10 or gcc version of tuple if it is available.
75 #if __TBB_CPP11_TUPLE_PRESENT
76  #include <tuple>
77 namespace tbb {
78  namespace flow {
79  using std::tuple;
80  using std::tuple_size;
81  using std::tuple_element;
82  using std::get;
83  }
84 }
85 #else
86  #include "compat/tuple"
87 #endif
88 
89 #include<list>
90 #include<queue>
91 
102 namespace tbb {
103 namespace flow {
104 
106 enum concurrency { unlimited = 0, serial = 1 };
108 namespace interface11 {
109 
111 struct null_type {};
114 class continue_msg {};
117 template< typename T > class sender;
118 template< typename T > class receiver;
119 class continue_receiver;
121 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
123 template< typename R, typename B > class run_and_put_task;
125 namespace internal {
127 template<typename T, typename M> class successor_cache;
128 template<typename T, typename M> class broadcast_cache;
129 template<typename T, typename M> class round_robin_cache;
130 template<typename T, typename M> class predecessor_cache;
131 template<typename T, typename M> class reservable_predecessor_cache;
133 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
134 namespace order {
135 struct following;
136 struct preceding;
137 }
138 template<typename Order, typename... Args> struct node_set;
139 #endif
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
142 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
143 // C == receiver< ... > or sender< ... >, depending.
144 template<typename C>
145 class edge_container {
147 public:
148  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
150  void add_edge(C &s) {
151  built_edges.push_back(&s);
152  }
154  void delete_edge(C &s) {
155  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
156  if (*i == &s) {
157  (void)built_edges.erase(i);
158  return; // only remove one predecessor per request
159  }
160  }
161  }
163  void copy_edges(edge_list_type &v) {
164  v = built_edges;
165  }
166 
167  size_t edge_count() {
168  return (size_t)(built_edges.size());
169  }
170 
171  void clear() {
172  built_edges.clear();
173  }
175  // methods remove the statement from all predecessors/successors liste in the edge
176  // container.
177  template< typename S > void sender_extract(S &s);
178  template< typename R > void receiver_extract(R &r);
180 private:
181  edge_list_type built_edges;
182 }; // class edge_container
183 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
185 } // namespace internal
187 } // namespace interfaceX
188 } // namespace flow
189 } // namespace tbb
194 namespace tbb {
195 namespace flow {
196 namespace interface11 {
197 
198 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
199 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
200  // if no RHS task, don't change left.
201  if (right == NULL) return left;
202  // right != NULL
203  if (left == NULL) return right;
204  if (left == SUCCESSFULLY_ENQUEUED) return right;
205  // left contains a task
206  if (right != SUCCESSFULLY_ENQUEUED) {
207  // both are valid tasks
209  return right;
210  }
211  return left;
212 }
214 #if __TBB_PREVIEW_ASYNC_MSG
216 template < typename T > class __TBB_DEPRECATED async_msg;
218 namespace internal {
220 template < typename T > class async_storage;
222 template< typename T, typename = void >
225  typedef T filtered_type;
227  static const bool is_async_type = false;
229  static const void* to_void_ptr(const T& t) {
230  return static_cast<const void*>(&t);
231  }
233  static void* to_void_ptr(T& t) {
234  return static_cast<void*>(&t);
235  }
237  static const T& from_void_ptr(const void* p) {
238  return *static_cast<const T*>(p);
239  }
241  static T& from_void_ptr(void* p) {
242  return *static_cast<T*>(p);
243  }
244 
245  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
246  if (is_async) {
247  // This (T) is NOT async and incoming 'A<X> t' IS async
248  // Get data from async_msg
250  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
251  // finalize() must be called after subscribe() because set() can be called in finalize()
252  // and 'this_recv' client must be subscribed by this moment
253  msg.finalize();
254  return new_task;
255  }
256  else {
257  // Incoming 't' is NOT async
258  return this_recv->try_put_task(from_void_ptr(p));
259  }
260  }
261 };
262 
263 template< typename T >
264 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
265  typedef T async_type;
266  typedef typename T::async_msg_data_type filtered_type;
268  static const bool is_async_type = true;
270  // Receiver-classes use const interfaces
271  static const void* to_void_ptr(const T& t) {
272  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
273  }
275  static void* to_void_ptr(T& t) {
276  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
277  }
279  // Sender-classes use non-const interfaces
280  static const T& from_void_ptr(const void* p) {
281  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
282  }
284  static T& from_void_ptr(void* p) {
285  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
286  }
287 
288  // Used in receiver<T> class
289  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
290  if (is_async) {
291  // Both are async
292  return this_recv->try_put_task(from_void_ptr(p));
293  }
294  else {
295  // This (T) is async and incoming 'X t' is NOT async
296  // Create async_msg for X
298  const T msg(t);
299  return this_recv->try_put_task(msg);
300  }
301  }
302 };
304 class untyped_receiver;
307  template< typename, typename > friend class internal::predecessor_cache;
308  template< typename, typename > friend class internal::reservable_predecessor_cache;
309 public:
313  virtual ~untyped_sender() {}
315  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
317  // TODO: Prevent untyped successor registration
320  virtual bool register_successor( successor_type &r ) = 0;
323  virtual bool remove_successor( successor_type &r ) = 0;
326  virtual bool try_release( ) { return false; }
329  virtual bool try_consume( ) { return false; }
331 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
332  typedef internal::edge_container<successor_type> built_successors_type;
334  typedef built_successors_type::edge_list_type successor_list_type;
335  virtual built_successors_type &built_successors() = 0;
336  virtual void internal_add_built_successor( successor_type & ) = 0;
337  virtual void internal_delete_built_successor( successor_type & ) = 0;
338  virtual void copy_successors( successor_list_type &) = 0;
339  virtual size_t successor_count() = 0;
340 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
341 protected:
343  template< typename X >
344  bool try_get( X &t ) {
346  }
349  template< typename X >
350  bool try_reserve( X &t ) {
352  }
354  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
355  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
356 };
358 class untyped_receiver {
359  template< typename, typename > friend class run_and_put_task;
361  template< typename, typename > friend class internal::broadcast_cache;
362  template< typename, typename > friend class internal::round_robin_cache;
363  template< typename, typename > friend class internal::successor_cache;
365 #if __TBB_PREVIEW_OPENCL_NODE
366  template< typename, typename > friend class proxy_dependency_receiver;
367 #endif /* __TBB_PREVIEW_OPENCL_NODE */
368 public:
373  virtual ~untyped_receiver() {}
376  template<typename X>
377  bool try_put(const X& t) {
378  task *res = try_put_task(t);
379  if (!res) return false;
381  return true;
382  }
384  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
386  // TODO: Prevent untyped predecessor registration
389  virtual bool register_predecessor( predecessor_type & ) { return false; }
390 
392  virtual bool remove_predecessor( predecessor_type & ) { return false; }
394 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
395  typedef internal::edge_container<predecessor_type> built_predecessors_type;
396  typedef built_predecessors_type::edge_list_type predecessor_list_type;
397  virtual built_predecessors_type &built_predecessors() = 0;
398  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
399  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
400  virtual void copy_predecessors( predecessor_list_type & ) = 0;
401  virtual size_t predecessor_count() = 0;
402 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
403 protected:
404  template<typename X>
405  task *try_put_task(const X& t) {
407  }
409  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
411  virtual graph& graph_reference() const = 0;
413  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
418  virtual bool is_continue_receiver() { return false; }
419 };
421 } // namespace internal
424 template< typename T >
426 public:
433  virtual bool try_get( T & ) { return false; }
436  virtual bool try_reserve( T & ) { return false; }
437 
438 protected:
439  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
440  // Both async OR both are NOT async
443  }
444  // Else: this (T) is async OR incoming 't' is async
445  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
446  return false;
447  }
449  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
450  // Both async OR both are NOT async
453  }
454  // Else: this (T) is async OR incoming 't' is async
455  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
456  return false;
457  }
458 }; // class sender<T>
461 template< typename T >
463  template< typename > friend class internal::async_storage;
464  template< typename, typename > friend struct internal::async_helpers;
465 public:
474  }
478  }
480 protected:
481  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
483  }
484 
486  virtual task *try_put_task(const T& t) = 0;
488 }; // class receiver<T>
489 
490 #else // __TBB_PREVIEW_ASYNC_MSG
493 template< typename T >
494 class sender {
495 public:
497  __TBB_DEPRECATED typedef T output_type;
498 
501 
502  virtual ~sender() {}
504  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
505 
507  __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
513  virtual bool try_get( T & ) { return false; }
514 
516  virtual bool try_reserve( T & ) { return false; }
519  virtual bool try_release( ) { return false; }
522  virtual bool try_consume( ) { return false; }
524 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
525  __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
527  __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
528  __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
529  __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
530  __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
531  __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
532  __TBB_DEPRECATED virtual size_t successor_count() = 0;
533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
534 }; // class sender<T>
535 
537 template< typename T >
538 class receiver {
539 public:
545 
547  virtual ~receiver() {}
550  bool try_put( const T& t ) {
551  task *res = try_put_task(t);
552  if (!res) return false;
554  return true;
555  }
558 protected:
559  template< typename R, typename B > friend class run_and_put_task;
560  template< typename X, typename Y > friend class internal::broadcast_cache;
561  template< typename X, typename Y > friend class internal::round_robin_cache;
562  virtual task *try_put_task(const T& t) = 0;
563  virtual graph& graph_reference() const = 0;
564 public:
565  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
568  __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
571  __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
573 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
574  __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
575  __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
576  __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
577  __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
578  __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
579  __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
580  __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
581 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
583 protected:
586 
587  template<typename TT, typename M> friend class internal::successor_cache;
588  virtual bool is_continue_receiver() { return false; }
590 #if __TBB_PREVIEW_OPENCL_NODE
591  template< typename, typename > friend class proxy_dependency_receiver;
592 #endif /* __TBB_PREVIEW_OPENCL_NODE */
593 }; // class receiver<T>
595 #endif // __TBB_PREVIEW_ASYNC_MSG
598 
599 class continue_receiver : public receiver< continue_msg > {
600 public:
601 
604 
607 
610  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
611  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
612  my_current_count = 0;
613  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
614  }
615 
619  my_current_count = 0;
620  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
621  }
622 
627  return true;
628  }
629 
637  return true;
638  }
640 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
641  __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
642  __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
643  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
645  __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
647  my_built_predecessors.add_edge( s );
648  }
650  __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
652  my_built_predecessors.delete_edge(s);
653  }
655  __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
657  my_built_predecessors.copy_edges(v);
658  }
660  __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
662  return my_built_predecessors.edge_count();
663  }
665 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
667 protected:
668  template< typename R, typename B > friend class run_and_put_task;
669  template<typename X, typename Y> friend class internal::broadcast_cache;
670  template<typename X, typename Y> friend class internal::round_robin_cache;
671  // execute body is supposed to be too small to create a task for.
673  {
676  return SUCCESSFULLY_ENQUEUED;
677  else
678  my_current_count = 0;
679  }
680  task * res = execute();
681  return res? res : SUCCESSFULLY_ENQUEUED;
682  }
683 
684 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
685  // continue_receiver must contain its own built_predecessors because it does
686  // not have a node_cache.
687  built_predecessors_type my_built_predecessors;
688 #endif
694  // the friend declaration in the base class did not eliminate the "protected class"
695  // error in gcc 4.1.2
696  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
697 
700  if (f & rf_clear_edges) {
701 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
702  my_built_predecessors.clear();
703 #endif
705  }
706  }
711  virtual task * execute() = 0;
712  template<typename TT, typename M> friend class internal::successor_cache;
713  bool is_continue_receiver() __TBB_override { return true; }
714 
715 }; // class continue_receiver
716 
717 } // interfaceX
719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
720  template <typename K, typename T>
721  K key_from_message( const T &t ) {
722  return t.key();
723  }
724 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
725 
726  using interface11::sender;
727  using interface11::receiver;
729 } // flow
730 } // tbb
735 namespace tbb {
736 namespace flow {
737 namespace interface11 {
742 #if __TBB_PREVIEW_ASYNC_MSG
744 #endif
745 using namespace internal::graph_policy_namespace;
746 
747 template <typename C, typename N>
748 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
749 {
750  if (begin) current_node = my_graph->my_nodes;
751  //else it is an end iterator by default
752 }
754 template <typename C, typename N>
756  __TBB_ASSERT(current_node, "graph_iterator at end");
757  return *operator->();
758 }
760 template <typename C, typename N>
762  return current_node;
763 }
764 
765 template <typename C, typename N>
767  if (current_node) current_node = current_node->next;
768 }
769 
770 } // namespace interfaceX
772 namespace interface10 {
774 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
776  own_context = true;
777  cancelled = false;
778  caught_exception = false;
779  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
783  my_is_active = true;
784 }
785 
786 inline graph::graph(task_group_context& use_this_context) :
787  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
788  prepare_task_arena();
789  own_context = false;
790  cancelled = false;
791  caught_exception = false;
792  my_root_task = (new (task::allocate_root(*my_context)) empty_task);
793  my_root_task->set_ref_count(1);
795  my_is_active = true;
796 }
798 inline graph::~graph() {
799  wait_for_all();
800  my_root_task->set_ref_count(0);
801  tbb::task::destroy(*my_root_task);
802  if (own_context) delete my_context;
803  delete my_task_arena;
804 }
805 
806 inline void graph::reserve_wait() {
807  if (my_root_task) {
808  my_root_task->increment_ref_count();
810  }
811 }
812 
813 inline void graph::release_wait() {
814  if (my_root_task) {
816  my_root_task->decrement_ref_count();
817  }
818 }
819 
821  n->next = NULL;
822  {
823  spin_mutex::scoped_lock lock(nodelist_mutex);
824  n->prev = my_nodes_last;
825  if (my_nodes_last) my_nodes_last->next = n;
826  my_nodes_last = n;
827  if (!my_nodes) my_nodes = n;
828  }
829 }
830 
832  {
833  spin_mutex::scoped_lock lock(nodelist_mutex);
834  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
835  if (n->prev) n->prev->next = n->next;
836  if (n->next) n->next->prev = n->prev;
837  if (my_nodes_last == n) my_nodes_last = n->prev;
838  if (my_nodes == n) my_nodes = n->next;
839  }
840  n->prev = n->next = NULL;
841 }
842 
844  // reset context
846 
847  if(my_context) my_context->reset();
848  cancelled = false;
849  caught_exception = false;
850  // reset all the nodes comprising the graph
851  for(iterator ii = begin(); ii != end(); ++ii) {
852  tbb::flow::interface11::graph_node *my_p = &(*ii);
853  my_p->reset_node(f);
854  }
855  // Reattach the arena. Might be useful to run the graph in a particular task_arena
856  // while not limiting graph lifetime to a single task_arena::execute() call.
857  prepare_task_arena( /*reinit=*/true );
859  // now spawn the tasks necessary to start the graph
860  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
862  }
863  my_reset_task_list.clear();
864 }
865 
866 inline graph::iterator graph::begin() { return iterator(this, true); }
868 inline graph::iterator graph::end() { return iterator(this, false); }
869 
870 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
871 
872 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
873 
874 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
876 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
878 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
879 inline void graph::set_name(const char *name) {
881 }
882 #endif
883 
884 } // namespace interface10
885 
886 namespace interface11 {
888 inline graph_node::graph_node(graph& g) : my_graph(g) {
889  my_graph.register_node(this);
890 }
891 
893  my_graph.remove_node(this);
894 }
895 
897 
898 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
899 using internal::node_set;
900 #endif
901 
903 template < typename Output >
904 class input_node : public graph_node, public sender< Output > {
905 public:
907  typedef Output output_type;
908 
912  //Source node has no input type
914 
915 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
916  typedef typename sender<output_type>::built_successors_type built_successors_type;
917  typedef typename sender<output_type>::successor_list_type successor_list_type;
918 #endif
919 
921  template< typename Body >
923  : graph_node(g), my_active(false),
924  my_body( new internal::input_body_leaf< output_type, Body>(body) ),
926  my_reserved(false), my_has_cached_item(false)
927  {
929  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
930  static_cast<sender<output_type> *>(this), this->my_body );
931  }
933 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
934  template <typename Body, typename... Successors>
935  input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
936  : input_node(successors.graph_reference(), body) {
937  make_edges(*this, successors);
938  }
939 #endif
943  graph_node(src.my_graph), sender<Output>(),
944  my_active(false),
945  my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
946  my_reserved(false), my_has_cached_item(false)
947  {
949  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
950  static_cast<sender<output_type> *>(this), this->my_body );
951  }
952 
954  ~input_node() { delete my_body; delete my_init_body; }
956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
957  void set_name( const char *name ) __TBB_override {
959  }
960 #endif
966  if ( my_active )
967  spawn_put();
968  return true;
969  }
970 
975  return true;
976  }
977 
978 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
979 
980  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
981 
982  void internal_add_built_successor( successor_type &r) __TBB_override {
984  my_successors.internal_add_built_successor(r);
985  }
986 
987  void internal_delete_built_successor( successor_type &r) __TBB_override {
989  my_successors.internal_delete_built_successor(r);
990  }
992  size_t successor_count() __TBB_override {
994  return my_successors.successor_count();
995  }
996 
997  void copy_successors(successor_list_type &v) __TBB_override {
999  my_successors.copy_successors(v);
1000  }
1001 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1002 
1006  if ( my_reserved )
1007  return false;
1008 
1009  if ( my_has_cached_item ) {
1011  my_has_cached_item = false;
1012  return true;
1013  }
1014  // we've been asked to provide an item, but we have none. enqueue a task to
1015  // provide one.
1016  if ( my_active )
1017  spawn_put();
1018  return false;
1019  }
1020 
1024  if ( my_reserved ) {
1025  return false;
1026  }
1030  my_reserved = true;
1031  return true;
1032  } else {
1033  return false;
1034  }
1035  }
1036 
1038 
1041  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1042  my_reserved = false;
1043  if(!my_successors.empty())
1044  spawn_put();
1045  return true;
1046  }
1051  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1052  my_reserved = false;
1054  if ( !my_successors.empty() ) {
1056  }
1057  return true;
1058  }
1059 
1061  void activate() {
1063  my_active = true;
1065  spawn_put();
1066  }
1067 
1068  template<typename Body>
1071  return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
1072  }
1074 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1075  void extract( ) __TBB_override {
1076  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1077  my_active = false;
1078  my_reserved = false;
1080  }
1081 #endif
1082 
1083 protected:
1084 
1087  my_active = false;
1088  my_reserved = false;
1092  if(f & rf_reset_bodies) {
1094  delete my_body;
1095  my_body = tmp;
1096  }
1097  }
1098 
1099 private:
1108 
1109  // used by apply_body_bypass, can invoke body of node.
1112  if ( my_reserved ) {
1113  return false;
1114  }
1115  if ( !my_has_cached_item ) {
1117 
1118 #if TBB_DEPRECATED_INPUT_NODE_BODY
1119  bool r = (*my_body)(my_cached_item);
1120  if (r) {
1121  my_has_cached_item = true;
1122  }
1123 #else
1124  flow_control control;
1125  my_cached_item = (*my_body)(control);
1127 #endif
1129  }
1130  if ( my_has_cached_item ) {
1131  v = my_cached_item;
1132  my_reserved = true;
1133  return true;
1134  } else {
1135  return false;
1136  }
1137  }
1138 
1140  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1142  }
1143 
1145  void spawn_put( ) {
1146  if(internal::is_graph_active(this->my_graph)) {
1148  }
1149  }
1150 
1154  output_type v;
1155  if ( !try_reserve_apply_body(v) )
1156  return NULL;
1157 
1158  task *last_task = my_successors.try_put_task(v);
1159  if ( last_task )
1160  try_consume();
1161  else
1162  try_release();
1163  return last_task;
1164  }
1165 }; // class input_node
1166 
1167 #if TBB_USE_SOURCE_NODE_AS_ALIAS
1168 template < typename Output >
1169 class source_node : public input_node <Output> {
1170 public:
1172  template< typename Body >
1173  __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1174  : input_node<Output>(g, body)
1175  {
1176  }
1177 
1178 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1179  template <typename Body, typename... Successors>
1180  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1181  : input_node<Output>(successors, body) {
1182  }
1183 #endif
1184 };
1185 #else // TBB_USE_SOURCE_NODE_AS_ALIAS
1186 template < typename Output > class
1188 __TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1189 source_node : public graph_node, public sender< Output > {
1190 public:
1192  typedef Output output_type;
1196 
1197  //Source node has no input type
1199 
1200 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1201  typedef typename sender<output_type>::built_successors_type built_successors_type;
1202  typedef typename sender<output_type>::successor_list_type successor_list_type;
1203 #endif
1204 
1206  template< typename Body >
1207  __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1208  : graph_node(g), my_active(is_active), init_my_active(is_active),
1209  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1210  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1211  my_reserved(false), my_has_cached_item(false)
1212  {
1213  my_successors.set_owner(this);
1214  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1215  static_cast<sender<output_type> *>(this), this->my_body );
1216  }
1218 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1219  template <typename Body, typename... Successors>
1220  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1221  : source_node(successors.graph_reference(), body, is_active) {
1222  make_edges(*this, successors);
1223  }
1224 #endif
1225 
1228  graph_node(src.my_graph), sender<Output>(),
1229  my_active(src.init_my_active),
1230  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1231  my_reserved(false), my_has_cached_item(false)
1232  {
1233  my_successors.set_owner(this);
1234  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1235  static_cast<sender<output_type> *>(this), this->my_body );
1236  }
1237 
1239  ~source_node() { delete my_body; delete my_init_body; }
1240 
1241 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1242  void set_name( const char *name ) __TBB_override {
1244  }
1245 #endif
1246 
1249  spin_mutex::scoped_lock lock(my_mutex);
1250  my_successors.register_successor(r);
1251  if ( my_active )
1252  spawn_put();
1253  return true;
1254  }
1255 
1259  my_successors.remove_successor(r);
1260  return true;
1261  }
1262 
1263 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264 
1265  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1266 
1267  void internal_add_built_successor( successor_type &r) __TBB_override {
1268  spin_mutex::scoped_lock lock(my_mutex);
1269  my_successors.internal_add_built_successor(r);
1270  }
1272  void internal_delete_built_successor( successor_type &r) __TBB_override {
1273  spin_mutex::scoped_lock lock(my_mutex);
1274  my_successors.internal_delete_built_successor(r);
1275  }
1276 
1277  size_t successor_count() __TBB_override {
1278  spin_mutex::scoped_lock lock(my_mutex);
1279  return my_successors.successor_count();
1280  }
1281 
1282  void copy_successors(successor_list_type &v) __TBB_override {
1283  spin_mutex::scoped_lock l(my_mutex);
1284  my_successors.copy_successors(v);
1285  }
1286 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1287 
1290  spin_mutex::scoped_lock lock(my_mutex);
1291  if ( my_reserved )
1292  return false;
1294  if ( my_has_cached_item ) {
1295  v = my_cached_item;
1296  my_has_cached_item = false;
1297  return true;
1298  }
1299  // we've been asked to provide an item, but we have none. enqueue a task to
1300  // provide one.
1301  spawn_put();
1302  return false;
1303  }
1304 
1308  if ( my_reserved ) {
1309  return false;
1310  }
1312  if ( my_has_cached_item ) {
1313  v = my_cached_item;
1314  my_reserved = true;
1315  return true;
1316  } else {
1317  return false;
1318  }
1319  }
1320 
1322 
1324  spin_mutex::scoped_lock lock(my_mutex);
1325  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1326  my_reserved = false;
1327  if(!my_successors.empty())
1328  spawn_put();
1329  return true;
1330  }
1331 
1334  spin_mutex::scoped_lock lock(my_mutex);
1335  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1336  my_reserved = false;
1337  my_has_cached_item = false;
1338  if ( !my_successors.empty() ) {
1339  spawn_put();
1340  }
1341  return true;
1342  }
1343 
1345  void activate() {
1346  spin_mutex::scoped_lock lock(my_mutex);
1347  my_active = true;
1348  if (!my_successors.empty())
1349  spawn_put();
1350  }
1351 
1352  template<typename Body>
1354  internal::source_body<output_type> &body_ref = *this->my_body;
1355  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1356  }
1357 
1358 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1359  void extract( ) __TBB_override {
1360  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1361  my_active = init_my_active;
1362  my_reserved = false;
1363  if(my_has_cached_item) my_has_cached_item = false;
1364  }
1365 #endif
1366 
1367 protected:
1368 
1371  my_active = init_my_active;
1372  my_reserved =false;
1373  if(my_has_cached_item) {
1374  my_has_cached_item = false;
1375  }
1376  if(f & rf_clear_edges) my_successors.clear();
1377  if(f & rf_reset_bodies) {
1378  internal::source_body<output_type> *tmp = my_init_body->clone();
1379  delete my_body;
1380  my_body = tmp;
1381  }
1382  if(my_active)
1383  internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1384  }
1385 
1386 private:
1396 
1397  // used by apply_body_bypass, can invoke body of node.
1399  spin_mutex::scoped_lock lock(my_mutex);
1400  if ( my_reserved ) {
1401  return false;
1402  }
1403  if ( !my_has_cached_item ) {
1404  tbb::internal::fgt_begin_body( my_body );
1405  bool r = (*my_body)(my_cached_item);
1406  tbb::internal::fgt_end_body( my_body );
1407  if (r) {
1408  my_has_cached_item = true;
1409  }
1410  }
1411  if ( my_has_cached_item ) {
1412  v = my_cached_item;
1413  my_reserved = true;
1414  return true;
1415  } else {
1416  return false;
1417  }
1418  }
1419 
1420  // when resetting, and if the source_node was created with my_active == true, then
1421  // when we reset the node we must store a task to run the node, and spawn it only
1422  // after the reset is complete and is_active() is again true. This is why we don't
1423  // test for is_active() here.
1425  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1427  }
1430  void spawn_put( ) {
1431  if(internal::is_graph_active(this->my_graph)) {
1432  internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1433  }
1434  }
1435 
1439  output_type v;
1440  if ( !try_reserve_apply_body(v) )
1441  return NULL;
1442 
1443  task *last_task = my_successors.try_put_task(v);
1444  if ( last_task )
1445  try_consume();
1446  else
1447  try_release();
1448  return last_task;
1449  }
1450 }; // class source_node
1451 #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1452 
1454 template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1455  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1457  : public graph_node
1458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1459  , public internal::function_input< Input, Output, Policy, Allocator >
1460 #else
1461  , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1462 #endif
1463  , public internal::function_output<Output> {
1464 
1465 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1466  typedef Allocator internals_allocator;
1467 #else
1472  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1473  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1474  );
1475 #endif
1476 
1477 public:
1478  typedef Input input_type;
1479  typedef Output output_type;
1485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1486  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1487  typedef typename fOutput_type::successor_list_type successor_list_type;
1488 #endif
1490 
1492  // input_queue_type is allocated here, but destroyed in the function_input_base.
1493  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1494  // be done in one place. This would be an interface-breaking change.
1495  template< typename Body >
1499 #else
1501 #endif
1503  fOutput_type(g) {
1504  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1505  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1506  }
1507 
1508 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1509  template <typename Body>
1510  function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1511  : function_node(g, concurrency, body, Policy(), priority) {}
1512 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1514 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1515  template <typename Body, typename... Args>
1516  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1519  make_edges_in_order(nodes, *this);
1520  }
1522 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1523  template <typename Body, typename... Args>
1524  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1525  : function_node(nodes, concurrency, body, Policy(), priority) {}
1526 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1528 
1531  graph_node(src.my_graph),
1532  input_impl_type(src),
1533  fOutput_type(src.my_graph) {
1534  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1535  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1536  }
1537 
1538 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1539  void set_name( const char *name ) __TBB_override {
1541  }
1542 #endif
1544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1545  void extract( ) __TBB_override {
1546  my_predecessors.built_predecessors().receiver_extract(*this);
1547  successors().built_successors().sender_extract(*this);
1548  }
1549 #endif
1551 protected:
1552  template< typename R, typename B > friend class run_and_put_task;
1553  template<typename X, typename Y> friend class internal::broadcast_cache;
1554  template<typename X, typename Y> friend class internal::round_robin_cache;
1556 
1558 
1561  // TODO: use clear() instead.
1562  if(f & rf_clear_edges) {
1563  successors().clear();
1565  }
1566  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1567  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1568  }
1570 }; // class function_node
1571 
1573 // Output is a tuple of output types.
1574 template<typename Input, typename Output, typename Policy = queueing,
1575  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1577  public graph_node,
1579  <
1580  Input,
1581  typename internal::wrap_tuple_elements<
1582  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1583  internal::multifunction_output, // wrap this around each element
1584  Output // the tuple providing the types
1585  >::type,
1586  Policy,
1587 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1588  Allocator
1589 #else
1590  cache_aligned_allocator<Input>
1591 #endif
1592  > {
1593 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1594  typedef Allocator internals_allocator;
1595 #else
1597 
1600  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1601  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1602  );
1603 #endif
1604 
1605 protected:
1607 public:
1608  typedef Input input_type;
1614 private:
1616 public:
1617  template<typename Body>
1619  graph &g, size_t concurrency,
1622 #else
1624 #endif
1626  tbb::internal::fgt_multioutput_node_with_body<N>(
1627  CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1628  &this->my_graph, static_cast<receiver<input_type> *>(this),
1629  this->output_ports(), this->my_body
1630  );
1631  }
1632 
1633 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1634  template <typename Body>
1635  __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1636  : multifunction_node(g, concurrency, body, Policy(), priority) {}
1637 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1639 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1640  template <typename Body, typename... Args>
1641  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1644  make_edges_in_order(nodes, *this);
1645  }
1646 
1647 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1648  template <typename Body, typename... Args>
1649  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1650  : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1651 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1652 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1656  tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1657  &this->my_graph, static_cast<receiver<input_type> *>(this),
1658  this->output_ports(), this->my_body );
1659  }
1661 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1662  void set_name( const char *name ) __TBB_override {
1664  }
1665 #endif
1667 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1668  void extract( ) __TBB_override {
1669  my_predecessors.built_predecessors().receiver_extract(*this);
1670  input_impl_type::extract();
1671  }
1672 #endif
1673  // all the guts are in multifunction_input...
1674 protected:
1676 }; // multifunction_node
1677 
1679 // successors. The node has unlimited concurrency, so it does not reject inputs.
1680 template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681 class split_node : public graph_node, public receiver<TupleType> {
1684 public:
1685  typedef TupleType input_type;
1686 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687  typedef Allocator allocator_type;
1688 #else
1691  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1693  );
1694 #endif
1695 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1697  typedef typename base_type::predecessor_list_type predecessor_list_type;
1698  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700 #endif
1703  N, // #elements in tuple
1704  internal::multifunction_output, // wrap this around each element
1705  TupleType // the tuple providing the types
1711  {
1712  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713  static_cast<receiver<input_type> *>(this), this->output_ports());
1714  }
1716 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1717  template <typename... Args>
1718  __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719  make_edges_in_order(nodes, *this);
1720  }
1721 #endif
1722 
1724  : graph_node(other.my_graph), base_type(other),
1726  {
1727  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728  static_cast<receiver<input_type> *>(this), this->output_ports());
1729  }
1730 
1731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732  void set_name( const char *name ) __TBB_override {
1734  }
1735 #endif
1736 
1738 
1739 protected:
1740  task *try_put_task(const TupleType& t) __TBB_override {
1741  // Sending split messages in parallel is not justified, as overheads would prevail.
1742  // Also, we do not have successors here. So we just tell the task returned here is successful.
1744  }
1746  if (f & rf_clear_edges)
1750  }
1753  return my_graph;
1754  }
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756 private:
1757  void extract() __TBB_override {}
1760  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1763  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1765  size_t predecessor_count() __TBB_override { return 0; }
1767  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1769  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770 
1772  built_predecessors_type my_predessors;
1773 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774 
1775 private:
1777 };
1778 
1780 template <typename Output, typename Policy = internal::Policy<void> >
1781 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1782  public internal::function_output<Output> {
1783 public:
1785  typedef Output output_type;
1790 
1792  template <typename Body >
1794  graph &g,
1796  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1797 #else
1799 #endif
1800  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1801  fOutput_type(g) {
1802  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1803 
1804  static_cast<receiver<input_type> *>(this),
1805  static_cast<sender<output_type> *>(this), this->my_body );
1806  }
1808 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1809  template <typename Body>
1810  continue_node( graph& g, Body body, node_priority_t priority )
1811  : continue_node(g, body, Policy(), priority) {}
1812 #endif
1814 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1815  template <typename Body, typename... Args>
1816  continue_node( const node_set<Args...>& nodes, Body body,
1819  make_edges_in_order(nodes, *this);
1820  }
1821 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1822  template <typename Body, typename... Args>
1823  continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1824  : continue_node(nodes, body, Policy(), priority) {}
1825 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1826 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1829  template <typename Body >
1831  graph &g, int number_of_predecessors,
1834 #else
1836 #endif
1837  ) : graph_node(g)
1838  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1839  fOutput_type(g) {
1840  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1841  static_cast<receiver<input_type> *>(this),
1842  static_cast<sender<output_type> *>(this), this->my_body );
1843  }
1844 
1845 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1846  template <typename Body>
1847  continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1848  : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1849 #endif
1850 
1851 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1852  template <typename Body, typename... Args>
1853  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1855  : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1856  make_edges_in_order(nodes, *this);
1857  }
1859 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1860  template <typename Body, typename... Args>
1861  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1862  Body body, node_priority_t priority )
1863  : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1864 #endif
1865 #endif
1871  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1872  static_cast<receiver<input_type> *>(this),
1873  static_cast<sender<output_type> *>(this), this->my_body );
1874  }
1875 
1876 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1877  void set_name( const char *name ) __TBB_override {
1879  }
1880 #endif
1882 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1883  void extract() __TBB_override {
1884  input_impl_type::my_built_predecessors.receiver_extract(*this);
1885  successors().built_successors().sender_extract(*this);
1886  }
1887 #endif
1889 protected:
1890  template< typename R, typename B > friend class run_and_put_task;
1891  template<typename X, typename Y> friend class internal::broadcast_cache;
1892  template<typename X, typename Y> friend class internal::round_robin_cache;
1895 
1898  if(f & rf_clear_edges)successors().clear();
1899  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1900  }
1901 }; // continue_node
1902 
1904 template <typename T>
1905 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1906 public:
1907  typedef T input_type;
1908  typedef T output_type;
1911 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1912  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1913  typedef typename sender<output_type>::successor_list_type successor_list_type;
1914 #endif
1915 private:
1917 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1918  internal::edge_container<predecessor_type> my_built_predecessors;
1919  spin_mutex pred_mutex; // serialize accesses on edge_container
1920 #endif
1921 public:
1924  my_successors.set_owner( this );
1925  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1926  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1927  }
1929 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1930  template <typename... Args>
1931  broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1932  make_edges_in_order(nodes, *this);
1933  }
1934 #endif
1936  // Copy constructor
1939  {
1940  my_successors.set_owner( this );
1941  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1942  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1943  }
1945 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1946  void set_name( const char *name ) __TBB_override {
1948  }
1949 #endif
1954  return true;
1955  }
1956 
1960  return true;
1961  }
1962 
1963 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1964  typedef typename sender<T>::built_successors_type built_successors_type;
1965 
1966  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1967 
1968  void internal_add_built_successor(successor_type &r) __TBB_override {
1969  my_successors.internal_add_built_successor(r);
1970  }
1971 
1972  void internal_delete_built_successor(successor_type &r) __TBB_override {
1973  my_successors.internal_delete_built_successor(r);
1974  }
1976  size_t successor_count() __TBB_override {
1977  return my_successors.successor_count();
1978  }
1979 
1980  void copy_successors(successor_list_type &v) __TBB_override {
1981  my_successors.copy_successors(v);
1982  }
1983 
1984  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1985 
1986  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1987 
1988  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1989  spin_mutex::scoped_lock l(pred_mutex);
1990  my_built_predecessors.add_edge(p);
1991  }
1992 
1993  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1994  spin_mutex::scoped_lock l(pred_mutex);
1995  my_built_predecessors.delete_edge(p);
1996  }
1997 
1998  size_t predecessor_count() __TBB_override {
1999  spin_mutex::scoped_lock l(pred_mutex);
2000  return my_built_predecessors.edge_count();
2001  }
2002 
2003  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2004  spin_mutex::scoped_lock l(pred_mutex);
2005  my_built_predecessors.copy_edges(v);
2006  }
2007 
2008  void extract() __TBB_override {
2009  my_built_predecessors.receiver_extract(*this);
2010  my_successors.built_successors().sender_extract(*this);
2011  }
2012 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2013 
2014 protected:
2015  template< typename R, typename B > friend class run_and_put_task;
2016  template<typename X, typename Y> friend class internal::broadcast_cache;
2017  template<typename X, typename Y> friend class internal::round_robin_cache;
2020  task *new_task = my_successors.try_put_task(t);
2021  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2022  return new_task;
2023  }
2024 
2026  return my_graph;
2027  }
2028 
2030 
2032  if (f&rf_clear_edges) {
2033  my_successors.clear();
2034 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2035  my_built_predecessors.clear();
2036 #endif
2037  }
2038  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2039  }
2040 }; // broadcast_node
2041 
2043 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2045  : public graph_node
2046 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047  , public internal::reservable_item_buffer< T, Allocator >
2048 #else
2049  , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050 #endif
2051  , public receiver<T>, public sender<T> {
2052 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053  typedef Allocator internals_allocator;
2054 #else
2056 #endif
2057 public:
2058  typedef T input_type;
2059  typedef T output_type;
2063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065  typedef typename sender<output_type>::successor_list_type successor_list_type;
2066 #endif
2067 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2070  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072  );
2073 #endif
2074 
2075 protected:
2076  typedef size_t size_type;
2078 
2079 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080  internal::edge_container<predecessor_type> my_built_predecessors;
2081 #endif
2082 
2084 
2086 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2087  , add_blt_succ, del_blt_succ,
2088  add_blt_pred, del_blt_pred,
2089  blt_succ_cnt, blt_pred_cnt,
2090  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
2091 #endif
2092  };
2093 
2094  // implements the aggregator_operation concept
2095  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2096  public:
2097  char type;
2098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2099  task * ltask;
2100  union {
2101  input_type *elem;
2102  successor_type *r;
2104  size_t cnt_val;
2105  successor_list_type *svec;
2106  predecessor_list_type *pvec;
2107  };
2108 #else
2109  T *elem;
2112 #endif
2113  buffer_operation(const T& e, op_type t) : type(char(t))
2114 
2115 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2116  , ltask(NULL), elem(const_cast<T*>(&e))
2117 #else
2118  , elem(const_cast<T*>(&e)) , ltask(NULL)
2119 #endif
2120  {}
2121  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
2122  };
2123 
2125  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
2126  friend class internal::aggregating_functor<class_type, buffer_operation>;
2127  internal::aggregator< handler_type, buffer_operation> my_aggregator;
2128 
2129  virtual void handle_operations(buffer_operation *op_list) {
2130  handle_operations_impl(op_list, this);
2131  }
2132 
2133  template<typename derived_type>
2134  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2135  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2136 
2137  buffer_operation *tmp = NULL;
2138  bool try_forwarding = false;
2139  while (op_list) {
2140  tmp = op_list;
2141  op_list = op_list->next;
2142  switch (tmp->type) {
2143  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2144  case rem_succ: internal_rem_succ(tmp); break;
2145  case req_item: internal_pop(tmp); break;
2146  case res_item: internal_reserve(tmp); break;
2147  case rel_res: internal_release(tmp); try_forwarding = true; break;
2148  case con_res: internal_consume(tmp); try_forwarding = true; break;
2149  case put_item: try_forwarding = internal_push(tmp); break;
2150  case try_fwd_task: internal_forward_task(tmp); break;
2151 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2152  // edge recording
2153  case add_blt_succ: internal_add_built_succ(tmp); break;
2154  case del_blt_succ: internal_del_built_succ(tmp); break;
2155  case add_blt_pred: internal_add_built_pred(tmp); break;
2156  case del_blt_pred: internal_del_built_pred(tmp); break;
2157  case blt_succ_cnt: internal_succ_cnt(tmp); break;
2158  case blt_pred_cnt: internal_pred_cnt(tmp); break;
2159  case blt_succ_cpy: internal_copy_succs(tmp); break;
2160  case blt_pred_cpy: internal_copy_preds(tmp); break;
2161 #endif
2162  }
2163  }
2164 
2165  derived->order();
2166 
2167  if (try_forwarding && !forwarder_busy) {
2168  if(internal::is_graph_active(this->my_graph)) {
2169  forwarder_busy = true;
2170  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2171  forward_task_bypass<class_type>(*this);
2172  // tmp should point to the last item handled by the aggregator. This is the operation
2173  // the handling thread enqueued. So modifying that record will be okay.
2174  // workaround for icc bug
2175  tbb::task *z = tmp->ltask;
2176  graph &g = this->my_graph;
2177  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
2178  }
2179  }
2180  } // handle_operations
2181 
2183  return op_data.ltask;
2184  }
2185 
2187  task *ft = grab_forwarding_task(op_data);
2188  if(ft) {
2190  return true;
2191  }
2192  return false;
2193  }
2194 
2196  virtual task *forward_task() {
2197  buffer_operation op_data(try_fwd_task);
2198  task *last_task = NULL;
2199  do {
2200  op_data.status = internal::WAIT;
2201  op_data.ltask = NULL;
2202  my_aggregator.execute(&op_data);
2203 
2204  // workaround for icc bug
2205  tbb::task *xtask = op_data.ltask;
2206  graph& g = this->my_graph;
2207  last_task = combine_tasks(g, last_task, xtask);
2208  } while (op_data.status ==internal::SUCCEEDED);
2209  return last_task;
2210  }
2211 
2214  my_successors.register_successor(*(op->r));
2216  }
2217 
2220  my_successors.remove_successor(*(op->r));
2222  }
2223 
2224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2225  typedef typename sender<T>::built_successors_type built_successors_type;
2226 
2227  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2228 
2229  virtual void internal_add_built_succ(buffer_operation *op) {
2230  my_successors.internal_add_built_successor(*(op->r));
2232  }
2233 
2234  virtual void internal_del_built_succ(buffer_operation *op) {
2235  my_successors.internal_delete_built_successor(*(op->r));
2237  }
2238 
2239  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2240 
2241  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2242 
2243  virtual void internal_add_built_pred(buffer_operation *op) {
2244  my_built_predecessors.add_edge(*(op->p));
2246  }
2247 
2248  virtual void internal_del_built_pred(buffer_operation *op) {
2249  my_built_predecessors.delete_edge(*(op->p));
2251  }
2252 
2253  virtual void internal_succ_cnt(buffer_operation *op) {
2254  op->cnt_val = my_successors.successor_count();
2256  }
2257 
2258  virtual void internal_pred_cnt(buffer_operation *op) {
2259  op->cnt_val = my_built_predecessors.edge_count();
2261  }
2262 
2263  virtual void internal_copy_succs(buffer_operation *op) {
2264  my_successors.copy_successors(*(op->svec));
2266  }
2267 
2268  virtual void internal_copy_preds(buffer_operation *op) {
2269  my_built_predecessors.copy_edges(*(op->pvec));
2271  }
2272 
2273 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274 
2275 private:
2276  void order() {}
2277 
2278  bool is_item_valid() {
2279  return this->my_item_valid(this->my_tail - 1);
2280  }
2281 
2282  void try_put_and_add_task(task*& last_task) {
2283  task *new_task = my_successors.try_put_task(this->back());
2284  if (new_task) {
2285  // workaround for icc bug
2286  graph& g = this->my_graph;
2287  last_task = combine_tasks(g, last_task, new_task);
2288  this->destroy_back();
2289  }
2290  }
2291 
2292 protected:
2295  internal_forward_task_impl(op, this);
2296  }
2297 
2298  template<typename derived_type>
2299  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2300  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2301 
2302  if (this->my_reserved || !derived->is_item_valid()) {
2304  this->forwarder_busy = false;
2305  return;
2306  }
2307  // Try forwarding, giving each successor a chance
2308  task * last_task = NULL;
2309  size_type counter = my_successors.size();
2310  for (; counter > 0 && derived->is_item_valid(); --counter)
2311  derived->try_put_and_add_task(last_task);
2312 
2313  op->ltask = last_task; // return task
2314  if (last_task && !counter) {
2316  }
2317  else {
2319  forwarder_busy = false;
2320  }
2321  }
2322 
2323  virtual bool internal_push(buffer_operation *op) {
2324  this->push_back(*(op->elem));
2326  return true;
2327  }
2328 
2329  virtual void internal_pop(buffer_operation *op) {
2330  if(this->pop_back(*(op->elem))) {
2332  }
2333  else {
2335  }
2336  }
2337 
2339  if(this->reserve_front(*(op->elem))) {
2341  }
2342  else {
2344  }
2345  }
2346 
2348  this->consume_front();
2350  }
2351 
2353  this->release_front();
2355  }
2356 
2357 public:
2361  sender<T>(), forwarder_busy(false)
2362  {
2363  my_successors.set_owner(this);
2364  my_aggregator.initialize_handler(handler_type(this));
2365  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2366  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2367  }
2368 
2369 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2370  template <typename... Args>
2371  buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2372  make_edges_in_order(nodes, *this);
2373  }
2374 #endif
2375 
2379  receiver<T>(), sender<T>(), forwarder_busy(false)
2380  {
2381  my_successors.set_owner(this);
2382  my_aggregator.initialize_handler(handler_type(this));
2383  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2384  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2385  }
2386 
2387 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2388  void set_name( const char *name ) __TBB_override {
2390  }
2391 #endif
2392 
2393  //
2394  // message sender implementation
2395  //
2396 
2398 
2400  buffer_operation op_data(reg_succ);
2401  op_data.r = &r;
2402  my_aggregator.execute(&op_data);
2403  (void)enqueue_forwarding_task(op_data);
2404  return true;
2405  }
2406 
2407 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2408  void internal_add_built_successor( successor_type &r) __TBB_override {
2409  buffer_operation op_data(add_blt_succ);
2410  op_data.r = &r;
2411  my_aggregator.execute(&op_data);
2412  }
2413 
2414  void internal_delete_built_successor( successor_type &r) __TBB_override {
2415  buffer_operation op_data(del_blt_succ);
2416  op_data.r = &r;
2417  my_aggregator.execute(&op_data);
2418  }
2419 
2420  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2421  buffer_operation op_data(add_blt_pred);
2422  op_data.p = &p;
2423  my_aggregator.execute(&op_data);
2424  }
2425 
2426  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2427  buffer_operation op_data(del_blt_pred);
2428  op_data.p = &p;
2429  my_aggregator.execute(&op_data);
2430  }
2431 
2432  size_t predecessor_count() __TBB_override {
2433  buffer_operation op_data(blt_pred_cnt);
2434  my_aggregator.execute(&op_data);
2435  return op_data.cnt_val;
2436  }
2437 
2438  size_t successor_count() __TBB_override {
2439  buffer_operation op_data(blt_succ_cnt);
2440  my_aggregator.execute(&op_data);
2441  return op_data.cnt_val;
2442  }
2443 
2444  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2445  buffer_operation op_data(blt_pred_cpy);
2446  op_data.pvec = &v;
2447  my_aggregator.execute(&op_data);
2448  }
2449 
2450  void copy_successors( successor_list_type &v ) __TBB_override {
2451  buffer_operation op_data(blt_succ_cpy);
2452  op_data.svec = &v;
2453  my_aggregator.execute(&op_data);
2454  }
2455 
2456 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457 
2459 
2462  r.remove_predecessor(*this);
2463  buffer_operation op_data(rem_succ);
2464  op_data.r = &r;
2465  my_aggregator.execute(&op_data);
2466  // even though this operation does not cause a forward, if we are the handler, and
2467  // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468  // and so should check for the task.
2469  (void)enqueue_forwarding_task(op_data);
2470  return true;
2471  }
2472 
2474 
2476  bool try_get( T &v ) __TBB_override {
2477  buffer_operation op_data(req_item);
2478  op_data.elem = &v;
2479  my_aggregator.execute(&op_data);
2480  (void)enqueue_forwarding_task(op_data);
2481  return (op_data.status==internal::SUCCEEDED);
2482  }
2483 
2485 
2488  buffer_operation op_data(res_item);
2489  op_data.elem = &v;
2490  my_aggregator.execute(&op_data);
2491  (void)enqueue_forwarding_task(op_data);
2492  return (op_data.status==internal::SUCCEEDED);
2493  }
2494 
2496 
2498  buffer_operation op_data(rel_res);
2499  my_aggregator.execute(&op_data);
2500  (void)enqueue_forwarding_task(op_data);
2501  return true;
2502  }
2503 
2505 
2507  buffer_operation op_data(con_res);
2508  my_aggregator.execute(&op_data);
2509  (void)enqueue_forwarding_task(op_data);
2510  return true;
2511  }
2512 
2513 protected:
2514 
2515  template< typename R, typename B > friend class run_and_put_task;
2516  template<typename X, typename Y> friend class internal::broadcast_cache;
2517  template<typename X, typename Y> friend class internal::round_robin_cache;
2520  buffer_operation op_data(t, put_item);
2521  my_aggregator.execute(&op_data);
2522  task *ft = grab_forwarding_task(op_data);
2523  // sequencer_nodes can return failure (if an item has been previously inserted)
2524  // We have to spawn the returned task if our own operation fails.
2525 
2526  if(ft && op_data.status ==internal::FAILED) {
2527  // we haven't succeeded queueing the item, but for some reason the
2528  // call returned a task (if another request resulted in a successful
2529  // forward this could happen.) Queue the task and reset the pointer.
2531  }
2532  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2533  ft = SUCCESSFULLY_ENQUEUED;
2534  }
2535  return ft;
2536  }
2537 
2539  return my_graph;
2540  }
2541 
2543 
2544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2545 public:
2546  void extract() __TBB_override {
2547  my_built_predecessors.receiver_extract(*this);
2548  my_successors.built_successors().sender_extract(*this);
2549  }
2550 #endif
2551 
2552 protected:
2555  // TODO: just clear structures
2556  if (f&rf_clear_edges) {
2557  my_successors.clear();
2558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2559  my_built_predecessors.clear();
2560 #endif
2561  }
2562  forwarder_busy = false;
2563  }
2564 }; // buffer_node
2565 
2567 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2568 class queue_node : public buffer_node<T, Allocator> {
2569 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2572  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2573  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2574  );
2575 #endif
2576 protected:
2581 
2582 private:
2583  template<typename, typename> friend class buffer_node;
2584 
2585  bool is_item_valid() {
2586  return this->my_item_valid(this->my_head);
2587  }
2588 
2589  void try_put_and_add_task(task*& last_task) {
2590  task *new_task = this->my_successors.try_put_task(this->front());
2591  if (new_task) {
2592  // workaround for icc bug
2593  graph& graph_ref = this->graph_reference();
2594  last_task = combine_tasks(graph_ref, last_task, new_task);
2595  this->destroy_front();
2596  }
2597  }
2598 
2599 protected:
2600  void internal_forward_task(queue_operation *op) __TBB_override {
2601  this->internal_forward_task_impl(op, this);
2602  }
2603 
2605  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2607  }
2608  else {
2609  this->pop_front(*(op->elem));
2611  }
2612  }
2614  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2616  }
2617  else {
2618  this->reserve_front(*(op->elem));
2620  }
2621  }
2623  this->consume_front();
2625  }
2626 
2627 public:
2628  typedef T input_type;
2629  typedef T output_type;
2632 
2635  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2636  static_cast<receiver<input_type> *>(this),
2637  static_cast<sender<output_type> *>(this) );
2638  }
2639 
2640 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2641  template <typename... Args>
2642  queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2643  make_edges_in_order(nodes, *this);
2644  }
2645 #endif
2646 
2649  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2650  static_cast<receiver<input_type> *>(this),
2651  static_cast<sender<output_type> *>(this) );
2652  }
2653 
2654 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2655  void set_name( const char *name ) __TBB_override {
2657  }
2658 #endif
2659 
2660 protected:
2663  }
2664 }; // queue_node
2665 
2667 template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2668 class sequencer_node : public queue_node<T, Allocator> {
2670  // my_sequencer should be a benign function and must be callable
2671  // from a parallel context. Does this mean it needn't be reset?
2672 public:
2673 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2676  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2677  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2678  );
2679 #endif
2680  typedef T input_type;
2681  typedef T output_type;
2684 
2686  template< typename Sequencer >
2687  __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2688  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2689  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2690  static_cast<receiver<input_type> *>(this),
2691  static_cast<sender<output_type> *>(this) );
2692  }
2693 
2694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2695  template <typename Sequencer, typename... Args>
2696  sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2697  : sequencer_node(nodes.graph_reference(), s) {
2698  make_edges_in_order(nodes, *this);
2699  }
2700 #endif
2701 
2703  __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
2704  my_sequencer( src.my_sequencer->clone() ) {
2705  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2706  static_cast<receiver<input_type> *>(this),
2707  static_cast<sender<output_type> *>(this) );
2708  }
2709 
2712 
2713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2714  void set_name( const char *name ) __TBB_override {
2716  }
2717 #endif
2718 
2719 protected:
2722 
2723 private:
2725  size_type tag = (*my_sequencer)(*(op->elem));
2726 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2727  if (tag < this->my_head) {
2728  // have already emitted a message with this tag
2730  return false;
2731  }
2732 #endif
2733  // cannot modify this->my_tail now; the buffer would be inconsistent.
2734  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2735 
2736  if (this->size(new_tail) > this->capacity()) {
2737  this->grow_my_array(this->size(new_tail));
2738  }
2739  this->my_tail = new_tail;
2740 
2741  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2742  __TBB_store_with_release(op->status, res);
2743  return res ==internal::SUCCEEDED;
2744  }
2745 }; // sequencer_node
2746 
2748 template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749 class priority_queue_node : public buffer_node<T, Allocator> {
2750 public:
2751 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2754  "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755  "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756  );
2757 #endif
2758  typedef T input_type;
2759  typedef T output_type;
2764 
2766  __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767  : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769  static_cast<receiver<input_type> *>(this),
2770  static_cast<sender<output_type> *>(this) );
2771  }
2772 
2773 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774  template <typename... Args>
2775  priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776  : priority_queue_node(nodes.graph_reference(), comp) {
2777  make_edges_in_order(nodes, *this);
2778  }
2779 #endif
2780 
2783  : buffer_node<T, Allocator>(src), mark(0)
2784  {
2785  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786  static_cast<receiver<input_type> *>(this),
2787  static_cast<sender<output_type> *>(this) );
2788  }
2789 
2790 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791  void set_name( const char *name ) __TBB_override {
2793  }
2794 #endif
2795 
2796 protected:
2797 
2799  mark = 0;
2801  }
2802 
2806 
2809  this->internal_forward_task_impl(op, this);
2810  }
2811 
2813  this->handle_operations_impl(op_list, this);
2814  }
2815 
2817  prio_push(*(op->elem));
2819  return true;
2820  }
2821 
2823  // if empty or already reserved, don't pop
2824  if ( this->my_reserved == true || this->my_tail == 0 ) {
2826  return;
2827  }
2828 
2829  *(op->elem) = prio();
2831  prio_pop();
2832 
2833  }
2834 
2835  // pops the highest-priority item, saves copy
2837  if (this->my_reserved == true || this->my_tail == 0) {
2839  return;
2840  }
2841  this->my_reserved = true;
2842  *(op->elem) = prio();
2843  reserved_item = *(op->elem);
2845  prio_pop();
2846  }
2847 
2850  this->my_reserved = false;
2852  }
2853 
2857  this->my_reserved = false;
2859  }
2860 
2861 private:
2862  template<typename, typename> friend class buffer_node;
2863 
2864  void order() {
2865  if (mark < this->my_tail) heapify();
2866  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867  }
2868 
2869  bool is_item_valid() {
2870  return this->my_tail > 0;
2871  }
2872 
2873  void try_put_and_add_task(task*& last_task) {
2874  task * new_task = this->my_successors.try_put_task(this->prio());
2875  if (new_task) {
2876  // workaround for icc bug
2877  graph& graph_ref = this->graph_reference();
2878  last_task = combine_tasks(graph_ref, last_task, new_task);
2879  prio_pop();
2880  }
2881  }
2882 
2883 private:
2884  Compare compare;
2886 
2888 
2889  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2890  bool prio_use_tail() {
2891  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893  }
2894 
2895  // prio_push: checks that the item will fit, expand array if necessary, put at end
2896  void prio_push(const T &src) {
2897  if ( this->my_tail >= this->my_array_size )
2898  this->grow_my_array( this->my_tail + 1 );
2899  (void) this->place_item(this->my_tail, src);
2900  ++(this->my_tail);
2901  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902  }
2903 
2904  // prio_pop: deletes highest priority item from the array, and if it is item
2905  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2906  // and mark. Assumes the array has already been tested for emptiness; no failure.
2907  void prio_pop() {
2908  if (prio_use_tail()) {
2909  // there are newly pushed elements; last one higher than top
2910  // copy the data
2911  this->destroy_item(this->my_tail-1);
2912  --(this->my_tail);
2913  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914  return;
2915  }
2916  this->destroy_item(0);
2917  if(this->my_tail > 1) {
2918  // push the last element down heap
2919  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920  this->move_item(0,this->my_tail - 1);
2921  }
2922  --(this->my_tail);
2923  if(mark > this->my_tail) --mark;
2924  if (this->my_tail > 1) // don't reheap for heap of size 1
2925  reheap();
2926  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927  }
2928 
2929  const T& prio() {
2930  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931  }
2932 
2933  // turn array into heap
2934  void heapify() {
2935  if(this->my_tail == 0) {
2936  mark = 0;
2937  return;
2938  }
2939  if (!mark) mark = 1;
2940  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941  size_type cur_pos = mark;
2942  input_type to_place;
2943  this->fetch_item(mark,to_place);
2944  do { // push to_place up the heap
2945  size_type parent = (cur_pos-1)>>1;
2946  if (!compare(this->get_my_item(parent), to_place))
2947  break;
2948  this->move_item(cur_pos, parent);
2949  cur_pos = parent;
2950  } while( cur_pos );
2951  (void) this->place_item(cur_pos, to_place);
2952  }
2953  }
2954 
2955  // otherwise heapified array with new root element; rearrange to heap
2956  void reheap() {
2957  size_type cur_pos=0, child=1;
2958  while (child < mark) {
2959  size_type target = child;
2960  if (child+1<mark &&
2961  compare(this->get_my_item(child),
2962  this->get_my_item(child+1)))
2963  ++target;
2964  // target now has the higher priority child
2965  if (compare(this->get_my_item(target),
2966  this->get_my_item(cur_pos)))
2967  break;
2968  // swap
2969  this->swap_items(cur_pos, target);
2970  cur_pos = target;
2971  child = (cur_pos<<1)+1;
2972  }
2973  }
2974 }; // priority_queue_node
2975 
2976 } // interfaceX
2977 
2978 namespace interface11 {
2979 
2981 
2984 template< typename T, typename DecrementType=continue_msg >
2985 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986 public:
2987  typedef T input_type;
2988  typedef T output_type;
2991 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993  typedef typename sender<output_type>::built_successors_type built_successors_type;
2994  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995  typedef typename sender<output_type>::successor_list_type successor_list_type;
2996 #endif
2997  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2998 
2999 private:
3001  size_t my_count; //number of successful puts
3002  size_t my_tries; //number of active put attempts
3006  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007 
3009 
3010  // Let decrementer call decrement_counter()
3011  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012 
3013  bool check_conditions() { // always called under lock
3014  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3015  }
3016 
3017  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3019  input_type v;
3020  task *rval = NULL;
3021  bool reserved = false;
3022  {
3024  if ( check_conditions() )
3025  ++my_tries;
3026  else
3027  return NULL;
3028  }
3029 
3030  //SUCCESS
3031  // if we can reserve and can put, we consume the reservation
3032  // we increment the count and decrement the tries
3033  if ( (my_predecessors.try_reserve(v)) == true ){
3034  reserved=true;
3035  if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036  {
3038  ++my_count;
3039  --my_tries;
3040  my_predecessors.try_consume();
3041  if ( check_conditions() ) {
3042  if ( internal::is_graph_active(this->my_graph) ) {
3043  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3046  }
3047  }
3048  }
3049  return rval;
3050  }
3051  }
3052  //FAILURE
3053  //if we can't reserve, we decrement the tries
3054  //if we can reserve but can't put, we decrement the tries and release the reservation
3055  {
3057  --my_tries;
3058  if (reserved) my_predecessors.try_release();
3059  if ( check_conditions() ) {
3060  if ( internal::is_graph_active(this->my_graph) ) {
3061  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3063  __TBB_ASSERT(!rval, "Have two tasks to handle");
3064  return rtask;
3065  }
3066  }
3067  return rval;
3068  }
3069  }
3070 
3071  void forward() {
3072  __TBB_ASSERT(false, "Should never be called");
3073  return;
3074  }
3075 
3076  task* decrement_counter( long long delta ) {
3077  {
3079  if( delta > 0 && size_t(delta) > my_count )
3080  my_count = 0;
3081  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3083  else
3084  my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085  }
3086  return forward_task();
3087  }
3088 
3089  void initialize() {
3090  my_predecessors.set_owner(this);
3091  my_successors.set_owner(this);
3092  decrement.set_owner(this);
3094  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096  static_cast<sender<output_type> *>(this)
3097  );
3098  }
3099 public:
3102 
3103 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3105  "Deprecated interface of the limiter node can be used only in conjunction "
3106  "with continue_msg as the type of DecrementType template parameter." );
3107 #endif // Check for incompatible interface
3108 
3111  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3112  : graph_node(g), my_threshold(threshold), my_count(0),
3114  my_tries(0), decrement(),
3115  init_decrement_predecessors(num_decrement_predecessors),
3116  decrement(num_decrement_predecessors)) {
3117  initialize();
3118  }
3119 
3120 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3121  template <typename... Args>
3122  limiter_node(const node_set<Args...>& nodes, size_t threshold)
3123  : limiter_node(nodes.graph_reference(), threshold) {
3124  make_edges_in_order(nodes, *this);
3125  }
3126 #endif
3127 
3129  limiter_node( const limiter_node& src ) :
3130  graph_node(src.my_graph), receiver<T>(), sender<T>(),
3133  my_tries(0), decrement(),
3134  init_decrement_predecessors(src.init_decrement_predecessors),
3135  decrement(src.init_decrement_predecessors)) {
3136  initialize();
3137  }
3138 
3139 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3140  void set_name( const char *name ) __TBB_override {
3142  }
3143 #endif
3144 
3148  bool was_empty = my_successors.empty();
3150  //spawn a forward task if this is the only successor
3151  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152  if ( internal::is_graph_active(this->my_graph) ) {
3153  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3156  }
3157  }
3158  return true;
3159  }
3160 
3162 
3164  r.remove_predecessor(*this);
3166  return true;
3167  }
3168 
3169 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3170  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3171  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172 
3173  void internal_add_built_successor(successor_type &src) __TBB_override {
3174  my_successors.internal_add_built_successor(src);
3175  }
3176 
3177  void internal_delete_built_successor(successor_type &src) __TBB_override {
3178  my_successors.internal_delete_built_successor(src);
3179  }
3180 
3181  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182 
3183  void copy_successors(successor_list_type &v) __TBB_override {
3184  my_successors.copy_successors(v);
3185  }
3186 
3187  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3188  my_predecessors.internal_add_built_predecessor(src);
3189  }
3190 
3191  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3192  my_predecessors.internal_delete_built_predecessor(src);
3193  }
3194 
3195  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196 
3197  void copy_predecessors(predecessor_list_type &v) __TBB_override {
3198  my_predecessors.copy_predecessors(v);
3199  }
3200 
3201  void extract() __TBB_override {
3202  my_count = 0;
3203  my_successors.built_successors().sender_extract(*this);
3204  my_predecessors.built_predecessors().receiver_extract(*this);
3205  decrement.built_predecessors().receiver_extract(decrement);
3206  }
3207 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208 
3212  my_predecessors.add( src );
3214  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3217  }
3218  return true;
3219  }
3220 
3223  my_predecessors.remove( src );
3224  return true;
3225  }
3226 
3227 protected:
3228 
3229  template< typename R, typename B > friend class run_and_put_task;
3230  template<typename X, typename Y> friend class internal::broadcast_cache;
3231  template<typename X, typename Y> friend class internal::round_robin_cache;
3234  {
3236  if ( my_count + my_tries >= my_threshold )
3237  return NULL;
3238  else
3239  ++my_tries;
3240  }
3241 
3242  task * rtask = my_successors.try_put_task(t);
3243 
3244  if ( !rtask ) { // try_put_task failed.
3246  --my_tries;
3248  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3250  }
3251  }
3252  else {
3254  ++my_count;
3255  --my_tries;
3256  }
3257  return rtask;
3258  }
3259 
3261 
3263  __TBB_ASSERT(false,NULL); // should never be called
3264  }
3265 
3267  my_count = 0;
3268  if(f & rf_clear_edges) {
3269  my_predecessors.clear();
3270  my_successors.clear();
3271  }
3272  else
3273  {
3274  my_predecessors.reset( );
3275  }
3276  decrement.reset_receiver(f);
3277  }
3278 }; // limiter_node
3279 
3281 
3285 using internal::input_port;
3286 using internal::tag_value;
3287 
3288 template<typename OutputTuple, typename JP=queueing> class join_node;
3289 
3290 template<typename OutputTuple>
3291 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3292 private:
3295 public:
3296  typedef OutputTuple output_type;
3299  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3300  this->input_ports(), static_cast< sender< output_type > *>(this) );
3301  }
3302 
3303 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3304  template <typename... Args>
3305  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3306  make_edges_in_order(nodes, *this);
3307  }
3308 #endif
3309 
3311  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3312  this->input_ports(), static_cast< sender< output_type > *>(this) );
3313  }
3314 
3315 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3316  void set_name( const char *name ) __TBB_override {
3318  }
3319 #endif
3320 
3321 };
3322 
3323 template<typename OutputTuple>
3324 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3325 private:
3328 public:
3329  typedef OutputTuple output_type;
3332  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3333  this->input_ports(), static_cast< sender< output_type > *>(this) );
3334  }
3335 
3336 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3337  template <typename... Args>
3338  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3339  make_edges_in_order(nodes, *this);
3340  }
3341 #endif
3342 
3344  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3345  this->input_ports(), static_cast< sender< output_type > *>(this) );
3346  }
3347 
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349  void set_name( const char *name ) __TBB_override {
3351  }
3352 #endif
3353 
3354 };
3355 
3356 // template for key_matching join_node
3357 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
3358 template<typename OutputTuple, typename K, typename KHash>
3359 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3360  key_matching_port, OutputTuple, key_matching<K,KHash> > {
3361 private:
3364 public:
3365  typedef OutputTuple output_type;
3367 
3368 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3370 
3371 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3372  template <typename... Args>
3373  join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3374  : join_node(nodes.graph_reference()) {
3375  make_edges_in_order(nodes, *this);
3376  }
3377 #endif
3378 
3379 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3380 
3381  template<typename __TBB_B0, typename __TBB_B1>
3382  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3383  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3384  this->input_ports(), static_cast< sender< output_type > *>(this) );
3385  }
3386  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3387  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3388  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3389  this->input_ports(), static_cast< sender< output_type > *>(this) );
3390  }
3391  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3392  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3393  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3394  this->input_ports(), static_cast< sender< output_type > *>(this) );
3395  }
3396  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3397  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3398  unfolded_type(g, b0, b1, b2, b3, b4) {
3399  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400  this->input_ports(), static_cast< sender< output_type > *>(this) );
3401  }
3402 #if __TBB_VARIADIC_MAX >= 6
3403  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3404  typename __TBB_B5>
3405  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3406  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3407  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3408  this->input_ports(), static_cast< sender< output_type > *>(this) );
3409  }
3410 #endif
3411 #if __TBB_VARIADIC_MAX >= 7
3412  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3413  typename __TBB_B5, typename __TBB_B6>
3414  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3415  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3416  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3417  this->input_ports(), static_cast< sender< output_type > *>(this) );
3418  }
3419 #endif
3420 #if __TBB_VARIADIC_MAX >= 8
3421  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3422  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3423  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3424  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3425  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3426  this->input_ports(), static_cast< sender< output_type > *>(this) );
3427  }
3428 #endif
3429 #if __TBB_VARIADIC_MAX >= 9
3430  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3431  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3432  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3433  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3434  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3435  this->input_ports(), static_cast< sender< output_type > *>(this) );
3436  }
3437 #endif
3438 #if __TBB_VARIADIC_MAX >= 10
3439  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3440  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3441  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3442  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3443  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3444  this->input_ports(), static_cast< sender< output_type > *>(this) );
3445  }
3446 #endif
3447 
3448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3449  template <typename... Args, typename... Bodies>
3450  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3451  : join_node(nodes.graph_reference(), bodies...) {
3452  make_edges_in_order(nodes, *this);
3453  }
3454 #endif
3455 
3457  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3458  this->input_ports(), static_cast< sender< output_type > *>(this) );
3459  }
3460 
3461 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3462  void set_name( const char *name ) __TBB_override {
3464  }
3465 #endif
3466 
3467 };
3468 
3469 // indexer node
3471 
3472 // TODO: Implement interface with variadic template or tuple
3473 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476 
3477 //indexer node specializations
3478 template<typename T0>
3479 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3480 private:
3481  static const int N = 1;
3482 public:
3483  typedef tuple<T0> InputTuple;
3487  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3488  this->input_ports(), static_cast< sender< output_type > *>(this) );
3489  }
3490 
3491 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3492  template <typename... Args>
3493  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3494  make_edges_in_order(nodes, *this);
3495  }
3496 #endif
3497 
3498  // Copy constructor
3500  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3501  this->input_ports(), static_cast< sender< output_type > *>(this) );
3502  }
3503 
3504 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3505  void set_name( const char *name ) __TBB_override {
3507  }
3508 #endif
3509 };
3510 
3511 template<typename T0, typename T1>
3512 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3513 private:
3514  static const int N = 2;
3515 public:
3516  typedef tuple<T0, T1> InputTuple;
3520  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3521  this->input_ports(), static_cast< sender< output_type > *>(this) );
3522  }
3523 
3524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3525  template <typename... Args>
3526  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3527  make_edges_in_order(nodes, *this);
3528  }
3529 #endif
3530 
3531  // Copy constructor
3533  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3534  this->input_ports(), static_cast< sender< output_type > *>(this) );
3535  }
3536 
3537 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3538  void set_name( const char *name ) __TBB_override {
3540  }
3541 #endif
3542 };
3543 
3544 template<typename T0, typename T1, typename T2>
3545 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3546 private:
3547  static const int N = 3;
3548 public:
3549  typedef tuple<T0, T1, T2> InputTuple;
3553  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3554  this->input_ports(), static_cast< sender< output_type > *>(this) );
3555  }
3556 
3557 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3558  template <typename... Args>
3559  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3560  make_edges_in_order(nodes, *this);
3561  }
3562 #endif
3563 
3564  // Copy constructor
3566  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3567  this->input_ports(), static_cast< sender< output_type > *>(this) );
3568  }
3569 
3570 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3571  void set_name( const char *name ) __TBB_override {
3573  }
3574 #endif
3575 };
3576 
3577 template<typename T0, typename T1, typename T2, typename T3>
3578 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3579 private:
3580  static const int N = 4;
3581 public:
3582  typedef tuple<T0, T1, T2, T3> InputTuple;
3586  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3587  this->input_ports(), static_cast< sender< output_type > *>(this) );
3588  }
3589 
3590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3591  template <typename... Args>
3592  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3593  make_edges_in_order(nodes, *this);
3594  }
3595 #endif
3596 
3597  // Copy constructor
3599  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3600  this->input_ports(), static_cast< sender< output_type > *>(this) );
3601  }
3602 
3603 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3604  void set_name( const char *name ) __TBB_override {
3606  }
3607 #endif
3608 };
3609 
3610 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3611 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3612 private:
3613  static const int N = 5;
3614 public:
3615  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3619  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3620  this->input_ports(), static_cast< sender< output_type > *>(this) );
3621  }
3622 
3623 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3624  template <typename... Args>
3625  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3626  make_edges_in_order(nodes, *this);
3627  }
3628 #endif
3629 
3630  // Copy constructor
3632  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3633  this->input_ports(), static_cast< sender< output_type > *>(this) );
3634  }
3635 
3636 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3637  void set_name( const char *name ) __TBB_override {
3639  }
3640 #endif
3641 };
3642 
3643 #if __TBB_VARIADIC_MAX >= 6
3644 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3645 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3646 private:
3647  static const int N = 6;
3648 public:
3649  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3653  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3654  this->input_ports(), static_cast< sender< output_type > *>(this) );
3655  }
3656 
3657 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3658  template <typename... Args>
3659  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3660  make_edges_in_order(nodes, *this);
3661  }
3662 #endif
3663 
3664  // Copy constructor
3666  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3667  this->input_ports(), static_cast< sender< output_type > *>(this) );
3668  }
3669 
3670 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3671  void set_name( const char *name ) __TBB_override {
3673  }
3674 #endif
3675 };
3676 #endif //variadic max 6
3677 
3678 #if __TBB_VARIADIC_MAX >= 7
3679 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3680  typename T6>
3681 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3682 private:
3683  static const int N = 7;
3684 public:
3685  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3689  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3690  this->input_ports(), static_cast< sender< output_type > *>(this) );
3691  }
3692 
3693 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3694  template <typename... Args>
3695  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3696  make_edges_in_order(nodes, *this);
3697  }
3698 #endif
3699 
3700  // Copy constructor
3702  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3703  this->input_ports(), static_cast< sender< output_type > *>(this) );
3704  }
3705 
3706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3707  void set_name( const char *name ) __TBB_override {
3709  }
3710 #endif
3711 };
3712 #endif //variadic max 7
3713 
3714 #if __TBB_VARIADIC_MAX >= 8
3715 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3716  typename T6, typename T7>
3717 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3718 private:
3719  static const int N = 8;
3720 public:
3721  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3725  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3726  this->input_ports(), static_cast< sender< output_type > *>(this) );
3727  }
3728 
3729 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3730  template <typename... Args>
3731  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3732  make_edges_in_order(nodes, *this);
3733  }
3734 #endif
3735 
3736  // Copy constructor
3737  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3738  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3739  this->input_ports(), static_cast< sender< output_type > *>(this) );
3740  }
3741 
3742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3743  void set_name( const char *name ) __TBB_override {
3745  }
3746 #endif
3747 };
3748 #endif //variadic max 8
3749 
3750 #if __TBB_VARIADIC_MAX >= 9
3751 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3752  typename T6, typename T7, typename T8>
3753 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3754 private:
3755  static const int N = 9;
3756 public:
3757  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3761  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3762  this->input_ports(), static_cast< sender< output_type > *>(this) );
3763  }
3764 
3765 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3766  template <typename... Args>
3767  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3768  make_edges_in_order(nodes, *this);
3769  }
3770 #endif
3771 
3772  // Copy constructor
3774  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3775  this->input_ports(), static_cast< sender< output_type > *>(this) );
3776  }
3777 
3778 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3779  void set_name( const char *name ) __TBB_override {
3781  }
3782 #endif
3783 };
3784 #endif //variadic max 9
3785 
3786 #if __TBB_VARIADIC_MAX >= 10
3787 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3788  typename T6, typename T7, typename T8, typename T9>
3789 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3790 private:
3791  static const int N = 10;
3792 public:
3793  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3797  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3798  this->input_ports(), static_cast< sender< output_type > *>(this) );
3799  }
3800 
3801 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3802  template <typename... Args>
3803  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3804  make_edges_in_order(nodes, *this);
3805  }
3806 #endif
3807 
3808  // Copy constructor
3810  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3811  this->input_ports(), static_cast< sender< output_type > *>(this) );
3812  }
3813 
3814 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3815  void set_name( const char *name ) __TBB_override {
3817  }
3818 #endif
3819 };
3820 #endif //variadic max 10
3821 
3822 #if __TBB_PREVIEW_ASYNC_MSG
3824 #else
3825 template< typename T >
3826 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3827 #endif
3828 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3829  s.internal_add_built_predecessor(p);
3830  p.internal_add_built_successor(s);
3831 #endif
3832  p.register_successor( s );
3834 }
3835 
3837 template< typename T >
3838 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3839  internal_make_edge( p, s );
3840 }
3841 
3842 #if __TBB_PREVIEW_ASYNC_MSG
3843 template< typename TS, typename TR,
3846 inline void make_edge( TS &p, TR &s ) {
3847  internal_make_edge( p, s );
3848 }
3849 
3850 template< typename T >
3852  internal_make_edge( p, s );
3853 }
3854 
3855 template< typename T >
3857  internal_make_edge( p, s );
3858 }
3859 
3860 #endif // __TBB_PREVIEW_ASYNC_MSG
3861 
3862 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3863 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3864 template< typename T, typename V,
3865  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3866 inline void make_edge( T& output, V& input) {
3867  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3868 }
3869 
3870 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3871 template< typename T, typename R,
3872  typename = typename T::output_ports_type >
3873 inline void make_edge( T& output, receiver<R>& input) {
3874  make_edge(get<0>(output.output_ports()), input);
3875 }
3876 
3877 //Makes an edge from a sender to port 0 of a multi-input successor.
3878 template< typename S, typename V,
3879  typename = typename V::input_ports_type >
3880 inline void make_edge( sender<S>& output, V& input) {
3881  make_edge(output, get<0>(input.input_ports()));
3882 }
3883 #endif
3884 
3885 #if __TBB_PREVIEW_ASYNC_MSG
3887 #else
3888 template< typename T >
3889 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3890 #endif
3891  p.remove_successor( s );
3892 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3893  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3894  p.internal_delete_built_successor(s);
3895  s.internal_delete_built_predecessor(p);
3896 #endif
3898 }
3899 
3901 template< typename T >
3902 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3903  internal_remove_edge( p, s );
3904 }
3905 
3906 #if __TBB_PREVIEW_ASYNC_MSG
3907 template< typename TS, typename TR,
3910 inline void remove_edge( TS &p, TR &s ) {
3911  internal_remove_edge( p, s );
3912 }
3913 
3914 template< typename T >
3916  internal_remove_edge( p, s );
3917 }
3918 
3919 template< typename T >
3921  internal_remove_edge( p, s );
3922 }
3923 #endif // __TBB_PREVIEW_ASYNC_MSG
3924 
3925 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3926 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3927 template< typename T, typename V,
3928  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3929 inline void remove_edge( T& output, V& input) {
3930  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3931 }
3932 
3933 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3934 template< typename T, typename R,
3935  typename = typename T::output_ports_type >
3936 inline void remove_edge( T& output, receiver<R>& input) {
3937  remove_edge(get<0>(output.output_ports()), input);
3938 }
3939 //Removes an edge between a sender and port 0 of a multi-input successor.
3940 template< typename S, typename V,
3941  typename = typename V::input_ports_type >
3942 inline void remove_edge( sender<S>& output, V& input) {
3943  remove_edge(output, get<0>(input.input_ports()));
3944 }
3945 #endif
3946 
3947 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3948 template<typename C >
3949 template< typename S >
3950 void internal::edge_container<C>::sender_extract( S &s ) {
3951  edge_list_type e = built_edges;
3952  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953  remove_edge(s, **i);
3954  }
3955 }
3956 
3957 template<typename C >
3958 template< typename R >
3959 void internal::edge_container<C>::receiver_extract( R &r ) {
3960  edge_list_type e = built_edges;
3961  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962  remove_edge(**i, r);
3963  }
3964 }
3965 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966 
3968 template< typename Body, typename Node >
3969 Body copy_body( Node &n ) {
3970  return n.template copy_function_object<Body>();
3971 }
3972 
3973 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974 
3975 //composite_node
3976 template< typename InputTuple, typename OutputTuple > class composite_node;
3977 
3978 template< typename... InputTypes, typename... OutputTypes>
3979 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3980 
3981 public:
3982  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3983  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3984 
3985 private:
3986  std::unique_ptr<input_ports_type> my_input_ports;
3987  std::unique_ptr<output_ports_type> my_output_ports;
3988 
3989  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3990  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3991 
3992 protected:
3994 
3995 public:
3996 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3997  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3998  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4000  }
4001 #else
4003  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4004  }
4005 #endif
4006 
4007  template<typename T1, typename T2>
4008  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4009  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4010  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4011  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4012  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4013 
4016  }
4017 
4018  template< typename... NodeTypes >
4019  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4020 
4021  template< typename... NodeTypes >
4022  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4023 
4024 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4025  void set_name( const char *name ) __TBB_override {
4027  }
4028 #endif
4029 
4031  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4032  return *my_input_ports;
4033  }
4034 
4036  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4037  return *my_output_ports;
4038  }
4039 
4040 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4041  void extract() __TBB_override {
4042  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4043  }
4044 #endif
4045 }; // class composite_node
4046 
4047 //composite_node with only input ports
4048 template< typename... InputTypes>
4049 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050 public:
4051  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052 
4053 private:
4054  std::unique_ptr<input_ports_type> my_input_ports;
4055  static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056 
4057 protected:
4059 
4060 public:
4061 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4062  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4063  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4065  }
4066 #else
4068  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4069  }
4070 #endif
4071 
4072  template<typename T>
4073  void set_external_ports(T&& input_ports_tuple) {
4074  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075 
4076  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077 
4078  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079  }
4080 
4081  template< typename... NodeTypes >
4082  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083 
4084  template< typename... NodeTypes >
4085  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086 
4087 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4088  void set_name( const char *name ) __TBB_override {
4090  }
4091 #endif
4092 
4094  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095  return *my_input_ports;
4096  }
4097 
4098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4099  void extract() __TBB_override {
4100  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101  }
4102 #endif
4103 
4104 }; // class composite_node
4105 
4106 //composite_nodes with only output_ports
4107 template<typename... OutputTypes>
4108 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109 public:
4110  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111 
4112 private:
4113  std::unique_ptr<output_ports_type> my_output_ports;
4114  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115 
4116 protected:
4118 
4119 public:
4120 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4121  __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4122  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4124  }
4125 #else
4127  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4128  }
4129 #endif
4130 
4131  template<typename T>
4132  void set_external_ports(T&& output_ports_tuple) {
4133  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134 
4135  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136 
4137  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138  }
4139 
4140  template<typename... NodeTypes >
4141  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142 
4143  template<typename... NodeTypes >
4144  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145 
4146 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4147  void set_name( const char *name ) __TBB_override {
4149  }
4150 #endif
4151 
4153  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154  return *my_output_ports;
4155  }
4156 
4157 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4158  void extract() __TBB_override {
4159  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160  }
4161 #endif
4162 
4163 }; // class composite_node
4164 
4165 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166 
4167 namespace internal {
4168 
4169 template<typename Gateway>
4171 public:
4172  typedef Gateway gateway_type;
4173 
4174  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
4175  void set_gateway(gateway_type *gateway) {
4176  my_gateway = gateway;
4177  }
4178 
4179 protected:
4181 };
4182 
4183 template<typename Input, typename Ports, typename Gateway, typename Body>
4184 class async_body: public async_body_base<Gateway> {
4185 public:
4187  typedef Gateway gateway_type;
4188 
4189  async_body(const Body &body, gateway_type *gateway)
4190  : base_type(gateway), my_body(body) { }
4191 
4192  void operator()( const Input &v, Ports & ) {
4193  my_body(v, *this->my_gateway);
4194  }
4195 
4196  Body get_body() { return my_body; }
4197 
4198 private:
4199  Body my_body;
4200 };
4201 
4202 } // namespace internal
4203 
4204 } // namespace interfaceX
4205 namespace interface11 {
4206 
4208 template < typename Input, typename Output,
4209  typename Policy = queueing_lightweight,
4210  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4212  : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213 {
4214 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4217  "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218  "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219  );
4220 #endif
4223 
4224 public:
4225  typedef Input input_type;
4226  typedef Output output_type;
4233 
4234 private:
4238  // TODO: pass value by copy since we do not want to block asynchronous thread.
4239  const Output *value;
4240  bool result;
4241  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242  void operator()() {
4243  result = port->try_put(*value);
4244  }
4245  };
4246 
4247  class receiver_gateway_impl: public receiver_gateway<Output> {
4248  public:
4249  receiver_gateway_impl(async_node* node): my_node(node) {}
4251  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252  my_node->my_graph.reserve_wait();
4253  }
4254 
4256  my_node->my_graph.release_wait();
4257  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258  }
4259 
4261  bool try_put(const Output &i) __TBB_override {
4262  return my_node->try_put_impl(i);
4263  }
4264 
4265  private:
4267  } my_gateway;
4268 
4269  //The substitute of 'this' for member construction, to prevent compiler warnings
4270  async_node* self() { return this; }
4271 
4273  bool try_put_impl(const Output &i) {
4274  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4277  task_list tasks;
4278  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280  "Return status is inconsistent with the method operation." );
4281 
4282  while( !tasks.empty() ) {
4283  internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284  }
4285  tbb::internal::fgt_async_try_put_end(this, &port_0);
4286  return is_at_least_one_put_successful;
4287  }
4288 
4289 public:
4290  template<typename Body>
4292  graph &g, size_t concurrency,
4294  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4295 #else
4297 #endif
4298  ) : base_type(
4299  g, concurrency,
4300  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302  tbb::internal::fgt_multioutput_node_with_body<1>(
4303  CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304  &this->my_graph, static_cast<receiver<input_type> *>(this),
4305  this->output_ports(), this->my_body
4306  );
4307  }
4308 
4309 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
4310  template <typename Body, typename... Args>
4311  __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312  : async_node(g, concurrency, body, Policy(), priority) {}
4313 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314 
4315 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4316  template <typename Body, typename... Args>
4318  const node_set<Args...>& nodes, size_t concurrency, Body body,
4320  ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321  make_edges_in_order(nodes, *this);
4322  }
4323 
4324 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4325  template <typename Body, typename... Args>
4326  __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327  : async_node(nodes, concurrency, body, Policy(), priority) {}
4328 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330 
4331  __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334 
4335  tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336  &this->my_graph, static_cast<receiver<input_type> *>(this),
4337  this->output_ports(), this->my_body );
4338  }
4339 
4341  return my_gateway;
4342  }
4343 
4344 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4345  void set_name( const char *name ) __TBB_override {
4347  }
4348 #endif
4349 
4350  // Define sender< Output >
4351 
4354  return internal::output_port<0>(*this).register_successor(r);
4355  }
4356 
4359  return internal::output_port<0>(*this).remove_successor(r);
4360  }
4361 
4362  template<typename Body>
4366  mfn_body_type &body_ref = *this->my_body;
4367  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368  return ab.get_body();
4369  }
4370 
4371 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4372  typedef typename internal::edge_container<successor_type> built_successors_type;
4374  typedef typename built_successors_type::edge_list_type successor_list_type;
4375  built_successors_type &built_successors() __TBB_override {
4376  return internal::output_port<0>(*this).built_successors();
4377  }
4378 
4379  void internal_add_built_successor( successor_type &r ) __TBB_override {
4380  internal::output_port<0>(*this).internal_add_built_successor(r);
4381  }
4382 
4383  void internal_delete_built_successor( successor_type &r ) __TBB_override {
4384  internal::output_port<0>(*this).internal_delete_built_successor(r);
4385  }
4386 
4387  void copy_successors( successor_list_type &l ) __TBB_override {
4388  internal::output_port<0>(*this).copy_successors(l);
4389  }
4390 
4391  size_t successor_count() __TBB_override {
4392  return internal::output_port<0>(*this).successor_count();
4393  }
4394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395 
4396 protected:
4397 
4399  base_type::reset_node(f);
4400  }
4401 };
4402 
4403 #if __TBB_PREVIEW_STREAMING_NODE
4405 #endif // __TBB_PREVIEW_STREAMING_NODE
4406 
4408 
4409 template< typename T >
4410 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4411 public:
4412  typedef T input_type;
4413  typedef T output_type;
4416 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4417  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4418  typedef typename sender<output_type>::built_successors_type built_successors_type;
4419  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4420  typedef typename sender<output_type>::successor_list_type successor_list_type;
4421 #endif
4422 
4423  __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4424  my_successors.set_owner( this );
4425  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4426  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4427  }
4428 
4429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4430  template <typename... Args>
4431  overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4432  make_edges_in_order(nodes, *this);
4433  }
4434 #endif
4435 
4438  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4439  {
4440  my_successors.set_owner( this );
4441  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4442  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4443  }
4444 
4446 
4447 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4448  void set_name( const char *name ) __TBB_override {
4450  }
4451 #endif
4452 
4454  spin_mutex::scoped_lock l( my_mutex );
4455  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4456  // We have a valid value that must be forwarded immediately.
4457  bool ret = s.try_put( my_buffer );
4458  if ( ret ) {
4459  // We add the successor that accepted our put
4460  my_successors.register_successor( s );
4461  } else {
4462  // In case of reservation a race between the moment of reservation and register_successor can appear,
4463  // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4464  // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4465  // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
4466  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4467  register_predecessor_task( *this, s );
4468  internal::spawn_in_graph_arena( my_graph, *rtask );
4469  }
4470  } else {
4471  // No valid value yet, just add as successor
4472  my_successors.register_successor( s );
4473  }
4474  return true;
4475  }
4476 
4478  spin_mutex::scoped_lock l( my_mutex );
4479  my_successors.remove_successor(s);
4480  return true;
4481  }
4482 
4483 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4484  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4485  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4486 
4487  void internal_add_built_successor( successor_type &s) __TBB_override {
4488  spin_mutex::scoped_lock l( my_mutex );
4489  my_successors.internal_add_built_successor(s);
4490  }
4491 
4492  void internal_delete_built_successor( successor_type &s) __TBB_override {
4493  spin_mutex::scoped_lock l( my_mutex );
4494  my_successors.internal_delete_built_successor(s);
4495  }
4496 
4497  size_t successor_count() __TBB_override {
4498  spin_mutex::scoped_lock l( my_mutex );
4499  return my_successors.successor_count();
4500  }
4501 
4502  void copy_successors(successor_list_type &v) __TBB_override {
4503  spin_mutex::scoped_lock l( my_mutex );
4504  my_successors.copy_successors(v);
4505  }
4506 
4507  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4508  spin_mutex::scoped_lock l( my_mutex );
4509  my_built_predecessors.add_edge(p);
4510  }
4511 
4512  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4513  spin_mutex::scoped_lock l( my_mutex );
4514  my_built_predecessors.delete_edge(p);
4515  }
4516 
4517  size_t predecessor_count() __TBB_override {
4518  spin_mutex::scoped_lock l( my_mutex );
4519  return my_built_predecessors.edge_count();
4520  }
4521 
4522  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4523  spin_mutex::scoped_lock l( my_mutex );
4524  my_built_predecessors.copy_edges(v);
4525  }
4526 
4527  void extract() __TBB_override {
4528  my_buffer_is_valid = false;
4529  built_successors().sender_extract(*this);
4530  built_predecessors().receiver_extract(*this);
4531  }
4532 
4533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4534 
4536  spin_mutex::scoped_lock l( my_mutex );
4537  if ( my_buffer_is_valid ) {
4538  v = my_buffer;
4539  return true;
4540  }
4541  return false;
4542  }
4543 
4546  return try_get(v);
4547  }
4548 
4550  bool try_release() __TBB_override { return true; }
4551 
4553  bool try_consume() __TBB_override { return true; }
4554 
4555  bool is_valid() {
4556  spin_mutex::scoped_lock l( my_mutex );
4557  return my_buffer_is_valid;
4558  }
4559 
4560  void clear() {
4561  spin_mutex::scoped_lock l( my_mutex );
4562  my_buffer_is_valid = false;
4563  }
4564 
4565 protected:
4566 
4567  template< typename R, typename B > friend class run_and_put_task;
4568  template<typename X, typename Y> friend class internal::broadcast_cache;
4569  template<typename X, typename Y> friend class internal::round_robin_cache;
4572  return try_put_task_impl(v);
4573  }
4574 
4576  my_buffer = v;
4577  my_buffer_is_valid = true;
4578  task * rtask = my_successors.try_put_task(v);
4579  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4580  return rtask;
4581  }
4582 
4584  return my_graph;
4585  }
4586 
4589 
4591  o(owner), s(succ) {};
4592 
4594  if (!s.register_predecessor(o)) {
4595  o.register_successor(s);
4596  }
4597  return NULL;
4598  }
4599 
4602  };
4603 
4606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4607  internal::edge_container<predecessor_type> my_built_predecessors;
4608 #endif
4612 
4614  my_buffer_is_valid = false;
4615  if (f&rf_clear_edges) {
4616  my_successors.clear();
4617  }
4618  }
4619 }; // overwrite_node
4620 
4621 template< typename T >
4622 class write_once_node : public overwrite_node<T> {
4623 public:
4624  typedef T input_type;
4625  typedef T output_type;
4629 
4632  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633  static_cast<receiver<input_type> *>(this),
4634  static_cast<sender<output_type> *>(this) );
4635  }
4636 
4637 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4638  template <typename... Args>
4639  write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640  make_edges_in_order(nodes, *this);
4641  }
4642 #endif
4643 
4646  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647  static_cast<receiver<input_type> *>(this),
4648  static_cast<sender<output_type> *>(this) );
4649  }
4650 
4651 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652  void set_name( const char *name ) __TBB_override {
4654  }
4655 #endif
4656 
4657 protected:
4658  template< typename R, typename B > friend class run_and_put_task;
4659  template<typename X, typename Y> friend class internal::broadcast_cache;
4660  template<typename X, typename Y> friend class internal::round_robin_cache;
4662  spin_mutex::scoped_lock l( this->my_mutex );
4663  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664  }
4665 };
4666 
4667 } // interfaceX
4668 
4673 
4674  using interface11::graph;
4675  using interface11::graph_node;
4676  using interface11::continue_msg;
4677  using interface11::source_node;
4678  using interface11::input_node;
4679  using interface11::function_node;
4680  using interface11::multifunction_node;
4681  using interface11::split_node;
4683  using interface11::indexer_node;
4684  using interface11::internal::tagged_msg;
4687  using interface11::continue_node;
4688  using interface11::overwrite_node;
4689  using interface11::write_once_node;
4690  using interface11::broadcast_node;
4691  using interface11::buffer_node;
4692  using interface11::queue_node;
4693  using interface11::sequencer_node;
4694  using interface11::priority_queue_node;
4695  using interface11::limiter_node;
4696  using namespace interface11::internal::graph_policy_namespace;
4697  using interface11::join_node;
4699  using interface11::copy_body;
4700  using interface11::make_edge;
4703 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704  using interface11::composite_node;
4705 #endif
4706  using interface11::async_node;
4707 #if __TBB_PREVIEW_ASYNC_MSG
4708  using interface11::async_msg;
4709 #endif
4710 #if __TBB_PREVIEW_STREAMING_NODE
4711  using interface11::port_ref;
4713 #endif // __TBB_PREVIEW_STREAMING_NODE
4714 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4716  using internal::no_priority;
4717 #endif
4718 
4719 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720  using interface11::internal::follows;
4721  using interface11::internal::precedes;
4722  using interface11::internal::make_node_set;
4723  using interface11::internal::make_edges;
4724 #endif
4725 
4726 } // flow
4727 } // tbb
4728 
4729 // Include deduction guides for node classes
4731 
4732 #undef __TBB_PFG_RESET_ARG
4733 #undef __TBB_COMMA
4734 #undef __TBB_DEFAULT_NODE_ALLOCATOR
4735 
4737 #undef __TBB_flow_graph_H_include_area
4738 
4739 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740  #undef __TBB_NOINLINE_SYM
4741 #endif
4742 
4743 #endif // __TBB_flow_graph_H
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:2134
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4593
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1306
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2487
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1004
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2630
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:326
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3519
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1896
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
Used to form groups of tasks.
Definition: task.h:358
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:1248
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3414
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:806
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2538
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
Definition: flow_graph.h:1958
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3794
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1559
tbb::task_group_context * my_context
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1289
bool gather_successful_try_puts(const X &t, task_list &tasks)
Definition: flow_graph.h:511
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1708
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:713
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:672
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1438
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3631
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:3233
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:4192
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
__TBB_DEPRECATED typedef T output_type
The output type of this sender.
Definition: flow_graph.h:428
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1675
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2634
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1894
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1618
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:47
void prepare_task_arena(bool reinit=false)
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:624
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2854
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2682
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:2196
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:634
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3886
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1923
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:606
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2497
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4453
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:733
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3260
buffer_node< T, Allocator >::buffer_operation prio_operation
Definition: flow_graph.h:2805
#define CODEPTR()
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3343
static void fgt_async_reserve(void *, void *)
A list of children.
Definition: task.h:1074
base_type::size_type size_type
Definition: flow_graph.h:2578
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2553
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4645
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:2219
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3532
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:389
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1786
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3651
Base class for user-defined tasks.
Definition: task.h:615
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4477
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:3222
task * decrement_counter(long long delta)
Definition: flow_graph.h:3076
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:481
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4613
static void fgt_graph(void *)
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:1187
static void fgt_make_edge(void *, void *)
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2989
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
receiver< TupleType > base_type
Definition: flow_graph.h:1683
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2760
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
Definition: flow_graph.h:2519
reference operator *() const
Dereference.
Definition: flow_graph.h:755
Forward declaration section.
Definition: flow_graph.h:117
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:3110
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
Implements methods for both executable and function nodes that puts Output to its successors.
Definition: flow_graph.h:854
The two-phase join port.
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:958
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:42
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:4250
static void fgt_remove_edge(void *, void *)
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1484
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:4331
static void fgt_composite(void *, void *, void *)
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4545
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:4291
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:3003
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4583
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:1207
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1110
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2683
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2589
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1049
A lock that occupies a single byte.
Definition: spin_mutex.h:39
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3722
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:311
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:3005
static void fgt_graph_desc(void *, const char *)
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2186
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:843
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3584
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4570
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1788
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2816
An cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:131
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3397
task & pop_front()
Pop the front task from the list.
Definition: task.h:1109
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2687
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1391
Forwards messages of type T to all successors.
Definition: flow_graph.h:1905
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3809
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2604
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:4588
internal::function_input< input_type, output_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1480
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3387
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:229
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:1192
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:874
A cache of successors that are broadcast to.
Definition: flow_graph.h:128
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:4228
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:2323
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Definition: flow_graph.h:1996
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2282
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1576
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:2294
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1613
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
virtual bool try_get_wrapper(void *p, bool is_async)=0
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:617
void deactivate_graph(tbb::flow::interface10::graph &g)
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:698
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
static void fgt_node(void *, string_index, void *, void *)
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:2213
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3101
unsigned int node_priority_t
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2031
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3266
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2613
iterator end()
end iterator
Definition: flow_graph.h:868
class __TBB_DEPRECATED_MSG("tbb::tbb_hash is deprecated, use std::hash") tbb_hash
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:3294
#define __TBB_DEFAULT_NODE_ALLOCATOR(T)
Definition: flow_graph.h:71
Forwards messages in priority order.
Definition: flow_graph.h:2749
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3310
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3262
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:719
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:261
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2990
__TBB_NOINLINE_SYM source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:1227
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:3210
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3484
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1430
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:3146
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2648
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2461
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:797
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:663
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2025
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2763
bool is_graph_active(tbb::flow::interface10::graph &g)
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:963
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:392
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3550
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4631
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:3129
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:972
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2631
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2822
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2848
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:2338
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:2299
__TBB_DEPRECATED typedef T input_type
The input type of this receiver.
Definition: flow_graph.h:467
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1752
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3441
void grow_my_array(size_t minimum_size)
Grows the internal array.
Definition: flow_graph.h:159
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:31
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2577
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1061
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1793
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4611
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:121
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3617
static void fgt_release_wait(void *)
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4229
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:4232
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4423
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:904
A task that calls a node's forward_task function.
Definition: flow_graph.h:329
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:4255
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:114
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4550
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
Definition: flow_graph.h:1508
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:2347
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2622
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3517
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:753
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:2125
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4358
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
static void fgt_async_try_put_begin(void *, void *)
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4261
cache_aligned_allocator< T > internals_allocator
Definition: flow_graph.h:2055
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2836
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:4221
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2724
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3773
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3758
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:106
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1751
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3759
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1787
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4628
internal::input_body< output_type > * my_init_body
Definition: flow_graph.h:1103
static void fgt_end_body(void *)
__TBB_NOINLINE_SYM input_node(graph &g, Body body)
Constructor for a node with a successor.
Definition: flow_graph.h:922
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:910
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:439
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:3363
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4398
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1104
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:237
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3969
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:813
void const char const char int ITT_FORMAT __itt_group_sync p
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:4236
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:130
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4590
receiver< input_type > receiver_type
Definition: flow_graph.h:4227
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:2077
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:609
task that does nothing. Useful for synchronization.
Definition: task.h:1042
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1781
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:245
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4415
static void fgt_async_commit(void *, void *)
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3665
An abstract cache of successors.
Definition: flow_graph.h:127
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3486
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3518
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:449
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1456
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3405
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2873
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1392
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4535
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:118
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1909
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2803
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3796
static const node_priority_t no_priority
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2542
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3650
static tbb::task *const SUCCESSFULLY_ENQUEUED
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:4241
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2720
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1910
buffer_node< T, Allocator >::buffer_operation sequencer_operation
Definition: flow_graph.h:2721
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2506
static void fgt_reserve_wait(void *)
class __TBB_DEPRECATED streaming_node
Definition: flow_graph.h:303
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4553
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. " "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
void reset_node(reset_flags f) __TBB_override
resets the input_node to its initial state
Definition: flow_graph.h:1086
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:4189
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1145
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:4231
item_buffer with reservable front-end. NOTE: if reserving, do not
Definition: flow_graph.h:248
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3686
Forwards messages in sequence order.
Definition: flow_graph.h:2668
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1390
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:774
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1610
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1323
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2669
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:2329
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2399
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:603
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2812
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:251
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:2129
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2029
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3499
Forwards messages in arbitrary order.
Definition: flow_graph.h:2044
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:2127
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3793
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1039
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1723
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:433
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:129
Enables one or the other code branches.
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3485
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2600
split_node: accepts a tuple as input, forwards each element of the tuple to its
Definition: flow_graph.h:1681
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2766
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3823
__TBB_NOINLINE_SYM input_node(const input_node &src)
Copy constructor.
Definition: flow_graph.h:942
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4661
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3163
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1952
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2060
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3795
The base of all graph nodes.
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1916
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2579
#define __TBB_override
Definition: tbb_stddef.h:240
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3565
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:3382
Detects whether two given types are the same.
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3838
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1481
Implements methods for a function node that takes a type Input as input and sends.
Definition: flow_graph.h:421
A task that calls a node's apply_body_bypass function with no input.
Definition: flow_graph.h:379
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3598
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:1195
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:430
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1706
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1333
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:4222
static void fgt_multioutput_node_desc(const NodeType *, const char *)
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4353
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3392
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void const char const char int ITT_FORMAT __itt_group_sync s
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1398
Class for determining type of std::allocator<T>::value_type.
Definition: tbb_stddef.h:471
Implements async node.
Definition: flow_graph.h:4211
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1370
Base class for receivers of completion messages.
Definition: flow_graph.h:599
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3701
output_ports_type & output_ports()
Definition: flow_graph.h:1737
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:377
static void fgt_async_try_put_end(void *, void *)
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3583
iterator begin()
start iterator
Definition: flow_graph.h:866
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4605
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3723
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3423
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3551
__TBB_DEPRECATED tbb::task * root_task()
Returns the root task of the graph.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:1257
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:90
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3432
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:472
Forwards messages in FIFO order.
Definition: flow_graph.h:2568
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:344
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1740
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4575
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
static void fgt_node_desc(const NodeType *, const char *)
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:4230
async_body_base< Gateway > base_type
Definition: flow_graph.h:4186
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1482
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1789
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
const_iterator cend() const
end const iterator
Definition: flow_graph.h:876
buffer_node< T, Allocator > class_type
Definition: flow_graph.h:2062
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1596
internal::multifunction_input< input_type, output_ports_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1612
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2359
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:4437
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
buffer_node< T, Allocator >::item_type item_type
Definition: flow_graph.h:2804
static void fgt_begin_body(void *)
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2808
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3902
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2061
Base class for tasks generated by graph nodes.
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4414
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:2019
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:3327
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:329
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:831
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1468
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:2352
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1345
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1557
The graph class.
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:469
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3687
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4273
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2798
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2762
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1022
friend class scoped_lock
Definition: spin_mutex.h:179
internal::input_body< output_type > * my_body
Definition: flow_graph.h:1102
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3616
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2476
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1088
input_filter control to signal end-of-input for parallel_pipeline
Definition: pipeline.h:316
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4627
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1496
virtual void finalize() const
Definition: flow_graph.h:147
~graph()
Destroys the graph.
Definition: flow_graph.h:798
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2377
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2182
pointer operator->() const
Dereference.
Definition: flow_graph.h:761
class __TBB_DEPRECATED async_msg
Definition: flow_graph.h:216
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:350
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1745
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2782
void activate_graph(tbb::flow::interface10::graph &g)
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:436
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2703
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:638
const V & cast_to(T const &t)
Definition: flow_graph.h:715
K key_from_message(const T &t)
Definition: flow_graph.h:721
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:847
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:820
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1153
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:907
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1510
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1483
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2661

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.