Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_streaming_node.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #if __TBB_PREVIEW_STREAMING_NODE
25 
26 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
27 
28 namespace internal {
29 
// Compile-time tag describing a closed range of port indices [N1, N2].
// `size` is the number of indices covered by that range.
30 template <int N1, int N2>
31 struct port_ref_impl {
32  // "+1" since the port_ref range is a closed interval (includes its endpoints).
33  static const int size = N2 - N1 + 1;
34 };
35 
36 } // internal
37 
38 // The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
39 // So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
40 template <int N1, int N2 = N1>
43 };
44 
45 namespace internal {
46 
// Primary template: an arbitrary argument type T counts as exactly one argument.
// The specializations below report the size of a port range instead.
47 template <typename T>
48 struct num_arguments {
49  static const int value = 1;
50 };
51 
// Specialization for a pointer to a nullary function returning port_ref_impl<N1,N2>:
// the argument count is the number of ports in the closed range [N1, N2].
52 template <int N1, int N2>
53 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
54  static const int value = port_ref_impl<N1,N2>::size;
55 };
56 
// Specialization for a port_ref_impl<N1,N2> object itself:
// the argument count is the number of ports in the closed range [N1, N2].
57 template <int N1, int N2>
58 struct num_arguments<port_ref_impl<N1,N2>> {
59  static const int value = port_ref_impl<N1,N2>::size;
60 };
61 
// Accepts any number of arguments and does nothing with them.
// It gives a parameter pack of call expressions a place to be expanded when the
// results themselves are not needed (see its use in make_edges further down).
62 template <typename... Args>
63 void ignore_return_values( Args&&... ) {}
64 
// Folds all arguments together with operator| and returns the result as T,
// where T is the type deduced for the first argument.
// Single-argument overload: terminates the recursion by returning the argument.
65 template <typename T>
66 T or_return_values( T&& t ) { return t; }
// Recursive overload: ORs the first argument with the fold of the remaining ones.
// operator| (not ||) is used; every argument has already been evaluated at the call site.
67 template <typename T, typename... Rest>
68 T or_return_values( T&& t, Rest&&... rest ) {
69  return t | or_return_values( std::forward<Rest>(rest)... );
70 }
71 
72 template<typename JP>
74  typedef size_t type;
76 };
77 
78 template<typename Key>
79 struct key_from_policy< key_matching<Key> > {
80  typedef Key type;
82 };
83 
84 template<typename Key>
85 struct key_from_policy< key_matching<Key&> > {
86  typedef const Key &type;
88 };
89 
90 template<typename Device, typename Key>
92  Device my_device;
94 public:
95  // TODO: investigate why default constructor is required
// Stores a copy of device d together with the key k it is associated with.
97  streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
// Returns the stored key by value.
98  Key key() const { return my_key; }
// Returns a const reference to the stored device.
99  const Device& device() const { return my_device; }
100 };
101 
102 // --------- Kernel argument helpers --------- //
103 template <typename T>
106 };
107 
108 template <int N1, int N2>
109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
111 };
112 
113 template <int N1, int N2>
114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
116 };
117 
118 template <typename T>
119 struct is_port_ref {
121 };
122 
123 template <typename ...Args1>
125 
// Recursive step of the kernel-argument conversion.
// The unprocessed arguments are (a1, args1...); the already-converted arguments are
// accumulated in args2.... Each step consumes a1:
//   - if A1 is not a port reference, a1 is appended to args2 unchanged;
//   - if A1 is a port reference, the referenced elements of tuple t are appended instead.
// Recursion continues on Args1... with the extended args2.
126 template <typename A1, typename ...Args1>
127 struct convert_and_call_impl<A1, Args1...> {
128  static const size_t my_delta = 1; // Index 0 contains device, so port N lives at tuple index N + 1
129 
     // Entry point: dispatches on whether A1 is a port reference (is_port_ref<A1>::type).
130  template <typename F, typename Tuple, typename ...Args2>
131  static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
132  convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
133  }
     // Not a port reference: move a1 to the end of the converted list and process the rest.
134  template <typename F, typename Tuple, typename ...Args2>
135  static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
137  }
     // Port range [N1, N2]: append tuple element N1 + my_delta, then continue with
     // the shrunken range [N1 + 1, N2] still in first position.
