Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
private_server.cpp
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "../rml/include/rml_tbb.h"
#include "../rml/server/thread_monitor.h"
#include "tbb/atomic.h"
#include "tbb/cache_aligned_allocator.h"
#include "scheduler_common.h"
#include "governor.h"
#include "tbb_misc.h"

using rml::internal::thread_monitor;

namespace tbb {
namespace internal {
namespace rml {

typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in finite-state machine that controls the worker.
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const size_t i ) :
        my_server(server), my_client(client), my_index(i),
        my_thread_monitor(), my_handle(), my_next()
    {
        my_state = st_init;
    }
};
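
The worker above is driven by a small finite-state machine (st_init, st_starting, st_normal, st_quit) whose transitions are made with compare-and-swap, so a concurrent launch and shutdown can race safely. A minimal standalone sketch of the same idiom, using std::atomic in place of tbb::atomic (all names in this sketch are hypothetical, for illustration only):

#include <atomic>
#include <cassert>

enum state_t { st_init, st_starting, st_normal, st_quit };
std::atomic<state_t> state(st_init);

// Analogue of the first CAS in wake_or_launch(): exactly one caller
// wins the st_init -> st_starting transition and launches the thread.
bool try_claim_launch() {
    state_t expected = st_init;
    return state.compare_exchange_strong(expected, st_starting);
}

// Analogue of the CAS loop in start_shutdown(): force st_quit from
// whatever state the worker is in, and report the previous state so
// the caller knows what cleanup is still owed.
state_t force_quit() {
    state_t s = state.load();
    do {
        assert(s != st_quit);
    } while (!state.compare_exchange_weak(s, st_quit));
    return s;
}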

static const size_t cache_line_size = tbb::internal::NFS_MaxLineSize;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const size_t i )
    : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
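
The pad array rounds each worker up to a whole number of cache lines, so adjacent elements of my_thread_array never share a line (false sharing). The arithmetic: with a 128-byte line and, say, sizeof(private_worker)==88, the pad is 40 bytes; and when the size is already a multiple of the line, sizeof%line==0 yields one full extra line rather than an illegal zero-length array. A hedged generic re-creation of the idiom (line size and names are assumptions, not part of the original):

#include <cstddef>

// Hypothetical generic form of the padding idiom; 128 mirrors the
// upper bound that tbb::internal::NFS_MaxLineSize uses on most targets.
const std::size_t line_size = 128;

template<typename T>
struct padded_t : T {
    // When sizeof(T) is a multiple of line_size, this still reserves a
    // full extra line, keeping the array length strictly positive.
    char pad[line_size - sizeof(T) % line_size];
    using T::T;   // forward constructors, like padded_private_worker does
};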

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    tbb::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_asleep_list_root )
            wake_some(0);
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    virtual ~private_server();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }
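
    // Annotation (not part of the original source): the reference-count
    // protocol. The constructor sets my_ref_count to my_n_thread+1 --
    // one reference per worker slot plus one for the client connection.
    // request_close_connection() below releases the connection's
    // reference; each worker releases its own in run() when its thread
    // exits, or in start_shutdown() if the thread was never launched.
    // Whichever release drives the count to zero runs ~private_server()
    // and frees the cache-aligned storage, so cleanup is performed by
    // the last party to let go, client or worker.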

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const __TBB_override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) __TBB_override {
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() __TBB_override { __TBB_Yield(); }

    void independent_thread_number_changed( int ) __TBB_override { __TBB_ASSERT(false,NULL); }

    unsigned default_concurrency() const __TBB_override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) __TBB_override;

#if _WIN32||_WIN64
    void register_master ( ::rml::server::execution_resource_t& ) __TBB_override {}
    void unregister_master ( ::rml::server::execution_resource_t ) __TBB_override {}
#endif /* _WIN32||_WIN64 */
};
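
propagate_chain_reaction() above is the first half of a double-check idiom: an unsynchronized read of my_asleep_list_root filters out the common case where nobody is asleep, and wake_some(0) repeats the test under my_asleep_list_mutex before actually popping a worker. A minimal sketch of the same pattern with standard primitives (names hypothetical, for illustration only):

#include <atomic>
#include <mutex>

struct Node { Node* next; };

std::atomic<Node*> asleep_root(nullptr);
std::mutex asleep_mutex;

void wake_if_any() {
    if (asleep_root.load(std::memory_order_relaxed)) {  // first check: no lock taken
        std::lock_guard<std::mutex> lock(asleep_mutex);
        if (Node* n = asleep_root.load()) {             // second check: under the lock
            asleep_root.store(n->next);                 // pop one sleeper
            // ... notify the popped worker here ...
        }
    }
}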

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}
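
release_handle() maps the client's join policy onto the two ways of disposing of an OS thread. A std::thread analogue, purely illustrative:

#include <thread>

// Hypothetical std::thread rendering of release_handle(): shutdown
// either waits for the worker (join) or lets it finish on its own
// schedule (detach), as decided by does_client_join_workers().
void release(std::thread& t, bool join) {
    if (join)
        t.join();
    else
        t.detach();
}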

void private_worker::start_shutdown() {
    state_t s;

    // Transition to st_quit from whatever state the worker is in now.
    do {
        s = my_state;
        __TBB_ASSERT( s!=st_quit, NULL );
    } while( my_state.compare_and_swap( st_quit, s )!=s );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if (s==st_normal)
            release_handle( my_handle, governor::does_client_join_workers(my_client) );
    } else if( s==st_init ) {
        // Perform action that otherwise would be performed by associated thread when it quits.
        my_server.remove_server_ref();
    }
}

void private_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state!=st_quit ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}
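
The prepare_wait/commit_wait/cancel_wait calls in run() form a two-phase wait: the worker first announces its intent to sleep, then re-validates the sleep condition (state and membership in the asleep list), and only then blocks, so a notify() arriving between the check and the block is not lost. A condition-variable sketch of such a monitor, under the assumption that an epoch counter plays the role of the cookie (all names hypothetical):

#include <condition_variable>
#include <mutex>

class monitor {
    std::mutex mtx;
    std::condition_variable cv;
    unsigned epoch = 0;          // counts notifications; stands in for the cookie
public:
    typedef unsigned cookie;

    cookie prepare_wait() {                 // phase 1: record the current epoch
        std::lock_guard<std::mutex> lock(mtx);
        return epoch;
    }
    void commit_wait(cookie c) {            // phase 2a: sleep unless a notify already happened
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&]{ return epoch != c; });
    }
    void cancel_wait(cookie) {}             // phase 2b: invariant broken, do not sleep
    void notify() {
        { std::lock_guard<std::mutex> lock(mtx); ++epoch; }
        cv.notify_one();
    }
};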

void private_worker::wake_or_launch() {
    if( my_state==st_init && my_state.compare_and_swap( st_starting, st_init )==st_init ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = my_state.compare_and_swap( st_normal, st_starting );
        if (st_starting != s) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, NULL );
            release_handle( my_handle, governor::does_client_join_workers(my_client) );
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in asleep list" );
        my_thread_monitor.notify();
    }
}

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_thread_array(NULL)
{
    my_ref_count = my_n_thread+1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_asleep_list_root = NULL;
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        __TBB_ASSERT( t==&my_thread_array[i], NULL );
        suppress_unused_warning(t);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, NULL );
    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::internal::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root;
        my_asleep_list_root = &t;
        return true;
    } else {
        --my_slack;
        return false;
    }
}
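
A worked example of the slack handshake: suppose my_slack is -1, i.e. one more thread is running than there are jobs for. The surplus worker calls try_insert_in_asleep_list, increments my_slack to 0, and since 0<=0 it links itself onto the list and sleeps. Had my_slack been 0, the increment would yield 1, meaning a job could still use this thread, so the worker undoes the increment and keeps processing. Doing the increment under the lock guarantees that a thread which later claims that unit of slack in wake_some() finds this worker on the list and wakes it.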

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, NULL );
    private_worker* wakee[2];
    private_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack;
                    if( old<=0 ) goto done;
                } while( my_slack.compare_and_swap(old-1,old)!=old );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root = (*w++ = my_asleep_list_root)->my_next;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = NULL;
        ww->wake_or_launch();
    }
}
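
Capping each call at two wakeups is what makes the chain reaction scale: a newly woken worker calls propagate_chain_reaction() from run(), so the number of wakers roughly doubles each round and n sleepers come up in about log2(n) rounds instead of being notified one by one from a single thread. A toy simulation of that fan-out (hypothetical; ignores timing and slack accounting):

#include <cstdio>

int main() {
    int sleeping = 1000;   // workers parked on the asleep list
    int wakers = 1;        // threads currently able to issue wake-ups
    int rounds = 0;
    while (sleeping > 0) {
        int woken = 2 * wakers;          // each waker pops at most two sleepers
        if (woken > sleeping) woken = sleeping;
        sleeping -= woken;
        wakers = woken;                  // the newly woken propagate next round
        ++rounds;
    }
    std::printf("all workers running after %d rounds\n", rounds); // prints 9 here
    return 0;
}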

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack += delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}
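
The asymmetry in adjust_job_count_estimate() mirrors the rule documented on my_slack: a negative delta may simply be subtracted, because lowering slack never strands a sleeper (workers notice the deficit and put themselves to sleep), while a positive delta must be routed through wake_some(), which raises my_slack under my_asleep_list_mutex so that any worker entitled to the new slack is found on the asleep list and woken.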

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace internal
} // namespace tbb