Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
observer_proxy.cpp
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/tbb_config.h"

#if __TBB_SCHEDULER_OBSERVER

#include "observer_proxy.h"
#include "tbb_main.h"
#include "governor.h"
#include "scheduler.h"
#include "arena.h"

namespace tbb {
namespace internal {

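// The list of observers with the obsolete global semantics; threads of every
// arena are notified against this list.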
padded<observer_list> the_global_observer_list;

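// In debug builds, count live proxies and report leaks at process shutdown.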
#if TBB_USE_ASSERT
static atomic<int> observer_proxy_count;

struct check_observer_proxy_count {
    ~check_observer_proxy_count() {
        if( observer_proxy_count != 0 ) {
            runtime_warning( "Leaked %ld observer_proxy objects\n", long(observer_proxy_count) );
        }
    }
};

static check_observer_proxy_count the_check_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

#if __TBB_ARENA_OBSERVER
// Returns the observer as its interface6 type, or NULL for a pre-v6 observer.
interface6::task_scheduler_observer* observer_proxy::get_v6_observer() {
    if( my_version != 6 ) return NULL;
    return static_cast<interface6::task_scheduler_observer*>(my_observer);
}
#endif

#if __TBB_ARENA_OBSERVER
// Pre-v6 observers are always global; v6 observers are global when created with global_tag.
bool observer_proxy::is_global() {
    return !get_v6_observer() || get_v6_observer()->my_context_tag == interface6::task_scheduler_observer::global_tag;
}
#endif /* __TBB_ARENA_OBSERVER */

observer_proxy::observer_proxy( task_scheduler_observer_v3& tso )
    : my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++observer_proxy_count;
#endif /* TBB_USE_ASSERT */
    // 1 for observer
    my_ref_count = 1;
    // A v6 observer signals its version by priming my_busy_count with v6_trait;
    // observe() clears the counter right after the proxy is constructed.
    my_version =
#if __TBB_ARENA_OBSERVER
        load<relaxed>(my_observer->my_busy_count)
                    == interface6::task_scheduler_observer::v6_trait ? 6 :
#endif
        0;
    __TBB_ASSERT( my_version >= 6 || !load<relaxed>(my_observer->my_busy_count), NULL );
}

#if TBB_USE_ASSERT
observer_proxy::~observer_proxy () {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
    --observer_proxy_count;
}
#endif /* TBB_USE_ASSERT */

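// Helper: applies TBB's atomic fetch-and-store primitive to a raw
// (non-tbb::atomic) location of any pointer-sized type.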
template<memory_semantics M, class T, class V>
T atomic_fetch_and_store ( T* addr, const V& val ) {
    return (T)atomic_traits<sizeof(T), M>::fetch_and_store( addr, (T)val );
}

void observer_list::clear () {
    __TBB_ASSERT( this != &the_global_observer_list, "Method clear() cannot be used on the list of global observers" );
    // Though the method will work fine for the empty list, we require the caller
    // to check for the list emptiness before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), NULL );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            __TBB_ASSERT( p->my_version >= 6, NULL );
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            task_scheduler_observer_v3 *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if ( !obs || !(p = (observer_proxy*)__TBB_FetchAndStoreW(&obs->my_proxy, 0)) )
                continue;
            // Accessing 'obs' after detaching obs->my_proxy would race with observer destruction.
            __TBB_ASSERT( !next || p == next->my_prev, NULL );
            __TBB_ASSERT( is_alive(p->my_ref_count), "Observer's proxy died prematurely" );
            __TBB_ASSERT( p->my_ref_count == 1, "Reference for observer is missing" );
#if TBB_USE_ASSERT
            p->my_observer = NULL;
            p->my_ref_count = 0;
#endif /* TBB_USE_ASSERT */
            remove(p);
            delete p;
        }
    }
    // Wait for concurrent observer destruction to remove the remaining proxies.
    while( my_head )
        __TBB_Yield();
}

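// Appends the proxy to the tail of the list under an exclusive lock.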
void observer_list::insert ( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if ( my_head ) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    }
    else
        my_head = p;
    my_tail = p;
}

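// Unlinks the proxy from the doubly-linked list; the caller must hold the writer lock.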
void observer_list::remove ( observer_proxy* p ) {
    __TBB_ASSERT( my_head, "Attempt to remove an item from an empty list" );
    __TBB_ASSERT( !my_tail->my_next, "Last item's my_next must be NULL" );
    if( p == my_tail ) {
        __TBB_ASSERT( !p->my_next, NULL );
        my_tail = p->my_prev;
    }
    else {
        __TBB_ASSERT( p->my_next, NULL );
        p->my_next->my_prev = p->my_prev;
    }
    if ( p == my_head ) {
        __TBB_ASSERT( !p->my_prev, NULL );
        my_head = p->my_next;
    }
    else {
        __TBB_ASSERT( p->my_prev, NULL );
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT( (my_head && my_tail) || (!my_head && !my_tail), NULL );
}

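// Releases one reference to the proxy.  The lock-free CAS loop handles the
// common case where the count stays positive; a potentially last reference
// is dropped under the writer lock so that a concurrent list walker cannot
// pin the proxy again while it is being removed and deleted.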
void observer_list::remove_ref( observer_proxy* p ) {
    int r = p->my_ref_count;
    __TBB_ASSERT( is_alive(r), NULL );
    while( r > 1 ) {
        __TBB_ASSERT( r != 0, NULL );
        int r_old = p->my_ref_count.compare_and_swap(r - 1, r);
        if( r_old == r ) {
            // Successfully decremented count.
            return;
        }
        r = r_old;
    }
    __TBB_ASSERT( r == 1, NULL );
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if( !r )
            remove(p);
    }
    __TBB_ASSERT( r || !p->my_ref_count, NULL );
    if( !r )
        delete p;
}

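// Walks the list from the proxy after 'last' to the tail, invoking
// on_scheduler_entry() for each proxy that still has a live observer.
// A proxy is pinned (its my_ref_count incremented) before the list lock is
// released so that the callback runs without any lock held; on return,
// 'last' points to the final proxy visited so the caller can resume there.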
void observer_list::do_notify_entry_observers( observer_proxy*& last, bool worker ) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy *p = last, *prev = p;
    for(;;) {
        task_scheduler_observer_v3* tso = NULL;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if( p ) {
                    // We were already processing the list.
                    if( observer_proxy* q = p->my_next ) {
                        if( p == prev )
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        p = q;
                    }
                    else {
                        // Reached the end of the list.
                        if( p == prev ) {
                            // Keep the reference as we store the 'last' pointer in scheduler
                            __TBB_ASSERT( p->my_ref_count >= 1 + (p->my_observer ? 1 : 0), NULL );
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT( p->my_ref_count, NULL );
                            ++p->my_ref_count;
                            if( prev ) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    if( !p )
                        return;
                }
                tso = p->my_observer;
            } while( !tso );
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT( !prev || p != prev, NULL );
        // Release the proxy pinned before p
        if( prev )
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT( p->my_ref_count, NULL );
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX( bc >= 0, "my_busy_count underflowed" );
        prev = p;
    }
}

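// Walks the list from the head up to and including 'last', invoking
// on_scheduler_exit() for each proxy with a live observer, and finally
// releases the reference that the matching entry notification left on 'last'.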
void observer_list::do_notify_exit_observers( observer_proxy* last, bool worker ) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy *p = NULL, *prev = NULL;
    for(;;) {
        task_scheduler_observer_v3* tso = NULL;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if( p ) {
                    // We were already processing the list.
                    if( p != last ) {
                        __TBB_ASSERT( p->my_next, "List items before 'last' must have valid my_next pointer" );
                        if( p == prev )
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if( p ) {
                            lock.release();
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    __TBB_ASSERT( p, "Nonzero 'last' must guarantee that the global list is non-empty" );
                }
                tso = p->my_observer;
            } while( !tso );
            // The item is already refcounted
            if ( p != last ) // the last is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT( !prev || p != prev, NULL );
        if( prev )
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT( p->my_ref_count || p == last, NULL );
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX( bc >= 0, "my_busy_count underflowed" );
        prev = p;
    }
}

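// Enables or disables the observer.  Activation creates and publishes the
// proxy; deactivation atomically detaches the proxy and then spins until
// my_busy_count drains, i.e. until no thread is still inside a callback.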
void task_scheduler_observer_v3::observe( bool enable ) {
    if( enable ) {
        if( !my_proxy ) {
            my_proxy = new observer_proxy( *this );
            my_busy_count = 0; // proxy stores versioning information, clear it
#if __TBB_ARENA_OBSERVER
            if ( !my_proxy->is_global() ) {
                // Local observer activation
                generic_scheduler* s = governor::local_scheduler_if_initialized();
                __TBB_ASSERT( my_proxy->get_v6_observer(), NULL );
                intptr_t tag = my_proxy->get_v6_observer()->my_context_tag;
                if( tag != interface6::task_scheduler_observer::implicit_tag ) { // explicit arena
                    task_arena *a = reinterpret_cast<task_arena*>(tag);
                    if ( a->my_arena == NULL ) // Avoid recursion during arena initialization
                        a->initialize();
                    my_proxy->my_list = &a->my_arena->my_observers;
                } else {
                    // Lazily initialize the scheduler for an implicit arena.
                    if( !(s && s->my_arena) )
                        s = governor::init_scheduler( task_scheduler_init::automatic, 0, /*auto_init=*/true );
                    __TBB_ASSERT( s && s->my_arena, NULL );
                    my_proxy->my_list = &s->my_arena->my_observers;
                }
                my_proxy->my_list->insert(my_proxy);
                // Notify newly activated observer and other pending ones if it belongs to current arena
                if( s && &s->my_arena->my_observers == my_proxy->my_list )
                    my_proxy->my_list->notify_entry_observers( s->my_last_local_observer, s->is_worker() );
            } else
#endif /* __TBB_ARENA_OBSERVER */
            {
                // Obsolete. Global observer activation
                if( !__TBB_InitOnce::initialization_done() )
                    DoOneTimeInitializations();
                my_proxy->my_list = &the_global_observer_list;
                my_proxy->my_list->insert(my_proxy);
                if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) {
                    // Notify newly created observer of its own thread.
                    // Any other pending observers are notified too.
                    the_global_observer_list.notify_entry_observers( s->my_last_global_observer, s->is_worker() );
                }
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = (observer_proxy*)__TBB_FetchAndStoreW(&my_proxy, 0) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == this, NULL );
            __TBB_ASSERT( is_alive(proxy->my_ref_count), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count >= 1, "Reference for observer is missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = NULL;
                // Proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, NULL );
                    delete proxy;
                }
            }
            while( my_busy_count ) // other threads are still accessing the callback
                __TBB_Yield();
        }
    }
}

} // namespace internal
} // namespace tbb

#endif /* __TBB_SCHEDULER_OBSERVER */
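
For context, a minimal usage sketch of the observer API that this file implements. tbb::task_scheduler_observer, observe(), on_scheduler_entry(), and on_scheduler_exit() are the public TBB interfaces; the thread_counter class and its members are illustrative assumptions, not part of the library.

#include <tbb/task_scheduler_observer.h>
#include <tbb/parallel_for.h>
#include <atomic>
#include <cstdio>

// Counts threads as they join and leave the scheduler.  Calling observe(true)
// activates the proxy machinery defined in observer_proxy.cpp above.
class thread_counter : public tbb::task_scheduler_observer {
    std::atomic<int> my_live_threads;
public:
    thread_counter() : my_live_threads(0) { observe(true); }
    ~thread_counter() { observe(false); } // spins until all callbacks drain
    void on_scheduler_entry( bool is_worker ) override {
        std::printf( "%s thread joined (%d live)\n",
                     is_worker ? "worker" : "master", ++my_live_threads );
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        --my_live_threads;
    }
};

int main() {
    thread_counter counter; // enabled before the parallel work starts
    tbb::parallel_for( 0, 1000, []( int ) { /* work */ } );
    return 0;               // ~thread_counter() deactivates the observer
}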