Home ⌂Doc Index ◂Up ▴
Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
aggregator.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__aggregator_H
18 #define __TBB__aggregator_H
19 
20 #define __TBB_aggregator_H_include_area
22 
23 #if !TBB_PREVIEW_AGGREGATOR
24 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
25 #endif
26 
27 #include "atomic.h"
28 #include "tbb_profiling.h"
29 
30 namespace tbb {
31 namespace interface6 {
32 
33 using namespace tbb::internal;
34 
36  template<typename handler_type> friend class aggregator_ext;
37  uintptr_t status;
39 public:
40  enum aggregator_operation_status { agg_waiting=0, agg_finished };
41  aggregator_operation() : status(agg_waiting), my_next(NULL) {}
43  void start() { call_itt_notify(acquired, &status); }
45 
46  void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
49 };
50 
51 namespace internal {
52 
54  friend class basic_handler;
55  virtual void apply_body() = 0;
56 public:
58  virtual ~basic_operation_base() {}
59 };
60 
61 template<typename Body>
63  const Body& my_body;
64  void apply_body() __TBB_override { my_body(); }
65 public:
66  basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
67 };
68 
70 public:
72  void operator()(aggregator_operation* op_list) const {
73  while (op_list) {
74  // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
75  // The executing thread "acquires" the tag (see start()) and then performs
76  // the associated operation w/o triggering a race condition diagnostics.
77  // A thread that created the operation is waiting for its status (see execute_impl()),
78  // so when this thread is done with the operation, it will "release" the tag
79  // and update the status (see finish()) to give control back to the waiting thread.
80  basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
81  // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
82  op_list = op_list->next();
83  request.start();
84  request.apply_body();
85  request.finish();
86  }
87  }
88 };
89 
90 } // namespace internal
91 
93 
95 template <typename handler_type>
97 public:
98  aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
99 
101 
102  void process(aggregator_operation *op) { execute_impl(*op); }
103 
104 protected:
109 
110  // ITT note: &(op.status) tag is used to cover accesses to this operation. This
111  // thread has created the operation, and now releases it so that the handler
112  // thread may handle the associated operation w/o triggering a race condition;
113  // thus this tag will be acquired just before the operation is handled in the
114  // handle_operations functor.
116  // insert the operation into the list
117  do {
118  // ITT may flag the following line as a race; it is a false positive:
119  // This is an atomic read; we don't provide itt_hide_load_word for atomics
120  op.my_next = res = mailbox; // NOT A RACE
121  } while (mailbox.compare_and_swap(&op, res) != res);
122  if (!res) { // first in the list; handle the operations
123  // ITT note: &mailbox tag covers access to the handler_busy flag, which this
124  // waiting handler thread will try to set before entering handle_operations.
125  call_itt_notify(acquired, &mailbox);
126  start_handle_operations();
127  __TBB_ASSERT(op.status, NULL);
128  }
129  else { // not first; wait for op to be ready
133  }
134  }
135 
136 
137 private:
139  atomic<aggregator_operation *> mailbox;
140 
142 
143  uintptr_t handler_busy;
144 
145  handler_type handle_operations;
146 
149  aggregator_operation *pending_operations;
150 
151  // ITT note: &handler_busy tag covers access to mailbox as it is passed
152  // between active and waiting handlers. Below, the waiting handler waits until
153  // the active handler releases, and the waiting handler acquires &handler_busy as
154  // it becomes the active_handler. The release point is at the end of this
155  // function, when all operations in mailbox have been handled by the
156  // owner of this aggregator.
157  call_itt_notify(prepare, &handler_busy);
158  // get handler_busy: only one thread can possibly spin here at a time
159  spin_wait_until_eq(handler_busy, uintptr_t(0));
160  call_itt_notify(acquired, &handler_busy);
161  // acquire fence not necessary here due to causality rule and surrounding atomics
162  __TBB_store_with_release(handler_busy, uintptr_t(1));
163 
164  // ITT note: &mailbox tag covers access to the handler_busy flag itself.
165  // Capturing the state of the mailbox signifies that handler_busy has been
166  // set and a new active handler will now process that list's operations.
167  call_itt_notify(releasing, &mailbox);
168  // grab pending_operations
169  pending_operations = mailbox.fetch_and_store(NULL);
170 
171  // handle all the operations
172  handle_operations(pending_operations);
173 
174  // release the handler
175  itt_store_word_with_release(handler_busy, uintptr_t(0));
176  }
177 };
178 
180 class aggregator : private aggregator_ext<internal::basic_handler> {
181 public:
182  aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
184 
186  template<typename Body>
187  void execute(const Body& b) {
189  this->execute_impl(op);
190  }
191 };
192 
193 } // namespace interface6
194 
195 using interface6::aggregator;
196 using interface6::aggregator_ext;
197 using interface6::aggregator_operation;
198 
199 } // namespace tbb
200 
202 #undef __TBB_aggregator_H_include_area
203 
204 #endif // __TBB__aggregator_H
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function h
aggregator_operation * next()
Definition: aggregator.h:47
void call_itt_notify(notify_type, void *)
void set_next(aggregator_operation *n)
Definition: aggregator.h:48
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
uintptr_t handler_busy
Controls thread access to handle_operations.
Definition: aggregator.h:143
void operator()(aggregator_operation *op_list) const
Definition: aggregator.h:72
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
aggregator_ext(const handler_type &h)
Definition: aggregator.h:98
void process(aggregator_operation *op)
EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
Definition: aggregator.h:102
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
T itt_hide_load_word(const T &src)
Basic aggregator interface.
Definition: aggregator.h:180
void start()
Call start before handling this operation.
Definition: aggregator.h:43
atomic< aggregator_operation * > mailbox
An atomically updated list (aka mailbox) of aggregator_operations.
Definition: aggregator.h:139
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
T itt_load_word_with_acquire(const tbb::atomic< T > &src)
void itt_hide_store_word(T &dst, T src)
Aggregator base class and expert interface.
Definition: aggregator.h:96
void finish()
Call finish when done handling this operation.
Definition: aggregator.h:46
The graph class.
void execute_impl(aggregator_operation &op)
Definition: aggregator.h:107
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:65
aggregator_operation * my_next
Definition: aggregator.h:38
void start_handle_operations()
Trigger the handling of operations when the handler is free.
Definition: aggregator.h:148
void execute(const Body &b)
BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
Definition: aggregator.h:187

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.