Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
20 #include "../tbb_stddef.h"
21 #include "../task.h"
22 #include "../task_arena.h"
23 #include "../flow_graph_abstractions.h"
24 
25 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26 #include "../concurrent_priority_queue.h"
27 #endif
28 
29 #include <list>
30 
// FLOW_SPAWN(a) submits task `a` through one of two tbb::task entry points:
// tbb::task::enqueue when TBB_DEPRECATED_FLOW_ENQUEUE is set, and
// tbb::task::spawn otherwise.
31 #if TBB_DEPRECATED_FLOW_ENQUEUE
32 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
33 #else
34 #define FLOW_SPAWN(a) tbb::task::spawn((a))
35 #endif
36 
// Helper macros that add or drop priority-related source text depending on
// whether __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES is enabled:
//   _EXPR( expr )           -> expr              | (nothing)
//   _ARG0( priority )       -> , priority        | (nothing)
//   _ARG1( arg1, priority ) -> arg1, priority    | arg1
// (left column: preview enabled; right column: preview disabled).
37 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41 #else
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46 
// Helper macros that select source text depending on whether
// TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR is set:
//   _EXPR( expr )                    -> expr             | (nothing)
//   _ARG2( arg1, arg2 )              -> arg1, arg2       | arg1
//   _ARG4( arg1, arg2, arg3, arg4 )  -> arg1, arg3, arg4 | arg1, arg2
// (left column: macro set; right column: macro not set). Note that _ARG4
// drops arg2 in the first case and drops arg3/arg4 in the second.
47 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
48 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
49 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
50 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
51 #else
52 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
53 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
54 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
55 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
56 
57 namespace tbb {
58 namespace flow {
59 
60 namespace internal {
61 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
62 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
63 typedef unsigned int node_priority_t;
65 #endif
66 }
67 
68 namespace interface10 {
69 class graph;
70 }
71 
72 namespace interface11 {
73 
75 
76 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
80 struct graph_task : public task {
81  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
83 };
84 #else
85 typedef task graph_task;
86 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
87 
88 class graph_node;
89 
90 template <typename GraphContainerType, typename GraphNodeType>
93  friend class graph_node;
94 public:
95  typedef size_t size_type;
96  typedef GraphNodeType value_type;
97  typedef GraphNodeType* pointer;
98  typedef GraphNodeType& reference;
99  typedef const GraphNodeType& const_reference;
100  typedef std::forward_iterator_tag iterator_category;
101 
104 
108  {}
109 
112  if (this != &other) {
113  my_graph = other.my_graph;
114  current_node = other.current_node;
115  }
116  return *this;
117  }
118 
120  reference operator*() const;
121 
123  pointer operator->() const;
124 
// Equality: two iterators are equal only when they refer to the same graph
// (my_graph) and to the same position in it (current_node).
126  bool operator==(const graph_iterator& other) const {
127  return ((my_graph == other.my_graph) && (current_node == other.current_node));
128  }
129 
// Inequality: the logical negation of operator==.
131  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
132 
136  return *this;
137  }
138 
141  graph_iterator result = *this;
142  operator++();
143  return result;
144  }
145 
146 private:
147  // the graph over which we are iterating
148  GraphContainerType *my_graph;
149  // pointer into my_graph's my_nodes list
151 
153  graph_iterator(GraphContainerType *g, bool begin);
154  void internal_forward();
155 }; // class graph_iterator
156 
157 // flags to modify the behavior of the graph reset(). Can be combined.
160  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
161  rf_clear_edges = 1 << 1 // delete edges
162 };
163 
164 namespace internal {
165 
173 
174 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
176  bool operator()(const graph_task* left, const graph_task* right) {
177  return left->priority < right->priority;
178  }
179 };
180 
182 
183 class priority_task_selector : public task {
184 public:
186  : my_priority_queue(priority_queue) {}
188  graph_task* t = NULL;
189  bool result = my_priority_queue.try_pop(t);
190  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
191  " in graph's priority queue mismatched" );
193  "Incorrect task submitted to graph priority queue" );
195  "Tasks from graph's priority queue must have priority" );
196  task* t_next = t->execute();
197  task::destroy(*t);
198  return t_next;
199  }
200 private:
202 };
203 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
204 
205 }
206 
207 } // namespace interface11
208 namespace interface10 {
210 
213 
214  template< typename Body >
216  public:
217  run_task(Body& body
220  ) : tbb::flow::interface11::graph_task(node_priority),
221 #else
222  ) :
223 #endif
224  my_body(body) { }
226  my_body();
227  return NULL;
228  }
229  private:
230  Body my_body;
231  };
232 
233  template< typename Receiver, typename Body >
235  public:
236  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
238  tbb::task *res = my_receiver.try_put_task(my_body());
239  if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
240  return res;
241  }
242  private:
243  Receiver &my_receiver;
244  Body my_body;
245  };
246  typedef std::list<tbb::task *> task_list_type;
247 
248  class wait_functor {
250  public:
253  };
254 
258  public:
260  void operator()() const {
262  }
263  };
264 
// Prepares my_task_arena for use and guarantees (by assertion) that it is
// active on return.
//   reinit == true  : asserts that my_task_arena already exists.
//   reinit == false : asserts that my_task_arena is still NULL.
// After the branch, an arena that is not active is initialized explicitly.
// NOTE(review): listing lines 268-269 and 273 are absent from this rendering,
// so the statements that follow each assertion inside the two branches are
// not visible here -- consult the original header before relying on them.
265  void prepare_task_arena(bool reinit = false) {
266  if (reinit) {
267  __TBB_ASSERT(my_task_arena, "task arena is NULL");
270  }
271  else {
272  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
274  }
275  if (!my_task_arena->is_active()) // failed to attach
276  my_task_arena->initialize(); // create a new, default-initialized arena
277  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
278  }
279 
280 public:
282  graph();
283 
285  explicit graph(tbb::task_group_context& use_this_context);
286 
288 
289  ~graph();
290 
291 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
292  void set_name(const char *name);
293 #endif
294 
296  reserve_wait();
297  }
298 
300  release_wait();
301  }
302 
304 
307 
309 
312 
314 
316  template< typename Receiver, typename Body >
317  __TBB_DEPRECATED void run(Receiver &r, Body body) {
319  task* rtask = new (task::allocate_additional_child_of(*root_task()))
322  }
323  }
324 
326 
328  template< typename Body >
329  __TBB_DEPRECATED void run(Body body) {
331  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
333  }
334  }
335 
337 
// Waits for the graph's work to complete. The index at the end of this page
// describes it as: wait until the graph is idle and decrement_wait_count
// calls equal increment_wait_count calls.
//
// Visible behavior:
//  - `cancelled` and `caught_exception` are cleared on entry.
//  - Nothing further happens when my_root_task is null.
//  - With TBB_USE_EXCEPTIONS, any exception escaping the guarded region
//    resets my_context, sets both `caught_exception` and `cancelled`, and is
//    rethrown to the caller.
//  - With __TBB_TASK_GROUP_CONTEXT, my_context is also reset on the
//    non-throwing path (see the TODO below for the known limitation).
// NOTE(review): listing lines 345, 347, 352, 363 and 366 are absent from this
// rendering; the actual wait call and the condition guarding the final
// my_context->reset() are therefore not visible here.
338  void wait_for_all() {
339  cancelled = false;
340  caught_exception = false;
341  if (my_root_task) {
342 #if TBB_USE_EXCEPTIONS
343  try {
344 #endif
346 #if __TBB_TASK_GROUP_CONTEXT
348 #endif
349 #if TBB_USE_EXCEPTIONS
350  }
351  catch (...) {
353  my_context->reset();
354  caught_exception = true;
355  cancelled = true;
356  throw;
357  }
358 #endif
359 #if __TBB_TASK_GROUP_CONTEXT
360  // TODO: the "if" condition below is just a work-around to support the concurrent wait
361  // mode. The cancellation and exception mechanisms are still broken in this mode.
362  // Consider using task group not to re-implement the same functionality.
364  my_context->reset(); // consistent with behavior in catch()
365 #endif
367 #if __TBB_TASK_GROUP_CONTEXT
368  }
369 #endif
370  }
371  }
372 
375  return my_root_task;
376  }
377 
378  // ITERATORS
379  template<typename C, typename N>
381 
382  // Graph iterator typedefs
385 
386  // Graph iterator constructors
388  iterator begin();
390  iterator end();
392  const_iterator begin() const;
394  const_iterator end() const;
396  const_iterator cbegin() const;
398  const_iterator cend() const;
399 
// Returns the graph's `cancelled` flag. In the code visible on this page the
// flag is cleared at the start of wait_for_all() and set in its catch(...)
// handler; other writers may exist on lines not shown here.
401  bool is_cancelled() { return cancelled; }
403 
404  // thread-unsafe state reset.
406 
407 private:
409 #if __TBB_TASK_GROUP_CONTEXT
411 #endif
413  bool cancelled;
417 
419 
423 
425 
426 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
428 #endif
429 
437 
439 
440 }; // class graph
441 } // namespace interface10
442 
443 namespace interface11 {
444 
446 
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448 namespace internal{
449 class get_graph_helper;
450 }
451 #endif
452 
455  friend class graph;
456  template<typename C, typename N>
457  friend class graph_iterator;
458 
459 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
460  friend class internal::get_graph_helper;
461 #endif
462 
463 protected:
466 public:
467  explicit graph_node(graph& g);
468 
469  virtual ~graph_node();
470 
471 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
472  virtual void set_name(const char *name) = 0;
473 #endif
474 
475 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
476  virtual void extract() = 0;
477 #endif
478 
479 protected:
480  // performs the reset on an individual node.
481  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
482 }; // class graph_node
483 
484 namespace internal {
485 
// Marks graph `g` as active by setting g.my_is_active to true.
// spawn_in_graph_arena() and enqueue_in_graph_arena() submit tasks only
// while this flag is set.
486 inline void activate_graph(graph& g) {
487  g.my_is_active = true;
488 }
489 
// Marks graph `g` as inactive by setting g.my_is_active to false. While the
// flag is clear, spawn_in_graph_arena() and enqueue_in_graph_arena() return
// without submitting the task they were given.
490 inline void deactivate_graph(graph& g) {
491  g.my_is_active = false;
492 }
493 
// Returns the current value of g.my_is_active.
494 inline bool is_graph_active(graph& g) {
495  return g.my_is_active;
496 }
497 
498 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
500  task* critical_task = &t;
501  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
502  graph_task* gt = static_cast<graph_task*>(&t);
503  if( gt->priority != no_priority ) {
508  critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
509  tbb::internal::make_critical( *critical_task );
510  g.my_priority_queue.push(gt);
511  }
512  return *critical_task;
513 }
514 #else
516  return t;
517 }
518 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
519 
// Spawns a task inside the graph's arena (wording taken from this page's
// index). Does nothing unless is_graph_active(g) is true. The task is first
// passed through prioritize_task(g, arena_task); the task that call returns
// is wrapped in a graph::spawn_functor, which is then run through
// g.my_task_arena->execute().
// NOTE(review): listing line 524 is absent from this rendering, so one
// statement between the functor construction and the execute() call is not
// visible here.
521 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
522  if (is_graph_active(g)) {
523  graph::spawn_functor s_fn(prioritize_task(g, arena_task));
525  g.my_task_arena->execute(s_fn);
526  }
527 }
528 
// Enqueues a task inside the graph's arena. Does nothing unless
// is_graph_active(g) is true. When the graph is active, the function asserts
// that g.my_task_arena is non-null and active, passes the task through
// prioritize_task(g, arena_task), and enqueues the task that call returns
// into *g.my_task_arena via task::enqueue.
530 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
531  if (is_graph_active(g)) {
532  __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
533  task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
534  }
535 }
536 
538  g.my_reset_task_list.push_back(tp);
539 }
540 
541 } // namespace internal
542 
543 } // namespace interface11
544 } // namespace flow
545 } // namespace tbb
546 
547 #endif // __TBB_flow_graph_impl_H
bool is_cancelled()
Returns the cancellation status of graph execution (the value of the graph's cancelled flag).
#define FLOW_SPAWN(a)
tbb::task & prioritize_task(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Used to form groups of tasks.
Definition: task.h:358
graph_task(node_priority_t node_priority=no_priority)
task * execute() __TBB_override
Should be overridden by derived classes.
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:806
tbb::task_group_context * my_context
graph_iterator(const graph_iterator &other)
Copy constructor.
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:819
void prepare_task_arena(bool reinit=false)
__TBB_DEPRECATED void decrement_wait_count()
Base class for user-defined tasks.
Definition: task.h:615
graph_iterator operator++(int)
Post-increment.
void initialize()
Forces allocation of the resources for the task_arena as specified in constructor arguments.
Definition: task_arena.h:315
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
reference operator *() const
Dereference.
Definition: flow_graph.h:755
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
virtual task * execute()=0
Should be overridden by derived classes.
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
A lock that occupies a single byte.
Definition: spin_mutex.h:39
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:843
graph_iterator & operator=(const graph_iterator &other)
Assignment.
uintptr_t traits() const
Returns the context's trait.
Definition: task.h:578
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:874
void deactivate_graph(tbb::flow::interface10::graph &g)
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
static void enqueue(task &t)
Enqueue task for starvation-resistant execution.
Definition: task.h:836
unsigned int node_priority_t
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:304
iterator end()
end iterator
Definition: flow_graph.h:868
graph_iterator & operator++()
Pre-increment.
void make_critical(task &t)
Definition: task.h:1013
Pure virtual template classes that define interfaces for async communication.
bool is_graph_active(tbb::flow::interface10::graph &g)
bool __TBB_EXPORTED_METHOD is_group_execution_cancelled() const
Returns true if the context received cancellation request.
__TBB_DEPRECATED void increment_wait_count()
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
__TBB_DEPRECATED void run(Body body)
Spawns a task that runs a function object.
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:813
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:676
tbb::flow::interface11::graph_node * my_nodes
bool operator()(const graph_task *left, const graph_task *right)
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
__TBB_DEPRECATED void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
priority_task_selector(graph_task_priority_queue_t &priority_queue)
bool operator==(const graph_iterator &other) const
Equality.
static const node_priority_t no_priority
std::list< tbb::task * > task_list_type
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:774
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
std::forward_iterator_tag iterator_category
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
bool operator!=(const graph_iterator &other) const
Inequality.
The base of all graph nodes.
#define __TBB_override
Definition: tbb_stddef.h:240
iterator begin()
start iterator
Definition: flow_graph.h:866
__TBB_DEPRECATED tbb::task * root_task()
Returns the root task of the graph.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:861
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
const_iterator cend() const
end const iterator
Definition: flow_graph.h:876
Base class for tasks generated by graph nodes.
tbb::flow::interface11::graph_node * my_nodes_last
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:831
The graph class.
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:423
run_task(Body &body, tbb::flow::interface11::node_priority_t node_priority=tbb::flow::interface11::no_priority)
~graph()
Destroys the graph.
Definition: flow_graph.h:798
pointer operator->() const
Dereference.
Definition: flow_graph.h:761
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
void activate_graph(tbb::flow::interface10::graph &g)
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:820

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.