Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
pipeline.cpp
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/pipeline.h"
#include "tbb/spin_mutex.h"
#include "tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "tls.h"  // for parallel filters that do not use NULL as end_of_input

namespace tbb {

namespace internal {

//! This structure is used to store task information in an input buffer.
struct task_info {
    void* my_object;
    //! Invalid unless a task went through an ordered stage.
    Token my_token;
    //! False until my_token is set.
    bool my_token_ready;
    //! True if my_object is valid.
    bool is_valid;
    //! Set to initial state (no object, no token)
    void reset() {
        my_object = NULL;
        my_token = 0;
        my_token_ready = false;
        is_valid = false;
    }
};
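
// The buffer below is a power-of-two ring indexed by token value: a slot lookup
// is a single mask, token&(array_size-1). low_token is the oldest token still
// in flight; high_token is the next token to be assigned.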

//! A buffer of input items for a filter.
class input_buffer : no_copy {
    friend class tbb::internal::pipeline_root_task;
    friend class tbb::filter;
    friend class tbb::thread_bound_filter;
    friend class tbb::internal::stage_task;
    friend class tbb::pipeline;

    typedef Token size_type;

    //! Array of deferred tasks that cannot yet start executing.
    task_info* array;

    //! For thread-bound filter, semaphore for waiting; NULL otherwise.
    semaphore* my_sem;

    //! Size of array
    /** Always 0 or a power of 2 */
    size_type array_size;

    //! Lowest token that can start executing.
    Token low_token;

    //! Serializes updates.
    spin_mutex array_mutex;

    //! Resize "array".
    /** Caller is responsible for acquiring a lock on "array_mutex". */
    void grow( size_type minimum_size );

    //! Initial size for "array"
    /** Must be a power of 2 */
    static const size_type initial_buffer_size = 4;

    //! Used for out-of-order buffers, and for assigning my_token if is_ordered and my_token not already assigned.
    Token high_token;

    //! True for ordered filter, false otherwise.
    bool is_ordered;

    //! True for thread-bound filter, false otherwise.
    bool is_bound;

    //! For parallel filters that accept NULLs, thread-local flag for reaching end_of_input.
    typedef basic_tls<intptr_t> end_of_input_tls_t;
    end_of_input_tls_t end_of_input_tls;
    bool end_of_input_tls_allocated; // no way to test pthread creation of TLS

    void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
    void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
    void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
    void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }

public:
    //! Construct empty buffer.
    input_buffer( bool is_ordered_, bool is_bound_ ) :
            array(NULL), my_sem(NULL), array_size(0),
            low_token(0), high_token(0),
            is_ordered(is_ordered_), is_bound(is_bound_),
            end_of_input_tls_allocated(false) {
        grow(initial_buffer_size);
        __TBB_ASSERT( array, NULL );
        if(is_bound) create_sema(0);
    }

    //! Destroy the buffer.
    ~input_buffer() {
        __TBB_ASSERT( array, NULL );
        cache_aligned_allocator<task_info>().deallocate(array,array_size);
        poison_pointer( array );
        if(my_sem) {
            free_sema();
        }
        if(end_of_input_tls_allocated) {
            destroy_my_tls();
        }
    }

    //! Put a token into the buffer.
    /** If task information was placed into buffer, returns true;
        otherwise returns false, informing the caller to create and spawn a task.
        If the input buffer is owned by a thread-bound filter and the item at
        low_token was not valid, issue a V().
        If the input_buffer is owned by a successor to a thread-bound filter,
        the force_put parameter should be true to ensure the token is inserted
        in the buffer. */
    bool put_token( task_info& info_, bool force_put = false ) {
        {
            info_.is_valid = true;
            spin_mutex::scoped_lock lock( array_mutex );
            Token token;
            bool was_empty = !array[low_token&(array_size-1)].is_valid;
            if( is_ordered ) {
                if( !info_.my_token_ready ) {
                    info_.my_token = high_token++;
                    info_.my_token_ready = true;
                }
                token = info_.my_token;
            } else
                token = high_token++;
            __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
            if( token!=low_token || is_bound || force_put ) {
                // Trying to put token that is beyond low_token.
                // Need to wait until low_token catches up before dispatching.
                if( token-low_token>=array_size )
                    grow( token-low_token+1 );
                ITT_NOTIFY( sync_releasing, this );
                array[token&(array_size-1)] = info_;
                if(was_empty && is_bound) {
                    sema_V();
                }
                return true;
            }
        }
        return false;
    }
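
    // Contract used by stage_task::execute below: a true return means the item
    // was queued and the caller must not process it now; a false return means
    // the token equals low_token for a non-thread-bound serial filter, so the
    // caller may run the stage immediately. For a thread-bound filter the item
    // is always queued, and sema_V() wakes the owning thread if the buffer was
    // previously empty.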

    //! Note that processing of a token is finished.
    /** Fires up processing of the next token, if processing was deferred. */
    // Uses template to avoid explicit dependency on stage_task.
    // This is only called for serial filters, and is the reason for the
    // advance parameter in return_item (we're incrementing low_token here).
    // Non-TBF serial stages don't advance the token at the start because the presence
    // of the current token in the buffer keeps another stage from being spawned.
    template<typename StageTask>
    void note_done( Token token, StageTask& spawner ) {
        task_info wakee;
        wakee.reset();
        {
            spin_mutex::scoped_lock lock( array_mutex );
            if( !is_ordered || token==low_token ) {
                // Wake the next task
                task_info& item = array[++low_token & (array_size-1)];
                ITT_NOTIFY( sync_acquired, this );
                wakee = item;
                item.is_valid = false;
            }
        }
        if( wakee.is_valid )
            spawner.spawn_stage_task(wakee);
    }

#if __TBB_TASK_GROUP_CONTEXT
    void clear( filter* my_filter ) {
        long t=low_token;
        for( size_type i=0; i<array_size; ++i, ++t ) {
            task_info& temp = array[t&(array_size-1)];
            if( temp.is_valid ) {
                my_filter->finalize(temp.my_object);
                temp.is_valid = false;
            }
        }
    }
#endif

    //! Return an item, invalidate the queued item, but only advance if the filter
    // is parallel (as indicated by advance == true). If the filter is serial, leave the
    // item in the buffer to keep another stage from being spawned.
    bool return_item(task_info& info, bool advance) {
        spin_mutex::scoped_lock lock( array_mutex );
        task_info& item = array[low_token&(array_size-1)];
        ITT_NOTIFY( sync_acquired, this );
        if( item.is_valid ) {
            info = item;
            item.is_valid = false;
            if( advance ) low_token++;
            return true;
        }
        return false;
    }

    //! True if the item at the current low_token is valid.
    bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size-1)].is_valid; }

    // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
    void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
    void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
    bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
    void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
};

void input_buffer::grow( size_type minimum_size ) {
    size_type old_size = array_size;
    size_type new_size = old_size ? 2*old_size : initial_buffer_size;
    while( new_size<minimum_size )
        new_size*=2;
    task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
    task_info* old_array = array;
    for( size_type i=0; i<new_size; ++i )
        new_array[i].is_valid = false;
    long t=low_token;
    for( size_type i=0; i<old_size; ++i, ++t )
        new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
    array = new_array;
    array_size = new_size;
    if( old_array )
        cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
}
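
// grow() preserves slot assignments across resizing because items are re-placed
// by re-masking their token against the new size: e.g. a token value 5 held in
// slot 5&3 == 1 of a 4-entry array moves to slot 5&7 == 5 of an 8-entry array.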

class stage_task: public task, public task_info {
private:
    friend class tbb::pipeline;
    pipeline& my_pipeline;
    filter* my_filter;
    //! True if this task has not yet read the input.
    bool my_at_start;

public:
    //! Construct stage_task for first stage in a pipeline.
    /** Such a stage has not read any input yet. */
    stage_task( pipeline& pipeline ) :
        my_pipeline(pipeline),
        my_filter(pipeline.filter_list),
        my_at_start(true)
    {
        task_info::reset();
    }
    //! Construct stage_task for a subsequent stage in a pipeline.
    stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
        task_info(info),
        my_pipeline(pipeline),
        my_filter(filter_),
        my_at_start(false)
    {}
    //! Roughly equivalent to the constructor of input stage task
    void reset() {
        task_info::reset();
        my_filter = my_pipeline.filter_list;
        my_at_start = true;
    }
    //! The virtual task execution method
    task* execute() __TBB_override;
#if __TBB_TASK_GROUP_CONTEXT
    ~stage_task()
    {
        if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
            __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
            my_filter->finalize(my_object);
            my_object = NULL;
        }
    }
#endif // __TBB_TASK_GROUP_CONTEXT
    //! Creates and spawns stage_task from task_info
    void spawn_stage_task(const task_info& info)
    {
        stage_task* clone = new (allocate_additional_child_of(*parent()))
                                stage_task( my_pipeline, my_filter, info );
        spawn(*clone);
    }
};
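
// One stage_task is in flight per token. execute() runs one filter and then
// either advances the same task to the next filter (returning 'this' after
// recycle_as_continuation(), which bypasses spawning), parks the item in the
// next serial filter's input_buffer via put_token, or resets itself to act as
// a new input-stage task.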

task* stage_task::execute() {
    __TBB_ASSERT( !my_at_start || !my_object, NULL );
    __TBB_ASSERT( !my_filter->is_bound(), NULL );
    if( my_at_start ) {
        if( my_filter->is_serial() ) {
            my_object = (*my_filter)(my_object);
            if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
            {
                if( my_filter->is_ordered() ) {
                    my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
                    my_token_ready = true;
                } else {
                    if( my_pipeline.has_thread_bound_filters )
                        my_pipeline.token_counter++; // ideally, with relaxed semantics
                }
                if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
                    reset();
                    goto process_another_stage;
                } else {
                    ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
                    if( --my_pipeline.input_tokens>0 )
                        spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
                }
            } else {
                my_pipeline.end_of_input = true;
                return NULL;
            }
        } else /*not is_serial*/ {
            if( my_pipeline.end_of_input )
                return NULL;
            else {
                if( my_pipeline.has_thread_bound_filters )
                    my_pipeline.token_counter++;
            }
            ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
            if( --my_pipeline.input_tokens>0 )
                spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
            my_object = (*my_filter)(my_object);
            if( !my_object && (!my_filter->object_may_be_null() ||
                               my_filter->my_input_buffer->my_tls_end_of_input()) )
            {
                my_pipeline.end_of_input = true;
                if( my_pipeline.has_thread_bound_filters ) {
                    my_pipeline.token_counter--; // fix token_counter
                }
                return NULL;
            }
        }
        my_at_start = false;
    } else {
        my_object = (*my_filter)(my_object);
        if( my_filter->is_serial() )
            my_filter->my_input_buffer->note_done(my_token, *this);
    }
    my_filter = my_filter->next_filter_in_pipeline;
    if( my_filter ) {
        // There is another filter to execute.
        if( my_filter->is_serial() ) {
            // The next filter must execute tokens in order
            if( my_filter->my_input_buffer->put_token(*this) ){
                // Can't proceed with the same item
                if( my_filter->is_bound() ) {
                    // Find the next non-thread-bound filter
                    do {
                        my_filter = my_filter->next_filter_in_pipeline;
                    } while( my_filter && my_filter->is_bound() );
                    // Check if there is an item ready to process
                    if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()) )
                        goto process_another_stage;
                }
                my_filter = NULL; // To prevent deleting my_object twice if exception occurs
                return NULL;
            }
        }
    } else {
        // Reached end of the pipe.
        size_t ntokens_avail = ++my_pipeline.input_tokens;
        if( my_pipeline.filter_list->is_bound() ) {
            if( ntokens_avail == 1 ) {
                my_pipeline.filter_list->my_input_buffer->sema_V();
            }
            return NULL;
        }
        if( ntokens_avail>1 // Only recycle if there is one available token
            || my_pipeline.end_of_input ) {
            return NULL; // No need to recycle for new input
        }
        ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
        // Recycle as an input stage task.
        reset();
    }
process_another_stage:
    /* A semi-hackish way to reexecute the same task object immediately without spawning.
       recycle_as_continuation marks the task for future execution,
       and then 'this' pointer is returned to bypass spawning. */
    recycle_as_continuation();
    return this;
}

class pipeline_root_task: public task {
    pipeline& my_pipeline;
    bool do_segment_scanning;

    task* execute() __TBB_override {
        if( !my_pipeline.end_of_input )
            if( !my_pipeline.filter_list->is_bound() )
                if( my_pipeline.input_tokens > 0 ) {
                    recycle_as_continuation();
                    set_ref_count(1);
                    return new( allocate_child() ) stage_task( my_pipeline );
                }
        if( do_segment_scanning ) {
            filter* current_filter = my_pipeline.filter_list->next_segment;
            /* first non-thread-bound filter that follows thread-bound one
               and may have valid items to process */
            filter* first_suitable_filter = current_filter;
            while( current_filter ) {
                __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
                __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
                if( !my_pipeline.end_of_input || current_filter->has_more_work() )
                {
                    task_info info;
                    info.reset();
                    task* bypass = NULL;
                    int refcnt = 0;
                    task_list list;
                    // No new tokens are created; it's OK to process all waiting tokens.
                    // If the filter is serial, the second call to return_item will return false.
                    while( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
                        task* t = new( allocate_child() ) stage_task( my_pipeline, current_filter, info );
                        if( ++refcnt == 1 )
                            bypass = t;
                        else // there's more than one task
                            list.push_back(*t);
                        // TODO: limit the list size (to arena size?) to spawn tasks sooner
                        __TBB_ASSERT( refcnt <= int(my_pipeline.token_counter), "token counting error" );
                        info.reset();
                    }
                    if( refcnt ) {
                        set_ref_count( refcnt );
                        if( refcnt > 1 )
                            spawn(list);
                        recycle_as_continuation();
                        return bypass;
                    }
                    current_filter = current_filter->next_segment;
                    if( !current_filter ) {
                        if( !my_pipeline.end_of_input ) {
                            recycle_as_continuation();
                            return this;
                        }
                        current_filter = first_suitable_filter;
                        __TBB_Yield();
                    }
                } else {
                    /* The preceding pipeline segment is empty.
                       Fast-forward to the next post-TBF segment. */
                    first_suitable_filter = first_suitable_filter->next_segment;
                    current_filter = first_suitable_filter;
                }
            } /* while( current_filter ) */
            return NULL;
        } else {
            if( !my_pipeline.end_of_input ) {
                recycle_as_continuation();
                return this;
            }
            return NULL;
        }
    }
public:
    pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
    {
        __TBB_ASSERT( my_pipeline.filter_list, NULL );
        filter* first = my_pipeline.filter_list;
        if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
            // Scanning the pipeline for segments
            filter* head_of_previous_segment = first;
            for( filter* subfilter=first->next_filter_in_pipeline;
                 subfilter!=NULL;
                 subfilter=subfilter->next_filter_in_pipeline )
            {
                if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
                    do_segment_scanning = true;
                    head_of_previous_segment->next_segment = subfilter;
                    head_of_previous_segment = subfilter;
                }
            }
        }
    }
};
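
// Segment scanning: the constructor above links each non-thread-bound filter
// that directly follows a thread-bound one into the next_segment chain. Since
// worker tasks never execute a thread-bound filter (see the assert at the top
// of stage_task::execute), the root task repeatedly scans these segment heads
// and spawns stage_tasks for items deposited there by user threads calling
// process_item().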

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    // Suppress compiler warning about constant conditional expression
    #pragma warning (disable: 4127)
#endif

// The class destroys end_counter and clears all input buffers if pipeline was cancelled.
class pipeline_cleaner: internal::no_copy {
    pipeline& my_pipeline;
public:
    pipeline_cleaner(pipeline& _pipeline) :
        my_pipeline(_pipeline)
    {}
    ~pipeline_cleaner(){
#if __TBB_TASK_GROUP_CONTEXT
        if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
            my_pipeline.clear_filters();
#endif
        my_pipeline.end_counter = NULL;
    }
};

} // namespace internal

void pipeline::inject_token( task& ) {
    __TBB_ASSERT(false,"illegal call to inject_token");
}

#if __TBB_TASK_GROUP_CONTEXT
void pipeline::clear_filters() {
    for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
        if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
            if( internal::input_buffer* b = f->my_input_buffer )
                b->clear(f);
    }
}
#endif

pipeline::pipeline() :
    filter_list(NULL),
    filter_end(NULL),
    end_counter(NULL),
    end_of_input(false),
    has_thread_bound_filters(false)
{
    token_counter = 0;
    input_tokens = 0;
}

pipeline::~pipeline() {
    clear();
}

void pipeline::clear() {
    filter* next;
    for( filter* f = filter_list; f; f=next ) {
        if( internal::input_buffer* b = f->my_input_buffer ) {
            delete b;
            f->my_input_buffer = NULL;
        }
        next=f->next_filter_in_pipeline;
        f->next_filter_in_pipeline = filter::not_in_pipeline();
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
            f->prev_filter_in_pipeline = filter::not_in_pipeline();
            f->my_pipeline = NULL;
        }
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
            f->next_segment = NULL;
    }
    filter_list = filter_end = NULL;
}

void pipeline::add_filter( filter& filter_ ) {
#if TBB_USE_ASSERT
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
        __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
#endif
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        filter_.my_pipeline = this;
        filter_.prev_filter_in_pipeline = filter_end;
        if ( filter_list == NULL)
            filter_list = &filter_;
        else
            filter_end->next_filter_in_pipeline = &filter_;
        filter_.next_filter_in_pipeline = NULL;
        filter_end = &filter_;
    } else {
        if( !filter_end )
            filter_end = reinterpret_cast<filter*>(&filter_list);

        *reinterpret_cast<filter**>(filter_end) = &filter_;
        filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
        *reinterpret_cast<filter**>(filter_end) = NULL;
    }
    if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
        if( filter_.is_serial() ) {
            if( filter_.is_bound() )
                has_thread_bound_filters = true;
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
        } else {
            if( filter_.prev_filter_in_pipeline ) {
                if( filter_.prev_filter_in_pipeline->is_bound() ) {
                    // successors to bound filters must have an input_buffer
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                }
            } else { // input filter
                if( filter_.object_may_be_null() ) {
                    //TODO: buffer only needed to hold TLS; could improve
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                    filter_.my_input_buffer->create_my_tls();
                }
            }
        }
    } else {
        if( filter_.is_serial() ) {
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
        }
    }
}

void pipeline::remove_filter( filter& filter_ ) {
    __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
    if (&filter_ == filter_list)
        filter_list = filter_.next_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
        filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
    }
    if (&filter_ == filter_end)
        filter_end = filter_.prev_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
        filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
    }
    if( internal::input_buffer* b = filter_.my_input_buffer ) {
        delete b;
        filter_.my_input_buffer = NULL;
    }
    filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
        filter_.next_segment = NULL;
    filter_.my_pipeline = NULL;
}

void pipeline::run( size_t max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
    , tbb::task_group_context& context
#endif
    ) {
    __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
    __TBB_ASSERT( !end_counter, "pipeline already running?" );
    if( filter_list ) {
        internal::pipeline_cleaner my_pipeline_cleaner(*this);
        end_of_input = false;
        input_tokens = internal::Token(max_number_of_live_tokens);
        if(has_thread_bound_filters) {
            // release input filter if thread-bound
            if(filter_list->is_bound()) {
                filter_list->my_input_buffer->sema_V();
            }
        }
#if __TBB_TASK_GROUP_CONTEXT
        end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
#else
        end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
#endif
        // Start execution of tasks
        task::spawn_root_and_wait( *end_counter );

        if(has_thread_bound_filters) {
            for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
                if(f->is_bound()) {
                    f->my_input_buffer->sema_V(); // wake to end
                }
            }
        }
    }
}

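// The sema_V() calls after spawn_root_and_wait above unblock user threads still
// parked in internal_process_item; on waking they observe end_of_input (and no
// remaining work) and return end_of_stream instead of processing another item.
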
#if __TBB_TASK_GROUP_CONTEXT
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( filter_list ) {
        // Construct task group context with the exception propagation mode expected
        // by the pipeline caller.
        uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
                task_group_context::default_traits :
                task_group_context::default_traits & ~task_group_context::exact_exception;
        task_group_context context(task_group_context::bound, ctx_traits);
        run(max_number_of_live_tokens, context);
    }
}
#endif // __TBB_TASK_GROUP_CONTEXT

bool filter::has_more_work() {
    __TBB_ASSERT(my_pipeline, NULL);
    __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
    return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
}

filter::~filter() {
    if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        if ( next_filter_in_pipeline != filter::not_in_pipeline() )
            my_pipeline->remove_filter(*this);
        else
            __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
    } else {
        __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
    }
}

void filter::set_end_of_input() {
    __TBB_ASSERT(my_input_buffer, NULL);
    __TBB_ASSERT(object_may_be_null(), NULL);
    if(is_serial()) {
        my_pipeline->end_of_input = true;
    } else {
        __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
        my_input_buffer->set_my_tls_end_of_input();
    }
}

thread_bound_filter::result_type thread_bound_filter::process_item() {
    return internal_process_item(true);
}

thread_bound_filter::result_type thread_bound_filter::try_process_item() {
    return internal_process_item(false);
}

thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
    __TBB_ASSERT(my_pipeline != NULL, "process_item must not be called for a filter that is not in a pipeline");
    internal::task_info info;
    info.reset();

    if( my_pipeline->end_of_input && !has_more_work() )
        return end_of_stream;

    if( !prev_filter_in_pipeline ) {
        if( my_pipeline->end_of_input )
            return end_of_stream;
        while( my_pipeline->input_tokens == 0 ) {
            if( !is_blocking )
                return item_not_available;
            my_input_buffer->sema_P();
        }
        info.my_object = (*this)(info.my_object);
        if( info.my_object ) {
            __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
            my_pipeline->input_tokens--;
            if( is_ordered() ) {
                info.my_token = my_pipeline->token_counter;
                info.my_token_ready = true;
            }
            my_pipeline->token_counter++; // ideally, with relaxed semantics
        } else {
            my_pipeline->end_of_input = true;
            return end_of_stream;
        }
    } else { /* this is not an input filter */
        while( !my_input_buffer->has_item() ) {
            if( !is_blocking ) {
                return item_not_available;
            }
            my_input_buffer->sema_P();
            if( my_pipeline->end_of_input && !has_more_work() ) {
                return end_of_stream;
            }
        }
        if( !my_input_buffer->return_item(info, /*advance*/true) ) {
            __TBB_ASSERT(false,"return_item failed");
        }
        info.my_object = (*this)(info.my_object);
    }
    if( next_filter_in_pipeline ) {
        if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
            __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
        }
    } else {
        size_t ntokens_avail = ++(my_pipeline->input_tokens);
        if( my_pipeline->filter_list->is_bound() ) {
            if( ntokens_avail == 1 ) {
                my_pipeline->filter_list->my_input_buffer->sema_V();
            }
        }
    }

    return success;
}

} // tbb
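
Appendix: a minimal usage sketch, not part of the original file, showing how the
class-based interface implemented above is driven from user code. It assumes the
classic TBB 2020 headers; the Emit, Double, and Print filter names are
illustrative only. The input stage returns NULL to signal end of input (see
stage_task::execute), pipeline::run bounds the number of tokens in flight, and
the serial_in_order output stage relies on put_token/note_done to preserve order.

#include "tbb/pipeline.h"
#include <cstdint>
#include <cstdio>

// Serial, in-order input stage: emits the integers 1..10, then NULL to stop.
// Values start at 1 because NULL (i.e. 0) marks end of input.
class Emit : public tbb::filter {
    std::intptr_t my_next;
public:
    Emit() : tbb::filter(tbb::filter::serial_in_order), my_next(1) {}
    void* operator()( void* ) {
        return my_next <= 10 ? reinterpret_cast<void*>(my_next++) : NULL;
    }
};

// Parallel middle stage: may run on any worker thread, on many items at once.
class Double : public tbb::filter {
public:
    Double() : tbb::filter(tbb::filter::parallel) {}
    void* operator()( void* item ) {
        return reinterpret_cast<void*>(2*reinterpret_cast<std::intptr_t>(item));
    }
};

// Serial, in-order output stage: ordering is enforced by the input_buffer.
class Print : public tbb::filter {
public:
    Print() : tbb::filter(tbb::filter::serial_in_order) {}
    void* operator()( void* item ) {
        std::printf("%ld\n", static_cast<long>(reinterpret_cast<std::intptr_t>(item)));
        return NULL;
    }
};

int main() {
    Emit e; Double d; Print p;      // filters must outlive the pipeline
    tbb::pipeline pipe;
    pipe.add_filter(e);             // serial filters get an input_buffer here
    pipe.add_filter(d);
    pipe.add_filter(p);
    pipe.run(4);                    // at most 4 tokens in flight (input_tokens)
    return 0;                       // ~pipeline() detaches the filters
}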