138  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
139  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
140  convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
141  args2..., std::get<N1 + my_delta>(t));
142  }
     // Single-port range [N, N]: append tuple element N + my_delta and move on to Args1....
143  template <typename F, typename Tuple, int N, typename ...Args2>
144  static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
145  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
146  }
147 
     // Port reference given as a function pointer: call it to obtain the port_ref_impl
     // value and re-dispatch to the value overloads above.
148  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
149  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
150  doit_impl(x, f, t, fn(), args1..., args2...);
151  }
     // Same as above for a function pointer that yields a single-port range [N, N].
152  template <typename F, typename Tuple, int N, typename ...Args2>
153  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
154  doit_impl(x, f, t, fn(), args1..., args2...);
155  }
156 };
157 
158 template <>
// Invokes f with all arguments collected in args2...; the tuple parameter is ignored.
// NOTE(review): the struct header for this specialization is elided in this listing;
// presumably this is the empty-pack case that ends the recursion - confirm.
160  template <typename F, typename Tuple, typename ...Args2>
161  static void doit(F& f, Tuple&, Args2&... args2) {
162  f(args2...);
163  }
164 };
165 // ------------------------------------------- //
166 
167 template<typename JP, typename StreamFactory, typename... Ports>
169  // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
170  template <typename T>
171  struct async_msg_type {
172  typedef typename StreamFactory::template async_msg_type<T> type;
173  };
174 
179 
180  // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
182 };
183 
184 // Default empty implementation
185 template<typename StreamFactory, typename KernelInputTuple, typename = void>
187  typedef typename StreamFactory::device_type device_type;
188  typedef typename StreamFactory::kernel_type kernel_type;
189  typedef KernelInputTuple kernel_input_tuple;
190 protected:
// Default kernel launch: forwards device, kernel and the converted arguments
// to factory.send_kernel(). The kernel input tuple is not used in this variant.
191  template <typename ...Args>
192  void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
193  factory.send_kernel( device, kernel, args... );
194  }
195 };
196 
197 // Implementation for StreamFactory supporting range
198 template<typename StreamFactory, typename KernelInputTuple>
199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
200  typedef typename StreamFactory::device_type device_type;
201  typedef typename StreamFactory::kernel_type kernel_type;
202  typedef KernelInputTuple kernel_input_tuple;
203 
204  typedef typename StreamFactory::range_type range_type;
205 
206  // Container for range. It can contain either port references or real range.
     // Abstract interface: get_range() produces the range for a given kernel input tuple,
     // clone() returns a newly created wrapper of the same kind.
