Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_async_msg_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_async_msg_impl_H
18 #define __TBB__flow_graph_async_msg_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 namespace internal {
25 
26 template <typename T>
28 public:
29  typedef receiver<T> async_storage_client;
30 
31  async_storage() : my_graph(nullptr) {
32  my_data_ready.store<tbb::relaxed>(false);
33  }
34 
36  // Release reference to the graph if async_storage
37  // was destructed before set() call
38  if (my_graph) {
39  my_graph->release_wait();
40  my_graph = nullptr;
41  }
42  }
43 
44  template<typename C>
45  async_storage(C&& data) : my_graph(nullptr), my_data( std::forward<C>(data) ) {
46  using namespace tbb::internal;
47  __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
48 
49  my_data_ready.store<tbb::relaxed>(true);
50  }
51 
52  template<typename C>
53  bool set(C&& data) {
54  using namespace tbb::internal;
55  __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
56 
57  {
59 
60  if (my_data_ready.load<tbb::relaxed>()) {
61  __TBB_ASSERT(false, "double set() call");
62  return false;
63  }
64 
65  my_data = std::forward<C>(data);
66  my_data_ready.store<tbb::release>(true);
67  }
68 
69  // Thread sync is on my_data_ready flag
70  for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
71  (*it)->try_put(my_data);
72  }
73 
74  // Data was sent, release reference to the graph
75  if (my_graph) {
76  my_graph->release_wait();
77  my_graph = nullptr;
78  }
79 
80  return true;
81  }
82 
83  task* subscribe(async_storage_client& client, graph& g) {
84  if (! my_data_ready.load<tbb::acquire>())
85  {
87 
88  if (! my_data_ready.load<tbb::relaxed>()) {
89 #if TBB_USE_ASSERT
90  for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
91  __TBB_ASSERT(*it != &client, "unexpected double subscription");
92  }
93 #endif // TBB_USE_ASSERT
94 
95  // Increase graph lifetime
96  my_graph = &g;
97  my_graph->reserve_wait();
98 
99  // Subscribe
100  my_clients.push_back(&client);
101  return SUCCESSFULLY_ENQUEUED;
102  }
103  }
104 
105  __TBB_ASSERT(my_data_ready.load<tbb::relaxed>(), "data is NOT ready");
106  return client.try_put_task(my_data);
107  }
108 
109 private:
110  graph* my_graph;
112  tbb::atomic<bool> my_data_ready;
114  typedef std::vector<async_storage_client*> subscriber_list_type;
116 };
117 
118 } // namespace internal
119 
120 template <typename T>
122  template< typename > friend class receiver;
123  template< typename, typename > friend struct internal::async_helpers;
124 public:
126 
127  async_msg() : my_storage(std::make_shared< internal::async_storage<T> >()) {}
128 
129  async_msg(const T& t) : my_storage(std::make_shared< internal::async_storage<T> >(t)) {}
130 
131  async_msg(T&& t) : my_storage(std::make_shared< internal::async_storage<T> >( std::move(t) )) {}
132 
133  virtual ~async_msg() {}
134 
135  void set(const T& t) {
136  my_storage->set(t);
137  }
138 
139  void set(T&& t) {
140  my_storage->set( std::move(t) );
141  }
142 
143 protected:
144  // Can be overridden in derived class to inform that
145  // async calculation chain is over
146  virtual void finalize() const {}
147 
148 private:
149  typedef std::shared_ptr< internal::async_storage<T> > async_storage_ptr;
151 };
152 
153 #endif // __TBB__flow_graph_async_msg_impl_H
std::vector< async_storage_client * > subscriber_list_type
A lock that occupies a single byte.
Definition: spin_mutex.h:39
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void * data
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
void set(const T &t)
Release.
Definition: atomic.h:59
task * subscribe(async_storage_client &client, graph &g)
std::shared_ptr< internal::async_storage< T > > async_storage_ptr
static tbb::task *const SUCCESSFULLY_ENQUEUED
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Detects whether two given types are the same.
No ordering.
Definition: atomic.h:61
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
Acquire.
Definition: atomic.h:57
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
virtual void finalize() const
async_storage_ptr my_storage
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:65
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.