Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_node_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_node_impl_H
18 #define __TBB__flow_graph_node_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
25 
27 namespace internal {
28 
32 
33  template< typename T, typename A >
34  class function_input_queue : public item_buffer<T,A> {
35  public:
36  bool empty() const {
37  return this->buffer_empty();
38  }
39 
40  const T& front() const {
41  return this->item_buffer<T, A>::front();
42  }
43 
44  bool pop( T& t ) {
45  return this->pop_front( t );
46  }
47 
48  void pop() {
49  this->destroy_front();
50  }
51 
52  bool push( T& t ) {
53  return this->push_back( t );
54  }
55  };
56 
58  // The only up-ref is apply_body_impl, which should implement the function
59  // call and any handling of the result.
60  template< typename Input, typename Policy, typename A, typename ImplType >
61  class function_input_base : public receiver<Input>, tbb::internal::no_assign {
63 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
64  , add_blt_pred, del_blt_pred,
65  blt_pred_cnt, blt_pred_cpy // create vector copies of preds and succs
66 #endif
67  };
69 
70  public:
71 
73  typedef Input input_type;
79  "queueing and rejecting policies can't be specified simultaneously");
80 
81 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
82  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
83  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
84 #endif
85 
90  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
92  , forwarder_busy(false)
93  {
95  my_aggregator.initialize_handler(handler_type(this));
96  }
97 
100  : receiver<Input>(), tbb::internal::no_assign()
102  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
103  , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
104  {
106  my_aggregator.initialize_handler(handler_type(this));
107  }
108 
110  // The queue is allocated by the constructor for {multi}function_node.
111  // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
112  // This would be an interface-breaking change.
114  if ( my_queue ) delete my_queue;
115  }
116 
119  }
120 
123  operation_type op_data(reg_pred);
124  op_data.r = &src;
125  my_aggregator.execute(&op_data);
126  return true;
127  }
128 
131  operation_type op_data(rem_pred);
132  op_data.r = &src;
133  my_aggregator.execute(&op_data);
134  return true;
135  }
136 
137 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
138  void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
140  operation_type op_data(add_blt_pred);
141  op_data.r = &src;
142  my_aggregator.execute(&op_data);
143  }
144 
146  void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
147  operation_type op_data(del_blt_pred);
148  op_data.r = &src;
149  my_aggregator.execute(&op_data);
150  }
151 
152  size_t predecessor_count() __TBB_override {
153  operation_type op_data(blt_pred_cnt);
154  my_aggregator.execute(&op_data);
155  return op_data.cnt_val;
156  }
157 
158  void copy_predecessors(predecessor_list_type &v) __TBB_override {
159  operation_type op_data(blt_pred_cpy);
160  op_data.predv = &v;
161  my_aggregator.execute(&op_data);
162  }
163 
164  built_predecessors_type &built_predecessors() __TBB_override {
165  return my_predecessors.built_predecessors();
166  }
167 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
168 
169  protected:
170 
172  my_concurrency = 0;
173  if(my_queue) {
174  my_queue->reset();
175  }
176  reset_receiver(f);
177  forwarder_busy = false;
178  }
179 
180  graph& my_graph_ref;
181  const size_t my_max_concurrency;
186 
189  else
191  __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
192  }
193 
195  return my_graph_ref;
196  }
197 
199  operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item
200  my_aggregator.execute(&op_data);
201  return op_data.bypass_t;
202  }
203 
204  private:
205 
208 
209  class operation_type : public aggregated_operation< operation_type > {
210  public:
211  char type;
212  union {
215 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
216  size_t cnt_val;
217  predecessor_list_type *predv;
218 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
219  };
222  type(char(t)), elem(const_cast<input_type*>(&e)) {}
223  operation_type(op_type t) : type(char(t)), r(NULL) {}
224  };
225 
228  friend class internal::aggregating_functor<class_type, operation_type>;
230 
232  task* new_task = NULL;
233  if(my_queue) {
234  if(!my_queue->empty()) {
235  ++my_concurrency;
236  new_task = create_body_task(my_queue->front());
237 
238  my_queue->pop();
239  }
240  }
241  else {
242  input_type i;
243  if(my_predecessors.get_item(i)) {
244  ++my_concurrency;
245  new_task = create_body_task(i);
246  }
247  }
248  return new_task;
249  }
250  void handle_operations(operation_type *op_list) {
251  operation_type *tmp;
252  while (op_list) {
253  tmp = op_list;
254  op_list = op_list->next;
255  switch (tmp->type) {
256  case reg_pred:
257  my_predecessors.add(*(tmp->r));
258  __TBB_store_with_release(tmp->status, SUCCEEDED);
259  if (!forwarder_busy) {
260  forwarder_busy = true;
262  }
263  break;
264  case rem_pred:
265  my_predecessors.remove(*(tmp->r));
266  __TBB_store_with_release(tmp->status, SUCCEEDED);
267  break;
268  case app_body_bypass: {
269  tmp->bypass_t = NULL;
270  __TBB_ASSERT(my_max_concurrency != 0, NULL);
271  --my_concurrency;
273  tmp->bypass_t = perform_queued_requests();
274 
275  __TBB_store_with_release(tmp->status, SUCCEEDED);
276  }
277  break;
278  case tryput_bypass: internal_try_put_task(tmp); break;
279  case try_fwd: internal_forward(tmp); break;
280  case occupy_concurrency:
282  ++my_concurrency;
283  __TBB_store_with_release(tmp->status, SUCCEEDED);
284  } else {
285  __TBB_store_with_release(tmp->status, FAILED);
286  }
287  break;
288 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
289  case add_blt_pred: {
290  my_predecessors.internal_add_built_predecessor(*(tmp->r));
291  __TBB_store_with_release(tmp->status, SUCCEEDED);
292  }
293  break;
294  case del_blt_pred:
295  my_predecessors.internal_delete_built_predecessor(*(tmp->r));
296  __TBB_store_with_release(tmp->status, SUCCEEDED);
297  break;
298  case blt_pred_cnt:
299  tmp->cnt_val = my_predecessors.predecessor_count();
300  __TBB_store_with_release(tmp->status, SUCCEEDED);
301  break;
302  case blt_pred_cpy:
303  my_predecessors.copy_predecessors( *(tmp->predv) );
304  __TBB_store_with_release(tmp->status, SUCCEEDED);
305  break;
306 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
307  }
308  }
309  }
310 
312  void internal_try_put_task(operation_type *op) {
313  __TBB_ASSERT(my_max_concurrency != 0, NULL);
315  ++my_concurrency;
316  task * new_task = create_body_task(*(op->elem));
317  op->bypass_t = new_task;
318  __TBB_store_with_release(op->status, SUCCEEDED);
319  } else if ( my_queue && my_queue->push(*(op->elem)) ) {
320  op->bypass_t = SUCCESSFULLY_ENQUEUED;
321  __TBB_store_with_release(op->status, SUCCEEDED);
322  } else {
323  op->bypass_t = NULL;
324  __TBB_store_with_release(op->status, FAILED);
325  }
326  }
327 
329  void internal_forward(operation_type *op) {
330  op->bypass_t = NULL;
332  op->bypass_t = perform_queued_requests();
333  if(op->bypass_t)
334  __TBB_store_with_release(op->status, SUCCEEDED);
335  else {
336  forwarder_busy = false;
337  __TBB_store_with_release(op->status, FAILED);
338  }
339  }
340 
342  operation_type op_data(t, tryput_bypass);
343  my_aggregator.execute(&op_data);
344  if( op_data.status == internal::SUCCEEDED ) {
345  return op_data.bypass_t;
346  }
347  return NULL;
348  }
349 
351  if( my_max_concurrency == 0 ) {
352  return apply_body_bypass(t);
353  } else {
354  operation_type check_op(t, occupy_concurrency);
355  my_aggregator.execute(&check_op);
356  if( check_op.status == internal::SUCCEEDED ) {
357  return apply_body_bypass(t);
358  }
359  return internal_try_put_bypass(t);
360  }
361  }
362 
364  if( my_max_concurrency == 0 ) {
365  return create_body_task(t);
366  } else {
367  return internal_try_put_bypass(t);
368  }
369  }
370 
372  // then decides if more work is available
374  return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
375  }
376 
378  inline task * create_body_task( const input_type &input ) {
380  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
382  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
383  : NULL;
384  }
385 
388  operation_type op_data(try_fwd);
389  task* rval = NULL;
390  do {
391  op_data.status = WAIT;
392  my_aggregator.execute(&op_data);
393  if(op_data.status == SUCCEEDED) {
394  task* ttask = op_data.bypass_t;
395  __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
396  rval = combine_tasks(my_graph_ref, rval, ttask);
397  }
398  } while (op_data.status == SUCCEEDED);
399  return rval;
400  }
401 
404  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
406  : NULL;
407  }
408 
410  inline void spawn_forward_task() {
411  task* tp = create_forward_task();
412  if(tp) {
414  }
415  }
416  }; // function_input_base
417 
419  // a type Output to its successors.
420  template< typename Input, typename Output, typename Policy, typename A>
421  class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
422  public:
423  typedef Input input_type;
424  typedef Output output_type;
429 
430  // constructor
431  template<typename Body>
433  graph &g, size_t max_concurrency,
434  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
436  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
438  }
439 
442  base_type(src),
443  my_body( src.my_init_body->clone() ),
444  my_init_body(src.my_init_body->clone() ) {
445  }
446 
448  delete my_body;
449  delete my_init_body;
450  }
451 
452  template< typename Body >
454  function_body_type &body_ref = *this->my_body;
455  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
456  }
457 
459  // There is an extra copied needed to capture the
460  // body execution without the try_put
462  output_type v = (*my_body)(i);
464  return v;
465  }
466 
467  //TODO: consider moving into the base class
470 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
471  task* successor_task = successors().try_put_task(v);
472 #endif
473  task* postponed_task = NULL;
474  if( base_type::my_max_concurrency != 0 ) {
475  postponed_task = base_type::try_get_postponed_task(i);
476  __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
477  }
478 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
479  graph& g = base_type::my_graph_ref;
480  return combine_tasks(g, successor_task, postponed_task);
481 #else
482  if( postponed_task ) {
483  // make the task available for other workers since we do not know successors'
484  // execution policy
486  }
487  task* successor_task = successors().try_put_task(v);
488 #if _MSC_VER && !__INTEL_COMPILER
489 #pragma warning (push)
490 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
491 #endif
493 #if _MSC_VER && !__INTEL_COMPILER
494 #pragma warning (pop)
495 #endif
496  if(!successor_task) {
497  // Return confirmative status since current
498  // node's body has been executed anyway
499  successor_task = SUCCESSFULLY_ENQUEUED;
500  }
501  }
502  return successor_task;
503 #endif /* TBB_DEPRECATED_MESSAGE_FLOW_ORDER */
504  }
505 
506  protected:
507 
510  if(f & rf_reset_bodies) {
512  delete my_body;
513  my_body = tmp;
514  }
515  }
516 
520 
521  }; // function_input
522 
523 
524  // helper templates to clear the successor edges of the output ports of an multifunction_node
525  template<int N> struct clear_element {
526  template<typename P> static void clear_this(P &p) {
527  (void)tbb::flow::get<N-1>(p).successors().clear();
529  }
530  template<typename P> static bool this_empty(P &p) {
531  if(tbb::flow::get<N-1>(p).successors().empty())
533  return false;
534  }
535  };
536 
537  template<> struct clear_element<1> {
538  template<typename P> static void clear_this(P &p) {
539  (void)tbb::flow::get<0>(p).successors().clear();
540  }
541  template<typename P> static bool this_empty(P &p) {
542  return tbb::flow::get<0>(p).successors().empty();
543  }
544  };
545 
546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
547  // helper templates to extract the output ports of an multifunction_node from graph
548  template<int N> struct extract_element {
549  template<typename P> static void extract_this(P &p) {
550  (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
551  extract_element<N-1>::extract_this(p);
552  }
553  };
554 
555  template<> struct extract_element<1> {
556  template<typename P> static void extract_this(P &p) {
557  (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
558  }
559  };
560 #endif
561 
562  template <typename OutputTuple>
564 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
565  template <typename... Args>
566  static OutputTuple call(graph& g, const tbb::flow::tuple<Args...>&) {
567  return OutputTuple(Args(g)...);
568  }
569 #else // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
570  template <typename T1>
571  static OutputTuple call(graph& g, const tbb::flow::tuple<T1>&) {
572  return OutputTuple(T1(g));
573  }
574 
575  template <typename T1, typename T2>
576  static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2>&) {
577  return OutputTuple(T1(g), T2(g));
578  }
579 
580  template <typename T1, typename T2, typename T3>
581  static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3>&) {
582  return OutputTuple(T1(g), T2(g), T3(g));
583  }
584 
585  template <typename T1, typename T2, typename T3, typename T4>
586  static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4>&) {
587  return OutputTuple(T1(g), T2(g), T3(g), T4(g));
588  }
589 
590  template <typename T1, typename T2, typename T3, typename T4, typename T5>
591  static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5>&) {
592  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g));
593  }
594 #if __TBB_VARIADIC_MAX >= 6
595  template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
596  static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5, T6>&) {
597  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g));
598  }
599 #endif
600 #if __TBB_VARIADIC_MAX >= 7
601  template <typename T1, typename T2, typename T3, typename T4,
602  typename T5, typename T6, typename T7>
603  static OutputTuple call(graph& g,
604  const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7>&) {
605  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g));
606  }
607 #endif
608 #if __TBB_VARIADIC_MAX >= 8
609  template <typename T1, typename T2, typename T3, typename T4,
610  typename T5, typename T6, typename T7, typename T8>
611  static OutputTuple call(graph& g,
612  const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8>&) {
613  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g));
614  }
615 #endif
616 #if __TBB_VARIADIC_MAX >= 9
617  template <typename T1, typename T2, typename T3, typename T4,
618  typename T5, typename T6, typename T7, typename T8, typename T9>
619  static OutputTuple call(graph& g,
620  const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9>&) {
621  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g));
622  }
623 #endif
624 #if __TBB_VARIADIC_MAX >= 9
625  template <typename T1, typename T2, typename T3, typename T4, typename T5,
626  typename T6, typename T7, typename T8, typename T9, typename T10>
627  static OutputTuple call(graph& g,
628  const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>&) {
629  return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g), T10(g));
630  }
631 #endif
632 #endif // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
633  }; // struct init_output_ports
634 
636  // and has a tuple of output ports specified.
637  template< typename Input, typename OutputPortSet, typename Policy, typename A>
638  class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
639  public:
641  typedef Input input_type;
642  typedef OutputPortSet output_ports_type;
647 
648  // constructor
649  template<typename Body>
651  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
656  }
657 
660  base_type(src),
661  my_body( src.my_init_body->clone() ),
662  my_init_body(src.my_init_body->clone() ),
664  }
665 
667  delete my_body;
668  delete my_init_body;
669  }
670 
671  template< typename Body >
673  multifunction_body_type &body_ref = *this->my_body;
674  return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
675  }
676 
677  // for multifunction nodes we do not have a single successor as such. So we just tell
678  // the task we were successful.
679  //TODO: consider moving common parts with implementation in function_input into separate function
682  (*my_body)(i, my_output_ports);
684  task* ttask = NULL;
687  }
688  return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
689  }
690 
692 
693  protected:
694 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
695  void extract() {
696  extract_element<N>::extract_this(my_output_ports);
697  }
698 #endif
699 
700  void reset(reset_flags f) {
703  if(f & rf_reset_bodies) {
705  delete my_body;
706  my_body = tmp;
707  }
708  __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
709  }
710 
714 
715  }; // multifunction_input
716 
717  // template to refer to an output port of a multifunction_node
718  template<size_t N, typename MOP>
720  return tbb::flow::get<N>(op.output_ports());
721  }
722 
723  inline void check_task_and_spawn(graph& g, task* t) {
724  if (t && t != SUCCESSFULLY_ENQUEUED) {
726  }
727  }
728 
729  // helper structs for split_node
730  template<int N>
731  struct emit_element {
732  template<typename T, typename P>
733  static task* emit_this(graph& g, const T &t, P &p) {
734  // TODO: consider to collect all the tasks in task_list and spawn them all at once
735  task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
736  check_task_and_spawn(g, last_task);
737  return emit_element<N-1>::emit_this(g,t,p);
738  }
739  };
740 
741  template<>
742  struct emit_element<1> {
743  template<typename T, typename P>
744  static task* emit_this(graph& g, const T &t, P &p) {
745  task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
746  check_task_and_spawn(g, last_task);
747  return SUCCESSFULLY_ENQUEUED;
748  }
749  };
750 
752  template< typename Output, typename Policy>
753  class continue_input : public continue_receiver {
754  public:
755 
757  typedef continue_msg input_type;
758 
760  typedef Output output_type;
763 
764  template< typename Body >
766  : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(/*number_of_predecessors=*/0, priority))
767  , my_graph_ref(g)
768  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
770  { }
771 
772  template< typename Body >
773  continue_input( graph &g, int number_of_predecessors,
774  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
775  ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
776  , my_graph_ref(g)
777  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
779  { }
780 
781  continue_input( const continue_input& src ) : continue_receiver(src),
783  my_body( src.my_init_body->clone() ),
784  my_init_body( src.my_init_body->clone() ) {}
785 
787  delete my_body;
788  delete my_init_body;
789  }
790 
791  template< typename Body >
793  function_body_type &body_ref = *my_body;
794  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
795  }
796 
798  continue_receiver::reset_receiver(f);
799  if(f & rf_reset_bodies) {
801  delete my_body;
802  my_body = tmp;
803  }
804  }
805 
806  protected:
807 
808  graph& my_graph_ref;
811 
813 
814  friend class apply_body_task_bypass< class_type, continue_msg >;
815 
818  // There is an extra copied needed to capture the
819  // body execution without the try_put
821  output_type v = (*my_body)( continue_msg() );
823  return successors().try_put_task( v );
824  }
825 
828  return NULL;
829  }
830 #if _MSC_VER && !__INTEL_COMPILER
831 #pragma warning (push)
832 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
833 #endif
835 #if _MSC_VER && !__INTEL_COMPILER
836 #pragma warning (pop)
837 #endif
838  return apply_body_bypass( continue_msg() );
839  }
840  else {
841  return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
843  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
844  }
845  }
846 
848  return my_graph_ref;
849  }
850  }; // continue_input
851 
853  template< typename Output >
854  class function_output : public sender<Output> {
855  public:
856 
857  template<int N> friend struct clear_element;
858  typedef Output output_type;
859  typedef typename sender<output_type>::successor_type successor_type;
861 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
862  typedef typename sender<output_type>::built_successors_type built_successors_type;
863  typedef typename sender<output_type>::successor_list_type successor_list_type;
864 #endif
865 
868  my_successors.set_owner(this);
869  }
870 
874  return true;
875  }
876 
880  return true;
881  }
882 
883 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
884  built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }
885 
886 
887  void internal_add_built_successor( successor_type &r) __TBB_override {
888  successors().internal_add_built_successor( r );
889  }
890 
891  void internal_delete_built_successor( successor_type &r) __TBB_override {
892  successors().internal_delete_built_successor( r );
893  }
894 
895  size_t successor_count() __TBB_override {
896  return successors().successor_count();
897  }
898 
899  void copy_successors( successor_list_type &v) __TBB_override {
900  successors().copy_successors(v);
901  }
902 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
903 
904  // for multifunction_node. The function_body that implements
905  // the node will have an input and an output tuple of ports. To put
906  // an item to a successor, the body should
907  //
908  // get<I>(output_ports).try_put(output_value);
909  //
910  // if task pointer is returned will always spawn and return true, else
911  // return value will be bool returned from successors.try_put.
912  task *try_put_task(const output_type &i) { // not a virtual method in this class
913  return my_successors.try_put_task(i);
914  }
915 
917 
918  graph& graph_reference() const { return my_graph_ref; }
919  protected:
921  graph& my_graph_ref;
922  }; // function_output
923 
924  template< typename Output >
925  class multifunction_output : public function_output<Output> {
926  public:
927  typedef Output output_type;
930 
933 
934  bool try_put(const output_type &i) {
935  task *res = try_put_task(i);
936  if(!res) return false;
937  if(res != SUCCESSFULLY_ENQUEUED) {
938  FLOW_SPAWN(*res); // TODO: Spawn task inside arena
939  }
940  return true;
941  }
942 
944 
945  protected:
946 
948  return my_successors.try_put_task(i);
949  }
950 
951  template <int N> friend struct emit_element;
952 
953  }; // multifunction_output
954 
955 //composite_node
956 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
957  template<typename CompositeType>
958  void add_nodes_impl(CompositeType*, bool) {}
959 
960  template< typename CompositeType, typename NodeType1, typename... NodeTypes >
961  void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
962  void *addr = const_cast<NodeType1 *>(&n1);
963 
964  fgt_alias_port(c_node, addr, visible);
965  add_nodes_impl(c_node, visible, n...);
966  }
967 #endif
968 
969 } // internal
970 
971 #endif // __TBB__flow_graph_node_impl_H
task * execute() __TBB_override
internal::aggregating_functor< class_type, operation_type > handler_type
virtual broadcast_cache< output_type > & successors()=0
continue_msg input_type
The input type of this receiver.
multifunction_body_type * my_init_body
predecessor_cache< input_type, null_mutex > my_predecessors
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
task * try_put_task(const output_type &i)
function_body_type * my_init_body
__TBB_STATIC_ASSERT(!((internal::has_policy< queueing, Policy >::value) &&(internal::has_policy< rejecting, Policy >::value)), "queueing and rejecting policies can't be specified simultaneously")
static task * emit_this(graph &g, const T &t, P &p)
task * forward_task()
This is executed by an enqueued task, the "forwarder".
void reset_receiver(reset_flags f) __TBB_override
function_input_queue< input_type, A > input_queue_type
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
sender< output_type >::successor_type successor_type
const item_type & front() const
the leaf for function_body
#define FLOW_SPAWN(a)
Definition: flow_graph.h:65
task * apply_body_impl_bypass(const input_type &i)
function_body_type * my_init_body
Base class for user-defined tasks.
Definition: task.h:615
void reset_function_input_base(reset_flags f)
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Input and scheduling for a function node that takes a type Input as input.
virtual ~function_input_base()
Destructor.
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
continue_input(const continue_input &src)
continue_input< output_type, Policy > class_type
function_body< input_type, output_type > function_body_type
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
broadcast_cache_type & successors()
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
virtual function_body * clone()=0
multifunction_body< input_type, output_ports_type > multifunction_body_type
Output output_type
The output type of this receiver.
function_input(graph &g, size_t max_concurrency,)
A task that calls a node's forward_task function.
multifunction_body_type * my_body
function_input_base< Input, Policy, A, my_class > base_type
static task * emit_this(graph &g, const T &t, P &p)
virtual multifunction_body * clone()=0
unsigned int node_priority_t
continue_input(graph &g, int number_of_predecessors,)
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
allocator_traits< Alloc >::template rebind_alloc< T >::other type
void spawn_forward_task()
Spawns a task that calls forward()
function_output< output_type > base_type
Implements methods for an executable node that takes continue_msg as input.
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
task * try_put_task(const output_type &i)
function_body_type * my_body
bool is_graph_active(tbb::flow::interface10::graph &g)
graph & graph_reference() const __TBB_override
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
task * try_get_postponed_task(const input_type &i)
Implements methods for a function node that takes a type Input as input.
function_output(const function_output &other)
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
task * internal_try_put_bypass(const input_type &t)
task * try_put_task(const T &t) __TBB_override
void * addr
void reset_function_input(reset_flags f)
predecessor_cache< input_type, null_mutex > predecessor_cache_type
static OutputTuple call(graph &g, const tbb::flow::tuple< Args... > &)
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
function_body that takes an Input and a set of output ports
graph & graph_reference() const __TBB_override
function_input_queue< input_type, A > input_queue_type
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
task * apply_body_bypass(input_type)
Applies the body to the provided input.
multifunction_input(graph &g, size_t max_concurrency,)
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
Definition: task_arena.h:490
static void fgt_end_body(void *)
void remove_successor(successor_type &r)
receiver< input_type >::predecessor_type predecessor_type
void const char const char int ITT_FORMAT __itt_group_sync p
void handle_operations(operation_type *op_list)
function_input_queue< input_type, A > input_queue_type
function_body_type * my_body
broadcast_cache_type my_successors
static tbb::task *const SUCCESSFULLY_ENQUEUED
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void register_successor(successor_type &r)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
broadcast_cache< output_type > broadcast_cache_type
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
task * apply_body_impl_bypass(const input_type &i)
#define __TBB_override
Definition: tbb_stddef.h:240
Implements methods for a function node that takes a type Input as input and sends.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
static void fgt_alias_port(void *, void *, bool)
void reset_receiver(reset_flags f) __TBB_override
function_input_base(const function_input_base &src)
Copy constructor.
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
Input input_type
The input type of this receiver.
Implements methods for both executable and function nodes that puts Output to its successors.
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
void set_owner(owner_type *owner)
bool try_put(const output_type &i)
task * create_body_task(const input_type &input)
allocates a task to apply a body
function_input_base< Input, Policy, A, my_class > base_type
static void fgt_begin_body(void *)
function_input< Input, Output, Policy, A > my_class
multifunction_output(const multifunction_output &other)
void set_owner(successor_type *owner)
void add_nodes_impl(CompositeType *, bool)
A functor that takes an Input and generates an Output.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
function_body< input_type, output_type > function_body_type
The graph class.
void check_task_and_spawn(graph &g, task *t)
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
tbb::internal::allocator_rebind< A, input_queue_type >::type queue_allocator_type
function_input(const function_input &src)
Copy constructor.
output_type apply_body_impl(const input_type &i)
function_input_base< Input, Policy, A, ImplType > class_type
aggregator< handler_type, operation_type > my_aggregator
multifunction_input(const multifunction_input &src)
Copy constructor.
multifunction_input< Input, OutputPortSet, Policy, A > my_class
virtual broadcast_cache< output_type > & successors()=0

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.