Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_join_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_join_impl_H
18 #define __TBB__flow_graph_join_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 namespace internal {
25 
27  forwarding_base(graph &g) : graph_ref(g) {}
28  virtual ~forwarding_base() {}
29  // decrement_port_count may create a forwarding task. If we cannot handle the task
30  // ourselves, ask decrement_port_count to deal with it.
31  virtual task * decrement_port_count(bool handle_task) = 0;
32  virtual void increment_port_count() = 0;
33  // moved here so input ports can queue tasks
34  graph& graph_ref;
35  };
36 
37  // specialization that lets us keep a copy of the current_key for building results.
38  // KeyType can be a reference type.
39  template<typename KeyType>
43  virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
44  current_key_type current_key; // so ports can refer to FE's desired items
45  };
46 
47  template< int N >
48  struct join_helper {
49 
50  template< typename TupleType, typename PortType >
51  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
52  tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
54  }
55  template< typename TupleType >
56  static inline void consume_reservations( TupleType &my_input ) {
57  tbb::flow::get<N-1>( my_input ).consume();
59  }
60 
61  template< typename TupleType >
62  static inline void release_my_reservation( TupleType &my_input ) {
63  tbb::flow::get<N-1>( my_input ).release();
64  }
65 
66  template <typename TupleType>
67  static inline void release_reservations( TupleType &my_input) {
69  release_my_reservation(my_input);
70  }
71 
72  template< typename InputTuple, typename OutputTuple >
73  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74  if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
75  if ( !join_helper<N-1>::reserve( my_input, out ) ) {
76  release_my_reservation( my_input );
77  return false;
78  }
79  return true;
80  }
81 
82  template<typename InputTuple, typename OutputTuple>
83  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84  bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
85  return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
86  }
87 
88  template<typename InputTuple, typename OutputTuple>
89  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90  return get_my_item(my_input, out);
91  }
92 
93  template<typename InputTuple>
94  static inline void reset_my_port(InputTuple &my_input) {
96  tbb::flow::get<N-1>(my_input).reset_port();
97  }
98 
99  template<typename InputTuple>
100  static inline void reset_ports(InputTuple& my_input) {
101  reset_my_port(my_input);
102  }
103 
104  template<typename InputTuple, typename KeyFuncTuple>
105  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
106  tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107  tbb::flow::get<N-1>(my_key_funcs) = NULL;
108  join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
109  }
110 
111  template< typename KeyFuncTuple>
112  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
113  if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114  tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
115  }
116  join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
117  }
118 
119  template<typename InputTuple>
120  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
121  join_helper<N-1>::reset_inputs(my_input, f);
122  tbb::flow::get<N-1>(my_input).reset_receiver(f);
123  }
124 
125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
126  template<typename InputTuple>
127  static inline void extract_inputs(InputTuple &my_input) {
129  tbb::flow::get<N-1>(my_input).extract_receiver();
130  }
131 #endif
132  }; // join_helper<N>
133 
134  template< >
135  struct join_helper<1> {
136 
137  template< typename TupleType, typename PortType >
138  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
139  tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
140  }
141 
142  template< typename TupleType >
143  static inline void consume_reservations( TupleType &my_input ) {
144  tbb::flow::get<0>( my_input ).consume();
145  }
146 
147  template< typename TupleType >
148  static inline void release_my_reservation( TupleType &my_input ) {
149  tbb::flow::get<0>( my_input ).release();
150  }
151 
152  template<typename TupleType>
153  static inline void release_reservations( TupleType &my_input) {
154  release_my_reservation(my_input);
155  }
156 
157  template< typename InputTuple, typename OutputTuple >
158  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159  return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
160  }
161 
162  template<typename InputTuple, typename OutputTuple>
163  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164  return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
165  }
166 
167  template<typename InputTuple, typename OutputTuple>
168  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169  return get_my_item(my_input, out);
170  }
171 
172  template<typename InputTuple>
173  static inline void reset_my_port(InputTuple &my_input) {
174  tbb::flow::get<0>(my_input).reset_port();
175  }
176 
177  template<typename InputTuple>
178  static inline void reset_ports(InputTuple& my_input) {
179  reset_my_port(my_input);
180  }
181 
182  template<typename InputTuple, typename KeyFuncTuple>
183  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
184  tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185  tbb::flow::get<0>(my_key_funcs) = NULL;
186  }
187 
188  template< typename KeyFuncTuple>
189  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
190  if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191  tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
192  }
193  }
194  template<typename InputTuple>
195  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
196  tbb::flow::get<0>(my_input).reset_receiver(f);
197  }
198 
199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
200  template<typename InputTuple>
201  static inline void extract_inputs(InputTuple &my_input) {
202  tbb::flow::get<0>(my_input).extract_receiver();
203  }
204 #endif
205  }; // join_helper<1>
206 
208  template< typename T >
209  class reserving_port : public receiver<T> {
210  public:
211  typedef T input_type;
213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
214  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
216 #endif
217  private:
218  // ----------- Aggregator ------------
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
222 #endif
223  };
225 
226  class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
227  public:
228  char type;
229  union {
230  T *my_arg;
232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
233  size_t cnt_val;
234  predecessor_list_type *plist;
235 #endif
236  };
238  type(char(t)), my_arg(const_cast<T*>(&e)) {}
240  my_pred(const_cast<predecessor_type *>(&s)) {}
242  };
243 
247 
249  reserving_port_operation *current;
250  bool no_predecessors;
251  while(op_list) {
252  current = op_list;
253  op_list = op_list->next;
254  switch(current->type) {
255  case reg_pred:
256  no_predecessors = my_predecessors.empty();
257  my_predecessors.add(*(current->my_pred));
258  if ( no_predecessors ) {
259  (void) my_join->decrement_port_count(true); // may try to forward
260  }
262  break;
263  case rem_pred:
264  my_predecessors.remove(*(current->my_pred));
267  break;
268  case res_item:
269  if ( reserved ) {
271  }
272  else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
273  reserved = true;
275  } else {
276  if ( my_predecessors.empty() ) {
278  }
280  }
281  break;
282  case rel_res:
283  reserved = false;
286  break;
287  case con_res:
288  reserved = false;
291  break;
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
293  case add_blt_pred:
294  my_predecessors.internal_add_built_predecessor(*(current->my_pred));
296  break;
297  case del_blt_pred:
298  my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
300  break;
301  case blt_pred_cnt:
302  current->cnt_val = my_predecessors.predecessor_count();
304  break;
305  case blt_pred_cpy:
306  my_predecessors.copy_predecessors(*(current->plist));
308  break;
309 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
310  }
311  }
312  }
313 
314  protected:
315  template< typename R, typename B > friend class run_and_put_task;
316  template<typename X, typename Y> friend class internal::broadcast_cache;
317  template<typename X, typename Y> friend class internal::round_robin_cache;
319  return NULL;
320  }
321 
323  return my_join->graph_ref;
324  }
325 
326  public:
327 
330  my_join = NULL;
331  my_predecessors.set_owner( this );
332  my_aggregator.initialize_handler(handler_type(this));
333  }
334 
335  // copy constructor
336  reserving_port(const reserving_port& /* other */) : receiver<T>() {
337  reserved = false;
338  my_join = NULL;
339  my_predecessors.set_owner( this );
340  my_aggregator.initialize_handler(handler_type(this));
341  }
342 
344  my_join = join;
345  }
346 
349  reserving_port_operation op_data(src, reg_pred);
350  my_aggregator.execute(&op_data);
351  return op_data.status == SUCCEEDED;
352  }
353 
356  reserving_port_operation op_data(src, rem_pred);
357  my_aggregator.execute(&op_data);
358  return op_data.status == SUCCEEDED;
359  }
360 
362  bool reserve( T &v ) {
364  my_aggregator.execute(&op_data);
365  return op_data.status == SUCCEEDED;
366  }
367 
369  void release( ) {
371  my_aggregator.execute(&op_data);
372  }
373 
375  void consume( ) {
377  my_aggregator.execute(&op_data);
378  }
379 
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
382  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
383  reserving_port_operation op_data(src, add_blt_pred);
384  my_aggregator.execute(&op_data);
385  }
386 
387  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
388  reserving_port_operation op_data(src, del_blt_pred);
389  my_aggregator.execute(&op_data);
390  }
391 
392  size_t predecessor_count() __TBB_override {
393  reserving_port_operation op_data(blt_pred_cnt);
394  my_aggregator.execute(&op_data);
395  return op_data.cnt_val;
396  }
397 
398  void copy_predecessors(predecessor_list_type &l) __TBB_override {
399  reserving_port_operation op_data(blt_pred_cpy);
400  op_data.plist = &l;
401  my_aggregator.execute(&op_data);
402  }
403 
404  void extract_receiver() {
405  my_predecessors.built_predecessors().receiver_extract(*this);
406  }
407 
408 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
409 
412  else
414  reserved = false;
415  __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
416  }
417 
418  private:
419 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
420  friend class get_graph_helper;
421 #endif
422 
425  bool reserved;
426  }; // reserving_port
427 
429  template<typename T>
430  class queueing_port : public receiver<T>, public item_buffer<T> {
431  public:
432  typedef T input_type;
435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
436  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
437  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
438 #endif
439 
440  // ----------- Aggregator ------------
441  private:
443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
444  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
445 #endif
446  };
447 
448  class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
449  public:
450  char type;
452  T *my_arg;
453 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
454  predecessor_type *pred;
455  size_t cnt_val;
456  predecessor_list_type *plist;
457 #endif
459  // constructor for value parameter
461  type(char(t)), my_val(e)
462  , bypass_t(NULL)
463  {}
464  // constructor for pointer parameter
466  type(char(t)), my_arg(const_cast<T*>(p))
467  , bypass_t(NULL)
468  {}
469  // constructor with no parameter
471  , bypass_t(NULL)
472  {}
473  };
474 
478 
480  queueing_port_operation *current;
481  bool was_empty;
482  while(op_list) {
483  current = op_list;
484  op_list = op_list->next;
485  switch(current->type) {
486  case try__put_task: {
487  task *rtask = NULL;
488  was_empty = this->buffer_empty();
489  this->push_back(current->my_val);
490  if (was_empty) rtask = my_join->decrement_port_count(false);
491  else
492  rtask = SUCCESSFULLY_ENQUEUED;
493  current->bypass_t = rtask;
495  }
496  break;
497  case get__item:
498  if(!this->buffer_empty()) {
499  *(current->my_arg) = this->front();
501  }
502  else {
504  }
505  break;
506  case res_port:
507  __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
508  this->destroy_front();
509  if(this->my_item_valid(this->my_head)) {
511  }
513  break;
514 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
515  case add_blt_pred:
516  my_built_predecessors.add_edge(*(current->pred));
518  break;
519  case del_blt_pred:
520  my_built_predecessors.delete_edge(*(current->pred));
522  break;
523  case blt_pred_cnt:
524  current->cnt_val = my_built_predecessors.edge_count();
526  break;
527  case blt_pred_cpy:
528  my_built_predecessors.copy_edges(*(current->plist));
530  break;
531 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
532  }
533  }
534  }
535  // ------------ End Aggregator ---------------
536 
537  protected:
538  template< typename R, typename B > friend class run_and_put_task;
539  template<typename X, typename Y> friend class internal::broadcast_cache;
540  template<typename X, typename Y> friend class internal::round_robin_cache;
543  my_aggregator.execute(&op_data);
544  __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
545  if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
546  return op_data.bypass_t;
547  }
548 
550  return my_join->graph_ref;
551  }
552 
553  public:
554 
557  my_join = NULL;
558  my_aggregator.initialize_handler(handler_type(this));
559  }
560 
562  queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
563  my_join = NULL;
564  my_aggregator.initialize_handler(handler_type(this));
565  }
566 
569  my_join = join;
570  }
571 
572  bool get_item( T &v ) {
573  queueing_port_operation op_data(&v, get__item);
574  my_aggregator.execute(&op_data);
575  return op_data.status == SUCCEEDED;
576  }
577 
578  // reset_port is called when item is accepted by successor, but
579  // is initiated by join_node.
580  void reset_port() {
582  my_aggregator.execute(&op_data);
583  return;
584  }
585 
586 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
587  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
588 
589  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
590  queueing_port_operation op_data(add_blt_pred);
591  op_data.pred = &p;
592  my_aggregator.execute(&op_data);
593  }
594 
595  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
596  queueing_port_operation op_data(del_blt_pred);
597  op_data.pred = &p;
598  my_aggregator.execute(&op_data);
599  }
600 
601  size_t predecessor_count() __TBB_override {
602  queueing_port_operation op_data(blt_pred_cnt);
603  my_aggregator.execute(&op_data);
604  return op_data.cnt_val;
605  }
606 
607  void copy_predecessors(predecessor_list_type &l) __TBB_override {
608  queueing_port_operation op_data(blt_pred_cpy);
609  op_data.plist = &l;
610  my_aggregator.execute(&op_data);
611  }
612 
613  void extract_receiver() {
615  my_built_predecessors.receiver_extract(*this);
616  }
617 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
618 
622 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
623  if (f & rf_clear_edges)
624  my_built_predecessors.clear();
625 #endif
626  }
627 
628  private:
629 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
630  friend class get_graph_helper;
631 #endif
632 
634 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
635  edge_container<predecessor_type> my_built_predecessors;
636 #endif
637  }; // queueing_port
638 
640 
641  template<typename K>
642  struct count_element {
644  size_t my_value;
645  };
646 
647  // method to access the key in the counting table
648  // the ref has already been removed from K
649  template< typename K >
652  const K& operator()(const table_item_type& v) { return v.my_key; }
653  };
654 
655  // the ports can have only one template parameter. We wrap the types needed in
656  // a traits type
657  template< class TraitsType >
659  public receiver<typename TraitsType::T>,
660  public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
661  typename TraitsType::KHash > {
662  public:
663  typedef TraitsType traits;
665  typedef typename TraitsType::T input_type;
666  typedef typename TraitsType::K key_type;
669  typedef typename TraitsType::TtoK type_to_key_func_type;
670  typedef typename TraitsType::KHash hash_compare_type;
672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
673  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
674  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
675 #endif
676  private:
677 // ----------- Aggregator ------------
678  private:
680 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
681  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
682 #endif
683  };
684 
685  class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
686  public:
687  char type;
690 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
691  predecessor_type *pred;
692  size_t cnt_val;
693  predecessor_list_type *plist;
694 #endif
695  // constructor for value parameter
697  type(char(t)), my_val(e) {}
698  // constructor for pointer parameter
700  type(char(t)), my_arg(const_cast<input_type*>(p)) {}
701  // constructor with no parameter
703  };
704 
708 
711  while(op_list) {
712  current = op_list;
713  op_list = op_list->next;
714  switch(current->type) {
715  case try__put: {
716  bool was_inserted = this->insert_with_key(current->my_val);
717  // return failure if a duplicate insertion occurs
718  __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
719  }
720  break;
721  case get__item:
722  // use current_key from FE for item
723  if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
724  __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
725  }
727  break;
728  case res_port:
729  // use current_key from FE for item
732  break;
733 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
734  case add_blt_pred:
735  my_built_predecessors.add_edge(*(current->pred));
737  break;
738  case del_blt_pred:
739  my_built_predecessors.delete_edge(*(current->pred));
741  break;
742  case blt_pred_cnt:
743  current->cnt_val = my_built_predecessors.edge_count();
745  break;
746  case blt_pred_cpy:
747  my_built_predecessors.copy_edges(*(current->plist));
749  break;
750 #endif
751  }
752  }
753  }
754 // ------------ End Aggregator ---------------
755  protected:
756  template< typename R, typename B > friend class run_and_put_task;
757  template<typename X, typename Y> friend class internal::broadcast_cache;
758  template<typename X, typename Y> friend class internal::round_robin_cache;
761  task *rtask = NULL;
762  my_aggregator.execute(&op_data);
763  if(op_data.status == SUCCEEDED) {
764  rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
765  // rtask has to reflect the return status of the try_put
766  if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
767  }
768  return rtask;
769  }
770 
772  return my_join->graph_ref;
773  }
774 
775  public:
776 
778  my_join = NULL;
779  my_aggregator.initialize_handler(handler_type(this));
780  }
781 
782  // copy constructor
783  key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
784  my_join = NULL;
785  my_aggregator.initialize_handler(handler_type(this));
786  }
787 
789 
791  my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
792  }
793 
795 
797 
798  bool get_item( input_type &v ) {
799  // aggregator uses current_key from FE for Key
801  my_aggregator.execute(&op_data);
802  return op_data.status == SUCCEEDED;
803  }
804 
805 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
806  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
807 
808  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
809  key_matching_port_operation op_data(add_blt_pred);
810  op_data.pred = &p;
811  my_aggregator.execute(&op_data);
812  }
813 
814  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
815  key_matching_port_operation op_data(del_blt_pred);
816  op_data.pred = &p;
817  my_aggregator.execute(&op_data);
818  }
819 
820  size_t predecessor_count() __TBB_override {
821  key_matching_port_operation op_data(blt_pred_cnt);
822  my_aggregator.execute(&op_data);
823  return op_data.cnt_val;
824  }
825 
826  void copy_predecessors(predecessor_list_type &l) __TBB_override {
827  key_matching_port_operation op_data(blt_pred_cpy);
828  op_data.plist = &l;
829  my_aggregator.execute(&op_data);
830  }
831 #endif
832 
833  // reset_port is called when item is accepted by successor, but
834  // is initiated by join_node.
835  void reset_port() {
837  my_aggregator.execute(&op_data);
838  return;
839  }
840 
841 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
842  void extract_receiver() {
844  my_built_predecessors.receiver_extract(*this);
845  }
846 #endif
850 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
851  if (f & rf_clear_edges)
852  my_built_predecessors.clear();
853 #endif
854  }
855 
856  private:
857  // my_join forwarding base used to count number of inputs that
858  // received key.
860 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
861  edge_container<predecessor_type> my_built_predecessors;
862 #endif
863  }; // key_matching_port
864 
865  using namespace graph_policy_namespace;
866 
867  template<typename JP, typename InputTuple, typename OutputTuple>
869 
871  template<typename JP, typename InputTuple, typename OutputTuple>
873 
874  template<typename InputTuple, typename OutputTuple>
875  class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
876  public:
878  typedef OutputTuple output_type;
879  typedef InputTuple input_type;
881 
882  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
883  ports_with_no_inputs = N;
884  join_helper<N>::set_join_node_pointer(my_inputs, this);
885  }
886 
887  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
888  ports_with_no_inputs = N;
889  join_helper<N>::set_join_node_pointer(my_inputs, this);
890  }
891 
892  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
893 
895  ++ports_with_no_inputs;
896  }
897 
898  // if all input_ports have predecessors, spawn forward to try and consume tuples
900  if(ports_with_no_inputs.fetch_and_decrement() == 1) {
901  if(internal::is_graph_active(this->graph_ref)) {
902  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
904  if(!handle_task) return rtask;
905  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
906  }
907  }
908  return NULL;
909  }
910 
911  input_type &input_ports() { return my_inputs; }
912 
913  protected:
914 
915  void reset( reset_flags f) {
916  // called outside of parallel contexts
917  ports_with_no_inputs = N;
918  join_helper<N>::reset_inputs(my_inputs, f);
919  }
920 
921 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
922  void extract( ) {
923  // called outside of parallel contexts
924  ports_with_no_inputs = N;
926  }
927 #endif
928 
929  // all methods on input ports should be called under mutual exclusion from join_node_base.
930 
932  return !ports_with_no_inputs;
933  }
934 
936  if(ports_with_no_inputs) return false;
937  return join_helper<N>::reserve(my_inputs, out);
938  }
939 
940  void tuple_accepted() {
942  }
943  void tuple_rejected() {
945  }
946 
949  atomic<size_t> ports_with_no_inputs;
950  }; // join_node_FE<reserving, ... >
951 
952  template<typename InputTuple, typename OutputTuple>
953  class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
954  public:
956  typedef OutputTuple output_type;
957  typedef InputTuple input_type;
959 
960  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
961  ports_with_no_items = N;
962  join_helper<N>::set_join_node_pointer(my_inputs, this);
963  }
964 
965  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
966  ports_with_no_items = N;
967  join_helper<N>::set_join_node_pointer(my_inputs, this);
968  }
969 
970  // needed for forwarding
971  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
972 
974  ports_with_no_items = N;
975  }
976 
977  // if all input_ports have items, spawn forward to try and consume tuples
979  {
980  if(ports_with_no_items.fetch_and_decrement() == 1) {
981  if(internal::is_graph_active(this->graph_ref)) {
982  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
984  if(!handle_task) return rtask;
985  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
986  }
987  }
988  return NULL;
989  }
990 
991  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
992 
993  input_type &input_ports() { return my_inputs; }
994 
995  protected:
996 
997  void reset( reset_flags f) {
998  reset_port_count();
999  join_helper<N>::reset_inputs(my_inputs, f );
1000  }
1001 
1002 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1003  void extract() {
1004  reset_port_count();
1005  join_helper<N>::extract_inputs(my_inputs);
1006  }
1007 #endif
1008  // all methods on input ports should be called under mutual exclusion from join_node_base.
1009 
1011  return !ports_with_no_items;
1012  }
1013 
1015  if(ports_with_no_items) return false;
1016  return join_helper<N>::get_items(my_inputs, out);
1017  }
1018 
1020  reset_port_count();
1021  join_helper<N>::reset_ports(my_inputs);
1022  }
1024  // nothing to do.
1025  }
1026 
1029  atomic<size_t> ports_with_no_items;
1030  }; // join_node_FE<queueing, ...>
1031 
1032  // key_matching join front-end.
1033  template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1034  class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1035  // buffer of key value counts
1036  public hash_buffer< // typedefed below to key_to_count_buffer_type
1037  typename tbb::internal::strip<K>::type&, // force ref type on K
1038  count_element<typename tbb::internal::strip<K>::type>,
1039  internal::type_to_key_function_body<
1040  count_element<typename tbb::internal::strip<K>::type>,
1041  typename tbb::internal::strip<K>::type& >,
1042  KHash >,
1043  // buffer of output items
1044  public item_buffer<OutputTuple> {
1045  public:
1047  typedef OutputTuple output_type;
1048  typedef InputTuple input_type;
1049  typedef K key_type;
1051  typedef KHash key_hash_compare;
1052  // must use K without ref.
1054  // method that lets us refer to the key of this type.
1058  // this is the type of the special table that keeps track of the number of discrete
1059  // elements corresponding to each key that we've seen.
1063  typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1065 
1066 // ----------- Aggregator ------------
1067  // the aggregator is only needed to serialize the access to the hash table.
1068  // and the output_buffer_type base class
1069  private:
1070  enum op_type { res_count, inc_count, may_succeed, try_make };
1072 
1073  class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1074  public:
1075  char type;
1080  // constructor for value parameter
1081  key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1082  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1083  key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1084  enqueue_task(true) {}
1085  // constructor with no parameter
1086  key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1087  };
1088 
1090  friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1092 
1093  // called from aggregator, so serialized
1094  // returns a task pointer if the a task would have been enqueued but we asked that
1095  // it be returned. Otherwise returns NULL.
1096  task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1097  output_type l_out;
1098  task *rtask = NULL;
1099  bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1100  this->current_key = t;
1101  this->delete_with_key(this->current_key); // remove the key
1102  if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1103  this->push_back(l_out);
1104  if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
1105  rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1107  if(handle_task) {
1108  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1109  rtask = NULL;
1110  }
1111  do_fwd = false;
1112  }
1113  // retire the input values
1114  join_helper<N>::reset_ports(my_inputs); // <== call back
1115  }
1116  else {
1117  __TBB_ASSERT(false, "should have had something to push");
1118  }
1119  return rtask;
1120  }
1121 
1122  void handle_operations(key_matching_FE_operation* op_list) {
1123  key_matching_FE_operation *current;
1124  while(op_list) {
1125  current = op_list;
1126  op_list = op_list->next;
1127  switch(current->type) {
1128  case res_count: // called from BE
1129  {
1130  this->destroy_front();
1131  __TBB_store_with_release(current->status, SUCCEEDED);
1132  }
1133  break;
1134  case inc_count: { // called from input ports
1135  count_element_type *p = 0;
1136  unref_key_type &t = current->my_val;
1137  bool do_enqueue = current->enqueue_task;
1138  if(!(this->find_ref_with_key(t,p))) {
1139  count_element_type ev;
1140  ev.my_key = t;
1141  ev.my_value = 0;
1142  this->insert_with_key(ev);
1143  if(!(this->find_ref_with_key(t,p))) {
1144  __TBB_ASSERT(false, "should find key after inserting it");
1145  }
1146  }
1147  if(++(p->my_value) == size_t(N)) {
1148  task *rtask = fill_output_buffer(t, true, do_enqueue);
1149  __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1150  current->bypass_t = rtask;
1151  }
1152  }
1153  __TBB_store_with_release(current->status, SUCCEEDED);
1154  break;
1155  case may_succeed: // called from BE
1156  __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1157  break;
1158  case try_make: // called from BE
1159  if(this->buffer_empty()) {
1160  __TBB_store_with_release(current->status, FAILED);
1161  }
1162  else {
1163  *(current->my_output) = this->front();
1164  __TBB_store_with_release(current->status, SUCCEEDED);
1165  }
1166  break;
1167  }
1168  }
1169  }
1170 // ------------ End Aggregator ---------------
1171 
1172  public:
1173  template<typename FunctionTuple>
1174  join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1175  join_helper<N>::set_join_node_pointer(my_inputs, this);
1176  join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1177  my_aggregator.initialize_handler(handler_type(this));
1179  this->set_key_func(cfb);
1180  }
1181 
1183  output_buffer_type() {
1184  my_node = NULL;
1185  join_helper<N>::set_join_node_pointer(my_inputs, this);
1186  join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1187  my_aggregator.initialize_handler(handler_type(this));
1189  this->set_key_func(cfb);
1190  }
1191 
1192  // needed for forwarding
1193  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1194 
1195  void reset_port_count() { // called from BE
1196  key_matching_FE_operation op_data(res_count);
1197  my_aggregator.execute(&op_data);
1198  return;
1199  }
1200 
1201  // if all input_ports have items, spawn forward to try and consume tuples
1202  // return a task if we are asked and did create one.
1203  task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override { // called from input_ports
1204  key_matching_FE_operation op_data(t, handle_task, inc_count);
1205  my_aggregator.execute(&op_data);
1206  return op_data.bypass_t;
1207  }
1208 
1209  task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1210 
1211  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
1212 
1213  input_type &input_ports() { return my_inputs; }
1214 
1215  protected:
1216 
1217  void reset( reset_flags f ) {
1218  // called outside of parallel contexts
1219  join_helper<N>::reset_inputs(my_inputs, f);
1220 
1221  key_to_count_buffer_type::reset();
1222  output_buffer_type::reset();
1223  }
1224 
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1226  void extract() {
1227  // called outside of parallel contexts
1228  join_helper<N>::extract_inputs(my_inputs);
1229  key_to_count_buffer_type::reset(); // have to reset the tag counts
1230  output_buffer_type::reset(); // also the queue of outputs
1231  // my_node->current_tag = NO_TAG;
1232  }
1233 #endif
1234  // all methods on input ports should be called under mutual exclusion from join_node_base.
1235 
1236  bool tuple_build_may_succeed() { // called from back-end
1237  key_matching_FE_operation op_data(may_succeed);
1238  my_aggregator.execute(&op_data);
1239  return op_data.status == SUCCEEDED;
1240  }
1241 
1242  // cannot lock while calling back to input_ports. current_key will only be set
1243  // and reset under the aggregator, so it will remain consistent.
1245  key_matching_FE_operation op_data(&out,try_make);
1246  my_aggregator.execute(&op_data);
1247  return op_data.status == SUCCEEDED;
1248  }
1249 
1251  reset_port_count(); // reset current_key after ports reset.
1252  }
1253 
1255  // nothing to do.
1256  }
1257 
1258  input_type my_inputs; // input ports
1260  }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1261 
1263  template<typename JP, typename InputTuple, typename OutputTuple>
1264  class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1265  public sender<OutputTuple> {
1266  protected:
1267  using graph_node::my_graph;
1268  public:
1269  typedef OutputTuple output_type;
1270 
1271  typedef typename sender<output_type>::successor_type successor_type;
1273  using input_ports_type::tuple_build_may_succeed;
1274  using input_ports_type::try_to_make_tuple;
1275  using input_ports_type::tuple_accepted;
1276  using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278  typedef typename sender<output_type>::built_successors_type built_successors_type;
1279  typedef typename sender<output_type>::successor_list_type successor_list_type;
1280 #endif
1281 
1282  private:
1283  // ----------- Aggregator ------------
1284  enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1286  , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1287 #endif
1288  };
1290 
1291  class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1292  public:
1293  char type;
1294  union {
1297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1298  size_t cnt_val;
1299  successor_list_type *slist;
1300 #endif
1301  };
1304  my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1306  my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1307  join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1308  };
1309 
1314 
1316  join_node_base_operation *current;
1317  while(op_list) {
1318  current = op_list;
1319  op_list = op_list->next;
1320  switch(current->type) {
1321  case reg_succ: {
1322  my_successors.register_successor(*(current->my_succ));
1323  if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
1324  task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1327  internal::spawn_in_graph_arena(my_graph, *rtask);
1328  forwarder_busy = true;
1329  }
1331  }
1332  break;
1333  case rem_succ:
1334  my_successors.remove_successor(*(current->my_succ));
1336  break;
1337  case try__get:
1338  if(tuple_build_may_succeed()) {
1339  if(try_to_make_tuple(*(current->my_arg))) {
1340  tuple_accepted();
1342  }
1343  else __TBB_store_with_release(current->status, FAILED);
1344  }
1345  else __TBB_store_with_release(current->status, FAILED);
1346  break;
1347  case do_fwrd_bypass: {
1348  bool build_succeeded;
1349  task *last_task = NULL;
1350  output_type out;
1351  if(tuple_build_may_succeed()) { // checks output queue of FE
1352  do {
1353  build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1354  if(build_succeeded) {
1355  task *new_task = my_successors.try_put_task(out);
1356  last_task = combine_tasks(my_graph, last_task, new_task);
1357  if(new_task) {
1358  tuple_accepted();
1359  }
1360  else {
1361  tuple_rejected();
1362  build_succeeded = false;
1363  }
1364  }
1365  } while(build_succeeded);
1366  }
1367  current->bypass_t = last_task;
1369  forwarder_busy = false;
1370  }
1371  break;
1372 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1373  case add_blt_succ:
1374  my_successors.internal_add_built_successor(*(current->my_succ));
1376  break;
1377  case del_blt_succ:
1378  my_successors.internal_delete_built_successor(*(current->my_succ));
1380  break;
1381  case blt_succ_cnt:
1382  current->cnt_val = my_successors.successor_count();
1384  break;
1385  case blt_succ_cpy:
1386  my_successors.copy_successors(*(current->slist));
1388  break;
1389 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1390  }
1391  }
1392  }
1393  // ---------- end aggregator -----------
1394  public:
1395  join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1396  my_successors.set_owner(this);
1397  input_ports_type::set_my_node(this);
1398  my_aggregator.initialize_handler(handler_type(this));
1399  }
1400 
1402  graph_node(other.graph_node::my_graph), input_ports_type(other),
1403  sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1404  my_successors.set_owner(this);
1405  input_ports_type::set_my_node(this);
1406  my_aggregator.initialize_handler(handler_type(this));
1407  }
1408 
1409  template<typename FunctionTuple>
1410  join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1411  my_successors.set_owner(this);
1412  input_ports_type::set_my_node(this);
1413  my_aggregator.initialize_handler(handler_type(this));
1414  }
1415 
1417  join_node_base_operation op_data(r, reg_succ);
1418  my_aggregator.execute(&op_data);
1419  return op_data.status == SUCCEEDED;
1420  }
1421 
1423  join_node_base_operation op_data(r, rem_succ);
1424  my_aggregator.execute(&op_data);
1425  return op_data.status == SUCCEEDED;
1426  }
1427 
1429  join_node_base_operation op_data(v, try__get);
1430  my_aggregator.execute(&op_data);
1431  return op_data.status == SUCCEEDED;
1432  }
1433 
1434 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1435  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1436 
1437  void internal_add_built_successor( successor_type &r) __TBB_override {
1438  join_node_base_operation op_data(r, add_blt_succ);
1439  my_aggregator.execute(&op_data);
1440  }
1441 
1442  void internal_delete_built_successor( successor_type &r) __TBB_override {
1443  join_node_base_operation op_data(r, del_blt_succ);
1444  my_aggregator.execute(&op_data);
1445  }
1446 
1447  size_t successor_count() __TBB_override {
1448  join_node_base_operation op_data(blt_succ_cnt);
1449  my_aggregator.execute(&op_data);
1450  return op_data.cnt_val;
1451  }
1452 
1453  void copy_successors(successor_list_type &l) __TBB_override {
1454  join_node_base_operation op_data(blt_succ_cpy);
1455  op_data.slist = &l;
1456  my_aggregator.execute(&op_data);
1457  }
1458 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1459 
1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1461  void extract() __TBB_override {
1462  input_ports_type::extract();
1463  my_successors.built_successors().sender_extract(*this);
1464  }
1465 #endif
1466 
1467  protected:
1468 
1470  input_ports_type::reset(f);
1471  if(f & rf_clear_edges) my_successors.clear();
1472  }
1473 
1474  private:
1476 
1477  friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1479  join_node_base_operation op_data(do_fwrd_bypass);
1480  my_aggregator.execute(&op_data);
1481  return op_data.bypass_t;
1482  }
1483 
1484  }; // join_node_base
1485 
1486  // join base class type generator
1487  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1488  struct join_base {
1490  };
1491 
1492  template<int N, typename OutputTuple, typename K, typename KHash>
1493  struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1495  typedef K key_type;
1496  typedef KHash key_hash_compare;
1497  typedef typename internal::join_node_base< key_traits_type,
1498  // ports type
1500  OutputTuple > type;
1501  };
1502 
1504  // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1505  // and should match the typename.
1506 
1507  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1508  class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1509  public:
1511  typedef OutputTuple output_type;
1512  private:
1514  public:
1515  unfolded_join_node(graph &g) : base_type(g) {}
1517  };
1518 
1519 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1520  template <typename K, typename T>
1521  struct key_from_message_body {
1522  K operator()(const T& t) const {
1524  return key_from_message<K>(t);
1525  }
1526  };
1527  // Adds const to reference type
1528  template <typename K, typename T>
1529  struct key_from_message_body<K&,T> {
1530  const K& operator()(const T& t) const {
1532  return key_from_message<const K&>(t);
1533  }
1534  };
1535 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1536  // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1537  // differ.
1538 
1539  template<typename OutputTuple, typename K, typename KHash>
1540  class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1541  join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1544  public:
1546  typedef OutputTuple output_type;
1547  private:
1551  typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1552  public:
1553 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1554  unfolded_join_node(graph &g) : base_type(g,
1556  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1557  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1558  ) ) {
1559  }
1560 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1561  template<typename Body0, typename Body1>
1562  unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1564  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1565  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1566  ) ) {
1567  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1568  }
1570  };
1571 
1572  template<typename OutputTuple, typename K, typename KHash>
1573  class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1574  join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1578  public:
1580  typedef OutputTuple output_type;
1581  private:
1586  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1587  public:
1588 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1589  unfolded_join_node(graph &g) : base_type(g,
1591  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1592  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1593  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1594  ) ) {
1595  }
1596 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1597  template<typename Body0, typename Body1, typename Body2>
1598  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1600  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1601  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1602  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1603  ) ) {
1604  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1605  }
1607  };
1608 
1609  template<typename OutputTuple, typename K, typename KHash>
1610  class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1611  join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1616  public:
1618  typedef OutputTuple output_type;
1619  private:
1625  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1626  public:
1627 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1628  unfolded_join_node(graph &g) : base_type(g,
1630  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1631  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1632  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1633  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1634  ) ) {
1635  }
1636 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1637  template<typename Body0, typename Body1, typename Body2, typename Body3>
1638  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1640  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1641  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1642  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1643  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1644  ) ) {
1645  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1646  }
1648  };
1649 
1650  template<typename OutputTuple, typename K, typename KHash>
1651  class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1652  join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1658  public:
1660  typedef OutputTuple output_type;
1661  private:
1668  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1669  public:
1670 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1671  unfolded_join_node(graph &g) : base_type(g,
1673  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1674  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1675  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1676  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1677  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1678  ) ) {
1679  }
1680 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1681  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1682  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1684  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1685  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1686  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1687  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1688  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1689  ) ) {
1690  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1691  }
1693  };
1694 
1695 #if __TBB_VARIADIC_MAX >= 6
1696  template<typename OutputTuple, typename K, typename KHash>
1697  class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1698  join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1705  public:
1706  typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1707  typedef OutputTuple output_type;
1708  private:
1709  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1710  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1711  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1712  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1713  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1714  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1715  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1716  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1717  public:
1718 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1719  unfolded_join_node(graph &g) : base_type(g,
1720  func_initializer_type(
1721  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1722  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1723  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1724  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1725  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1726  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1727  ) ) {
1728  }
1729 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1730  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1731  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1732  : base_type(g, func_initializer_type(
1733  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1734  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1735  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1736  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1737  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1738  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1739  ) ) {
1740  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1741  }
1742  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1743  };
1744 #endif
1745 
1746 #if __TBB_VARIADIC_MAX >= 7
1747  template<typename OutputTuple, typename K, typename KHash>
1748  class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1749  join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1757  public:
1758  typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1759  typedef OutputTuple output_type;
1760  private:
1761  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1762  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1763  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1764  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1765  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1766  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1767  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1768  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1769  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1770  public:
1771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1772  unfolded_join_node(graph &g) : base_type(g,
1773  func_initializer_type(
1774  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1775  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1776  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1777  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1778  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1779  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1780  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1781  ) ) {
1782  }
1783 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1784  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1785  typename Body5, typename Body6>
1786  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1787  Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1788  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1789  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1790  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1791  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1792  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1793  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1794  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1795  ) ) {
1796  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1797  }
1798  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1799  };
1800 #endif
1801 
1802 #if __TBB_VARIADIC_MAX >= 8
1803  template<typename OutputTuple, typename K, typename KHash>
1804  class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1805  join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1814  public:
1815  typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1816  typedef OutputTuple output_type;
1817  private:
1818  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1819  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1820  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1821  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1822  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1823  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1824  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1825  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1826  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1827  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1828  public:
1829 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1830  unfolded_join_node(graph &g) : base_type(g,
1831  func_initializer_type(
1832  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1833  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1834  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1835  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1836  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1837  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1838  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1839  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1840  ) ) {
1841  }
1842 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1843  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1844  typename Body5, typename Body6, typename Body7>
1845  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1846  Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1847  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1848  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1849  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1850  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1851  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1852  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1853  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1854  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1855  ) ) {
1856  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1857  }
1858  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1859  };
1860 #endif
1861 
1862 #if __TBB_VARIADIC_MAX >= 9
1863  template<typename OutputTuple, typename K, typename KHash>
1864  class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1865  join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1875  public:
1876  typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1877  typedef OutputTuple output_type;
1878  private:
1879  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1880  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1881  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1882  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1883  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1884  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1885  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1886  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1887  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1888  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1889  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1890  public:
1891 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1892  unfolded_join_node(graph &g) : base_type(g,
1893  func_initializer_type(
1894  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1895  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1896  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1897  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1898  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1899  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1900  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1901  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1902  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1903  ) ) {
1904  }
1905 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1906  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1907  typename Body5, typename Body6, typename Body7, typename Body8>
1908  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1909  Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1910  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1911  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1912  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1913  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1914  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1915  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1916  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1917  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1918  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)
1919  ) ) {
1920  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1921  }
1922  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1923  };
1924 #endif
1925 
1926 #if __TBB_VARIADIC_MAX >= 10
1927  template<typename OutputTuple, typename K, typename KHash>
1928  class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1929  join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1940  public:
1941  typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1942  typedef OutputTuple output_type;
1943  private:
1944  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1945  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1946  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1947  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1948  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1949  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1950  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1951  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1952  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1953  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1954  typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
1955  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1956  public:
1957 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1958  unfolded_join_node(graph &g) : base_type(g,
1959  func_initializer_type(
1960  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1961  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1962  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1963  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1964  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1965  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1966  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1967  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1968  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1969  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1970  ) ) {
1971  }
1972 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1973  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1974  typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1975  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1976  Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1977  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1978  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1979  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1980  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1981  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1982  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1983  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1984  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1985  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
1986  new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)
1987  ) ) {
1988  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1989  }
1990  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1991  };
1992 #endif
1993 
1995  template<size_t N, typename JNT>
1997  return tbb::flow::get<N>(jn.input_ports());
1998  }
1999 
2000 }
2001 #endif // __TBB__flow_graph_join_impl_H
2002 
A cache of successors that are put in a round-robin fashion.
const K & operator()(const table_item_type &v)
static void release_my_reservation(TupleType &my_input)
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
void handle_operations(key_matching_port_operation *op_list)
graph & graph_reference() const __TBB_override
join_node_base(graph &g, FunctionTuple f)
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
static void consume_reservations(TupleType &my_input)
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
tbb::internal::strip< key_type >::type noref_key_type
join_node_base< JP, InputTuple, OutputTuple > class_type
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
join_node_base< JP, input_ports_type, output_type > base_type
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
static void reset_my_port(InputTuple &my_input)
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
task * try_put_task(const T &v) __TBB_override
virtual task * increment_key_count(current_key_type const &, bool)=0
reservable_predecessor_cache< T, null_mutex > my_predecessors
unfolded_join_node(const unfolded_join_node &other)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
void set_join_node_pointer(forwarding_base *join)
static bool get_items(InputTuple &my_input, OutputTuple &out)
queueing_port(const queueing_port &)
copy constructor
key_matching_port(const key_matching_port &)
The two-phase join port.
static void reset_my_port(InputTuple &my_input)
aggregator< handler_type, reserving_port_operation > my_aggregator
static void reset_inputs(InputTuple &my_input, reset_flags f)
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
void reset_node(reset_flags f) __TBB_override
void reset_receiver(reset_flags f) __TBB_override
uintptr_t status
Zero value means "wait" status, all other values are "user" specified values and are defined into the...
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
static void set_join_node_pointer(TupleType &my_input, PortType *port)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
tbb::internal::strip< KeyType >::type current_key_type
reserving_port< T > class_type
A task that calls a node's forward_task function.
reserving_port_operation(const predecessor_type &s, op_type t)
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
reserving_port(const reserving_port &)
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
void reset_receiver(reset_flags f) __TBB_override
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
void set_join_node_pointer(forwarding_base *join)
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
aggregator< handler_type, queueing_port_operation > my_aggregator
static void reset_ports(InputTuple &my_input)
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
Definition: tbb_stddef.h:398
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
join_node_FE : implements input port policy
join_node_base_operation(const successor_type &s, op_type t)
broadcast_cache< output_type, null_rw_mutex > my_successors
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
bool is_graph_active(tbb::flow::interface10::graph &g)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
void consume()
Complete use of the port.
Release.
Definition: atomic.h:59
aggregator< handler_type, key_matching_port_operation > my_aggregator
graph & graph_reference() const __TBB_override
sender< output_type >::successor_type successor_type
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
void const char const char int ITT_FORMAT __itt_group_sync p
bool reserve(T &v)
Reserve an item from the port.
matching_forwarding_base< key_type > * my_join
A cache of successors that are broadcast to.
void handle_operations(reserving_port_operation *op_list)
static tbb::task *const SUCCESSFULLY_ENQUEUED
void release()
Release the port.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
key_matching_port< traits > class_type
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
join_node_base(const join_node_base &other)
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
static bool reserve(InputTuple &my_input, OutputTuple &out)
graph & graph_reference() const __TBB_override
static void release_reservations(TupleType &my_input)
#define __TBB_override
Definition: tbb_stddef.h:240
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
virtual task * decrement_port_count(bool handle_task)=0
type_to_key_func_type * get_my_key_func()
void const char const char int ITT_FORMAT __itt_group_sync s
static void release_my_reservation(TupleType &my_input)
static void release_reservations(TupleType &my_input)
field of type K being used for matching.
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
void reset_receiver(reset_flags f) __TBB_override
virtual void increment_port_count()=0
task * try_put_task(const T &) __TBB_override
static bool reserve(InputTuple &my_input, OutputTuple &out)
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
void set_owner(successor_type *owner)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
aggregator< handler_type, key_matching_FE_operation > my_aggregator
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
static void consume_reservations(TupleType &my_input)
receiver< input_type >::predecessor_type predecessor_type
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
void handle_operations(queueing_port_operation *op_list)
receiver< input_type >::predecessor_type predecessor_type
static void reset_ports(InputTuple &my_input)
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
receiver< input_type >::predecessor_type predecessor_type
void handle_operations(join_node_base_operation *op_list)
static void reset_inputs(InputTuple &my_input, reset_flags f)
void set_my_key_func(type_to_key_func_type *f)
aggregator< handler_type, join_node_base_operation > my_aggregator
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
K key_from_message(const T &t)
Definition: flow_graph.h:721
static bool get_items(InputTuple &my_input, OutputTuple &out)
task * decrement_port_count(bool handle_task) __TBB_override

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.