207  struct range_wrapper {
208  virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
209  virtual range_wrapper *clone() const = 0;
210  virtual ~range_wrapper() {}
211  };
212 
213  struct range_value : public range_wrapper {
214  range_value( const range_type& value ) : my_value(value) {}
215 
216  range_value( range_type&& value ) : my_value(std::move(value)) {}
217 
219  return my_value;
220  }
221 
222  range_wrapper *clone() const __TBB_override {
223  return new range_value(my_value);
224  }
225  private:
227  };
228 
229  template <int N>
230  struct range_mapper : public range_wrapper {
232 
234  // "+1" since get<0>(ip) is StreamFactory::device.
235  return get<N + 1>(ip).data(false);
236  }
237 
238  range_wrapper *clone() const __TBB_override {
239  return new range_mapper<N>;
240  }
241  };
242 
243 protected:
// Kernel launch for factories that support ranges: asserts that a range wrapper has
// been installed, then calls factory.send_kernel() with the range obtained from the
// wrapper (for input tuple ip) inserted between the kernel and the remaining arguments.
244  template <typename ...Args>
245  void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
246  __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
247  factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
248  }
249 
250 public:
// Default constructor: no range wrapper is installed yet.
251  kernel_executor_helper() : my_range_wrapper(NULL) {}
252 
// Copy constructor: clones the source's range wrapper if it has one, otherwise stays NULL.
253  kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
254 
// Move constructor: takes over the source's range wrapper pointer.
255  kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
256  // Reset the moved-from helper's wrapper pointer to NULL to prevent double deallocation
257  executor.my_range_wrapper = NULL;
258  }
259 
261  if (my_range_wrapper) delete my_range_wrapper;
262  }
263 
// Installs a fixed range: allocates a range_value holding a copy of work_size.
// NOTE(review): my_range_wrapper is overwritten without deleting a previously
// installed wrapper, so calling set_range more than once appears to leak the
// earlier allocation - confirm and fix (delete the old wrapper after creating the new one).
264  void set_range(const range_type& work_size) {
265  my_range_wrapper = new range_value(work_size);
266  }
267 
// Rvalue overload: same as above, but work_size is moved into the new range_value.
// NOTE(review): the same overwrite-without-delete concern applies here.
268  void set_range(range_type&& work_size) {
269  my_range_wrapper = new range_value(std::move(work_size));
270  }
271 
272  template <int N>
274  my_range_wrapper = new range_mapper<N>;
275  }
276 
277  template <int N>
279  my_range_wrapper = new range_mapper<N>;
280  }
281 
282 private:
283  range_wrapper* my_range_wrapper;
284 };
285 
286 } // internal
287 
288 /*
289 /---------------------------------------- streaming_node ------------------------------------\
290 | |
291 | /--------------\ /----------------------\ /-----------\ /----------------------\ |
292 | | | | (device_with_key) O---O | | | |
293 | | | | | | | | | |
294 O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
295 | | | | (multifunction_node) | | | | (multifunction_node) | |
296 O---O | | O---O | | O---O
297 | \--------------/ \----------------------/ \-----------/ \----------------------/ |
298 | |
299 \--------------------------------------------------------------------------------------------/
300 */
301 template<typename... Args>
303 
304 template<typename... Ports, typename JP, typename StreamFactory>
306 streaming_node< tuple<Ports...>, JP, StreamFactory >
307  : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
308  typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
309  , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
310 {
311  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
312  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
314 protected:
315  typedef typename StreamFactory::device_type device_type;
316  typedef typename StreamFactory::kernel_type kernel_type;
317 private:
319  typedef composite_node<input_tuple, output_tuple> base_type;
320  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
321  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
322 
325 
326  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
327  typedef typename indexer_node_type::output_type indexer_node_output_type;
328  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
329  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
330  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
331 
// Builds the composite node's input-port tuple: one reference per index in S...,
// each bound to the corresponding input port of my_indexer_node.
332  template <int... S>
333  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
334  return std::tie( internal::input_port<S>( my_indexer_node )... );
335  }
336 
// Builds the composite node's output-port tuple: one reference per index in S...,
// each bound to the corresponding output port of my_kernel_node.
337  template <int... S>
338  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
339  return std::tie( internal::output_port<S>( my_kernel_node )... );
340  }
341 
// Convenience overload: supplies input_sequence() to the index-sequence version.
342  typename base_type::input_ports_type get_input_ports() {
343  return get_input_ports( input_sequence() );
344  }
345 
// Convenience overload: supplies output_sequence() to the index-sequence version.
346  typename base_type::output_ports_type get_output_ports() {
347  return get_output_ports( output_sequence() );
348  }
349 
350  template <int N>
352  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
353  return 0;
354  }
355 
356  template <int... S>
358  make_edge( my_indexer_node, my_device_selector_node );
359  make_edge( my_device_selector_node, my_join_node );
360  internal::ignore_return_values( make_Nth_edge<S + 1>()... );
361  make_edge( my_join_node, my_kernel_node );
362  }
363 
// Connects the internal nodes by delegating to the index-sequence overload
// of make_edges with input_sequence().
364  void make_edges() {
365  make_edges( input_sequence() );
366  }
367 
368  class device_selector_base {
369  public:
370  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
371  virtual device_selector_base *clone( streaming_node &n ) const = 0;
373  };
374 
375  template <typename UserFunctor>
376  class device_selector : public device_selector_base, tbb::internal::no_assign {
377  public:
378  device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
379  : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
380  , my_user_functor( uf ), my_node(n), my_factory( f )
381  {
382  my_port_epoches.fill( 0 );
383  }
384 
385  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
386  (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
388  || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
389  }
390 
// Creates a new heap-allocated device_selector bound to node n. The copy reuses this
// selector's user functor and factory reference; it goes through the regular
// constructor, so its per-port epoch counters are zero-filled again.
// The returned object is allocated with new; the caller is responsible for deleting it.
391  device_selector_base *clone( streaming_node &n ) const __TBB_override {
392  return new device_selector( my_user_functor, n, my_factory );
393  }
394  private:
395  typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
396  typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
397 
398  template <int... S>
400  dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
401  return dispatch;
402  }
403 
404  template <typename T>
405  key_type get_key( std::false_type, const T &, size_t &epoch ) {
407  return epoch++;
408  }
409 
410  template <typename T>
411  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
413  return key_from_message<key_type>( t );
414  }
415 
// Handles a message that arrived on input port N:
//   1. casts the indexer output v to the message type of output port N + 1;
//   2. determines the device for the message's key (port 0 of op is the device port);
//   3. sends the data to that device through the factory;
//   4. puts the message to output port N + 1 (the try_put result is not checked).
// epoch is the per-port counter that get_key may use and advance when no key matching is requested.
416  template <int N>
417  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
418  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
419  elem_type e = internal::cast_to<elem_type>( v );
420  device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
421  my_factory.send_data( device, e );
422  get<N + 1>( op ).try_put( e );
423  }
424 
// Returns the device associated with key, selecting one on first use.
// On a miss in my_devices: asks the user functor for a device, records it under key,
// puts a device_with_key_type message to the device port dp (asserting success) and
// notifies the owning node about the new device.
// Every call counts as one request for the key; once NUM_INPUTS requests have been
// made, the entry is erased from my_devices.
425  template< typename DevicePort >
426  device_type get_device( key_type key, DevicePort& dp ) {
427  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
428  if ( it == my_devices.end() ) {
429  device_type d = my_user_functor( my_factory );
     // insert() returns (iterator, inserted); only the iterator is kept.
430  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
431  bool res = dp.try_put( device_with_key_type( d, key ) );
432  __TBB_ASSERT_EX( res, NULL );
433  my_node.notify_new_device( d );
434  }
435  epoch_desc &e = it->second;
     // Copy the device out before the entry may be erased below.
436  device_type d = e.my_device;
437  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
438  return d;
439  }
440 
441  struct epoch_desc {
442  epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
445  };
446 
448  std::array<size_t, NUM_INPUTS> my_port_epoches;
450  UserFunctor my_user_functor;
452  StreamFactory &my_factory;
453  };
454 
// Function-object body that forwards each call to a device_selector_base through a
// raw pointer. This class only stores the pointer; it never deletes it.
455  class device_selector_body {
456  public:
457  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
458 
     // Forwards the indexer output and the output ports to the wrapped selector.
459  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
460  (*my_device_selector)(v, op);
461  }
462  private:
463  device_selector_base *my_device_selector;
464  };
465 
466  // TODO: investigate why copy-construction is disallowed
467  class args_storage_base : tbb::internal::no_copy {
468  public:
469  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
470 
471  virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
472  virtual void send( device_type d ) = 0;
473  virtual args_storage_base *clone() const = 0;
474  virtual ~args_storage_base () {}
475 
476  protected:
477  args_storage_base( const kernel_type& kernel, StreamFactory &f )
478  : my_kernel( kernel ), my_factory( f )
479  {}
480 
481  args_storage_base( const args_storage_base &k )
482  : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
483  {}
484 
486  StreamFactory &my_factory;
487  };
488 
489  template <typename... Args>
490  class args_storage : public args_storage_base {
492 
493  // ---------- Update events helpers ---------- //
// Puts element N + 1 of the kernel input tuple to output port N and returns the
// result of try_put. The "+1" skips element 0 of ip (accessed elsewhere via
// get<0>(ip).device()).
494  template <int N>
495  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
496  const auto& t = get<N + 1>( ip );
497  auto &port = get<N>( op );
498  return port.try_put( t );
499  }
500 
501  template <int... S>
503  return internal::or_return_values( do_try_put<S>( ip, op )... );
504  }
505 
506  // ------------------------------------------- //
507  class run_kernel_func : tbb::internal::no_assign {
508  public:
509  run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
510  : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
511 
512  // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
513  // Allow the compiler to deduce types for function pointers automatically.
514  template <typename... FnArgs>
515  void operator()( FnArgs&... args ) {
516  internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
517  }
518  private:
519  struct kernel_func : tbb::internal::no_copy {
522  const args_storage& my_storage;
524 
525  kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
526  : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
527  {}
528 
529  template <typename... FnArgs>
530  void operator()( FnArgs&... args ) {
531  my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
532  }
533  } my_kernel_func;
534  };
535 
536  template<typename FinalizeFn>
537  class run_finalize_func : tbb::internal::no_assign {
538  public:
539  run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
540  : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
541 
542  // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
543  // Allow the compiler to deduce types for function pointers automatically.
544  template <typename... FnArgs>
545  void operator()( FnArgs&... args ) {
546  internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
547  }
548  private:
550 
551  struct finalize_func : tbb::internal::no_assign {
552  StreamFactory &my_factory;
554  FinalizeFn my_fn;
555 
556  finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
557  : my_factory(factory), my_device(device), my_fn(fn) {}
558 
559  template <typename... FnArgs>
560  void operator()( FnArgs&... args ) {
561  my_factory.finalize( my_device, my_fn, args... );
562  }
563  } my_finalize_func;
564  };
565 
// Factory helper that lets FinalizeFn be deduced from the argument (e.g. a lambda)
// and returns a run_finalize_func built from ip, factory and fn.
566  template<typename FinalizeFn>
567  static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
568  return run_finalize_func<FinalizeFn>( ip, factory, fn );
569  }
570 
571  class send_func : tbb::internal::no_assign {
572  public:
573  send_func( StreamFactory &factory, device_type d )
574  : my_factory(factory), my_device( d ) {}
575 
576  template <typename... FnArgs>
577  void operator()( FnArgs&... args ) {
578  my_factory.send_data( my_device, args... );
579  }
580  private:
581  StreamFactory &my_factory;
583  };
584 
585  public:
// Constructs storage for the given kernel and factory and forwards args into my_args_pack.
586  args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
587  : args_storage_base( kernel, f )
588  , my_args_pack( std::forward<Args>(args)... )
589  {}
590 
// Copy constructor: copies the base part and the stored argument pack.
591  args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
592 
// Re-binding constructor: copies only the base part (kernel, factory reference) from k
// and stores a fresh set of arguments.
593  args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
594 
596  // Make const qualified args_pack (from non-const)
597  const args_pack_type& const_args_pack = my_args_pack;
598  // streaming_node::enqueue_kernel() (which forwards to factory.send_kernel()) gets
599  // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
600  // - arguments (from my_args_pack) by const-reference via const_args_pack
601  tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
602 
603  if (! do_try_put( ip, op, input_sequence() ) ) {
604  graph& g = n.my_graph;
605  // No message was passed to any successor, so set a callback to extend the graph lifetime until the kernel completes.
606  g.increment_wait_count();
607 
608  // factory.finalize() gets
609  // - 'ip' tuple elements by reference, so 'ip' might be changed
610  // - arguments (from my_args_pack) by const-reference via const_args_pack
611  tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
612  g.decrement_wait_count();
613  }), const_args_pack );
614  }
615  }
616 
618  // factory.send_data() gets arguments by reference and updates these arguments with dependencies
619  // (it gets but usually ignores port_ref-s)
620  tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
621  }
622 
// Returns a heap-allocated copy of this storage; the caller is responsible for deleting it.
623  args_storage_base *clone() const __TBB_override {
624  // Create a new args_storage with the copy constructor.
625  return new args_storage<Args...>( *this );
626  }
627 
628  private:
631  };
632 
633  // Body for kernel_multifunction_node.
634  class kernel_body : tbb::internal::no_assign {
635  public:
636  kernel_body( const streaming_node &node ) : my_node( node ) {}
637 
639  __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
640  // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
641  my_node.my_args_storage->enqueue( ip, op, my_node );
642  }
643  private:
645  };
646 
648  struct wrap_to_async {
649  typedef T type; // Keep port_ref as it is
650  };
651 
// Specialization chosen when the second template argument is std::false_type:
// maps T to StreamFactory::async_msg_type of T with tbb::internal::strip applied.
// NOTE(review): the primary template's header is elided in this listing; presumably the
// second argument defaults to an "is port_ref" test so that only non-port_ref types
// are wrapped - confirm.
652  template <typename T>
653  struct wrap_to_async<T, std::false_type> {
654  typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
655  };
656 
// Allocates a new args_storage<Args...> that copies the base part of `storage`
// and stores the forwarded args. The caller is responsible for deleting the result.
657  template <typename... Args>
658  args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
659  // The conversion of simple types 'T' into 'async_msg_type<T>' is done by the caller (see wrap_to_async in set_args); the arguments are only forwarded here.
660  return new args_storage<Args...>(storage, std::forward<Args>(args)...);
661  }
662 
664  my_args_storage->send( d );
665  }
666 
// Forwards the kernel launch to enqueue_kernel_impl, which is provided by the
// kernel_executor_helper base class (with or without range support).
667  template <typename ...Args>
668  void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
669  this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
670  }
671 
672 public:
// Constructs a streaming_node in graph g.
//   kernel - kernel to run; stored in the arguments storage.
//   d      - user device selector functor, wrapped into a heap-allocated device_selector.
//   f      - stream factory, taken by reference; device_selector and the arguments storage
//            keep references to it, so it must outlive this node.
// After the internal nodes are built, the external ports of the composite node are set
// and the internal edges are created.
673  template <typename DeviceSelector>
674  streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
675  : base_type( g )
676  , my_indexer_node( g )
677  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
678  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
679  , my_join_node( g )
680  , my_kernel_node( g, serial, kernel_body( *this ) )
681  // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
682  , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
683  {
684  base_type::set_external_ports( get_input_ports(), get_output_ports() );
685  make_edges();
686  }
687 
689  : base_type( node.my_graph )
690  , my_indexer_node( node.my_indexer_node )
691  , my_device_selector( node.my_device_selector->clone( *this ) )
692  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
693  , my_join_node( node.my_join_node )
694  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
695  , my_args_storage( node.my_args_storage->clone() )
696  {
697  base_type::set_external_ports( get_input_ports(), get_output_ports() );
698  make_edges();
699  }
700 
702  : base_type( node.my_graph )
703  , my_indexer_node( std::move( node.my_indexer_node ) )
704  , my_device_selector( node.my_device_selector->clone(*this) )
705  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
706  , my_join_node( std::move( node.my_join_node ) )
707  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
708  , my_args_storage( node.my_args_storage )
709  {
710  base_type::set_external_ports( get_input_ports(), get_output_ports() );
711  make_edges();
712  // Reset the moved-from node's args storage pointer to NULL to prevent double deallocation.
713  node.my_args_storage = NULL;
714  }
715 
717  if ( my_args_storage ) delete my_args_storage;
718  if ( my_device_selector ) delete my_device_selector;
719  }
720 
// Replaces the kernel argument list. Each argument is converted to
// wrap_to_async<Args>::type before being stored. The new storage is created first,
// and only then is the old one deleted and replaced.
721  template <typename... Args>
722  void set_args( Args&&... args ) {
723  // Copy the base class of args_storage and create new storage for "Args...".
724  args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
725  delete my_args_storage;
726  my_args_storage = new_args_storage;
727  }
728 
729 protected:
// Resetting is not implemented for streaming_node: the body only contains an assertion that always fails.
730  void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
731 
732 private:
734  device_selector_base *my_device_selector;
736  join_node<kernel_input_tuple, JP> my_join_node;
738 
739  args_storage_base *my_args_storage;
740 };
741 
742 #endif // __TBB_PREVIEW_STREAMING_NODE
743 #endif // __TBB_flow_graph_streaming_H
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
tuple< typename async_msg_type< Ports >::type... > input_tuple
bool_constant< true > true_type
Definition: tbb_stddef.h:489
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t 
__itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s const char ITT_FORMAT s __itt_frame ITT_FORMAT p __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu __itt_counter unsigned long long ITT_FORMAT lu __itt_counter __itt_clock_domain unsigned long long void ITT_FORMAT p const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
composite_node< input_tuple, output_tuple > base_type
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
device_selector_base * clone(streaming_node &n) const __TBB_override
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
void ignore_return_values(Args &&...)
class __TBB_DEPRECATED streaming_node
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
bool_constant< false > false_type
Definition: tbb_stddef.h:490
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
task * do_try_put(const T &v, void *p)
Definition: flow_graph.h:34
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
internal::make_sequence< NUM_INPUTS >::type input_sequence
StreamFactory::template async_msg_type< T > type
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
static void doit(F &f, Tuple &, Args2 &... args2)
#define __TBB_override
Definition: tbb_stddef.h:240
Detects whether two given types are the same.
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3838
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
field of type K being used for matching.
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
The graph class.
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
K key_from_message(const T &t)
Definition: flow_graph.h:721
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.