173     template<
typename StageTask>
   188             spawner.spawn_stage_task(wakee);
   191 #if __TBB_TASK_GROUP_CONTEXT   192     void clear( 
filter* my_filter ) {
   239         new_array[i].is_valid = 
false;
   241     for( 
size_type i=0; i<old_size; ++i, ++t )
   242         new_array[t&(
new_size-1)] = old_array[t&(old_size-1)];
   282 #if __TBB_TASK_GROUP_CONTEXT   291 #endif // __TBB_TASK_GROUP_CONTEXT   318                     goto process_another_stage;
   369                         goto process_another_stage;
   379             if(ntokens_avail == 1) {
   380                 my_pipeline.filter_list->my_input_buffer->sema_V();
   392 process_another_stage:
   416             filter* first_suitable_filter = current_filter;
   417             while( current_filter ) {
   447                     if( !current_filter ) {
   452                         current_filter = first_suitable_filter;
   458                     first_suitable_filter = first_suitable_filter->
next_segment;
   459                     current_filter = first_suitable_filter;
   479             for(  
filter* subfilter=
first->next_filter_in_pipeline;
   481                   subfilter=subfilter->next_filter_in_pipeline )
   483                 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
   486                     head_of_previous_segment = subfilter;
   493 #if _MSC_VER && !defined(__INTEL_COMPILER)   496     #pragma warning (disable: 4127)   507 #if __TBB_TASK_GROUP_CONTEXT   517 void pipeline::inject_token( 
task& ) {
   521 #if __TBB_TASK_GROUP_CONTEXT   522 void pipeline::clear_filters() {
   523     for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
   525             if( internal::input_buffer* b = f->my_input_buffer )
   531 pipeline::pipeline() :
   536     has_thread_bound_filters(false)
   542 pipeline::~pipeline() {
   546 void pipeline::clear() {
   548     for( filter* f = filter_list; f; f=next ) {
   549         if( internal::input_buffer* b = f->my_input_buffer ) {
   551             f->my_input_buffer = NULL;
   553         next=f->next_filter_in_pipeline;
   557             f->my_pipeline = NULL;
   560             f->next_segment = NULL;
   562     filter_list = filter_end = NULL;
   565 void pipeline::add_filter( filter& filter_ ) {
   570     __TBB_ASSERT( !end_counter, 
"invocation of add_filter on running pipeline" );
   573         filter_.my_pipeline = 
this;
   574         filter_.prev_filter_in_pipeline = filter_end;
   575         if ( filter_list == NULL)
   576             filter_list = &filter_;
   578             filter_end->next_filter_in_pipeline = &filter_;
   579         filter_.next_filter_in_pipeline = NULL;
   580         filter_end = &filter_;
   583             filter_end = reinterpret_cast<filter*>(&filter_list);
   585         *reinterpret_cast<filter**>(filter_end) = &filter_;
   586         filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
   587         *reinterpret_cast<filter**>(filter_end) = NULL;
   590         if( filter_.is_serial() ) {
   591             if( filter_.is_bound() )
   592                 has_thread_bound_filters = 
true;
   593             filter_.my_input_buffer = 
new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
   595             if(filter_.prev_filter_in_pipeline) {
   596                 if(filter_.prev_filter_in_pipeline->is_bound()) {
   598                     filter_.my_input_buffer = 
new internal::input_buffer( 
false, 
false );
   601                 if(filter_.object_may_be_null() ) {
   603                     filter_.my_input_buffer = 
new internal::input_buffer( 
false, 
false );
   604                     filter_.my_input_buffer->create_my_tls();
   609         if( filter_.is_serial() ) {
   610             filter_.my_input_buffer = 
new internal::input_buffer( filter_.is_ordered(), false );
   616 void pipeline::remove_filter( filter& filter_ ) {
   619     __TBB_ASSERT( !end_counter, 
"invocation of remove_filter on running pipeline" );
   620     if (&filter_ == filter_list)
   621         filter_list = filter_.next_filter_in_pipeline;
   623         __TBB_ASSERT( filter_.prev_filter_in_pipeline, 
"filter list broken?" );
   624         filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
   626     if (&filter_ == filter_end)
   627         filter_end = filter_.prev_filter_in_pipeline;
   629         __TBB_ASSERT( filter_.next_filter_in_pipeline, 
"filter list broken?" );
   630         filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
   632     if( internal::input_buffer* b = filter_.my_input_buffer ) {
   634         filter_.my_input_buffer = NULL;
   638         filter_.next_segment = NULL;
   639     filter_.my_pipeline = NULL;
   642 void pipeline::run( 
size_t max_number_of_live_tokens
   647     __TBB_ASSERT( max_number_of_live_tokens>0, 
"pipeline::run must have at least one token" );
   648     __TBB_ASSERT( !end_counter, 
"pipeline already running?" );
   650         internal::pipeline_cleaner my_pipeline_cleaner(*
this);
   651         end_of_input = 
false;
   653         if(has_thread_bound_filters) {
   655             if(filter_list->is_bound()) {
   656                 filter_list->my_input_buffer->sema_V();
   659 #if __TBB_TASK_GROUP_CONTEXT   667         if(has_thread_bound_filters) {
   668             for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
   670                     f->my_input_buffer->sema_V(); 
   677 #if __TBB_TASK_GROUP_CONTEXT   678 void pipeline::run( 
size_t max_number_of_live_tokens ) {
   686         run(max_number_of_live_tokens, context);
   689 #endif // __TBB_TASK_GROUP_CONTEXT   693     __TBB_ASSERT(my_input_buffer, 
"has_more_work() called for filter with no input buffer");
   700             my_pipeline->remove_filter(*
this);
   712         my_pipeline->end_of_input = 
true;
   714         __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
   715         my_input_buffer->set_my_tls_end_of_input();
   720     return internal_process_item(
true);
   724     return internal_process_item(
false);
   728     __TBB_ASSERT(my_pipeline != NULL,
"It's not supposed that process_item is called for a filter that is not in a pipeline.");
   732     if( my_pipeline->end_of_input && !has_more_work() )
   733         return end_of_stream;
   735     if( !prev_filter_in_pipeline ) {
   736         if( my_pipeline->end_of_input )
   737             return end_of_stream;
   738         while( my_pipeline->input_tokens == 0 ) {
   740                 return item_not_available;
   741             my_input_buffer->sema_P();
   745             __TBB_ASSERT(my_pipeline->input_tokens > 0, 
"Token failed in thread-bound filter");
   746             my_pipeline->input_tokens--;
   748                 info.
my_token = my_pipeline->token_counter;
   751             my_pipeline->token_counter++; 
   753             my_pipeline->end_of_input = 
true;
   754             return end_of_stream;
   757         while( !my_input_buffer->has_item() ) {
   759                 return item_not_available;
   761             my_input_buffer->sema_P();
   762             if( my_pipeline->end_of_input && !has_more_work() ) {
   763                 return end_of_stream;
   766         if( !my_input_buffer->return_item(info, 
true) ) {
   771     if( next_filter_in_pipeline ) {
   772         if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,
true) ) {
   773             __TBB_ASSERT(
false, 
"Couldn't put token after thread-bound buffer");
   776         size_t ntokens_avail = ++(my_pipeline->input_tokens);
   777         if( my_pipeline->filter_list->is_bound() ) {
   778             if( ntokens_avail == 1 ) {
   779                 my_pipeline->filter_list->my_input_buffer->sema_V();
 
Used to form groups of tasks.
 
pipeline_cleaner(pipeline &_pipeline)
 
pipeline_root_task(pipeline &pipeline)
 
size_type array_size
Size of array.
 
result_type internal_process_item(bool is_blocking)
Internal routine for item processing.
 
void push_back(task &task)
Push task onto back of list.
 
#define __TBB_TASK_GROUP_CONTEXT
 
bool object_may_be_null()
true if an input filter can emit null
 
bool is_cancelled() const
Returns true if the context has received cancellation request.
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t new_size
 
end_of_input_tls_t end_of_input_tls
 
result_type __TBB_EXPORTED_METHOD process_item()
Wait until a data item becomes available, and invoke operator() on that item.
 
Base class for user-defined tasks.
 
bool is_bound() const
True if filter is thread-bound.
 
#define __TBB_PIPELINE_VERSION(x)
 
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
 
A lock that occupies a single byte.
 
input_buffer(bool is_ordered_, bool is_bound_)
Construct empty buffer.
 
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info.
 
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
 
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
 
semaphore * my_sem
for thread-bound filter, semaphore for waiting, NULL otherwise.
 
Edsger Dijkstra's counting semaphore.
 
task_info * array
Array of deferred tasks that cannot yet start executing.
 
bool has_more_work()
has the filter not yet processed all the tokens it will ever see?
 
spin_mutex array_mutex
Serializes updates.
 
bool is_serial() const
True if filter is serial.
 
Token high_token
Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assig...
 
void note_done(Token token, StageTask &spawner)
Note that processing of a token is finished.
 
void create_sema(size_t initial_tokens)
 
A buffer of input items for a filter.
 
void reset()
Set to initial state (no object, no token)
 
static const unsigned char version_mask
 
bool my_token_ready
False until my_token is set.
 
void set_my_tls_end_of_input()
 
friend class tbb::pipeline
 
Token low_token
Lowest token that can start executing.
 
void spawn_stage_task(const task_info &info)
Creates and spawns stage_task from task_info.
 
stage_task(pipeline &pipeline)
Construct stage_task for first stage in a pipeline.
 
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
 
bool my_tls_end_of_input()
 
task * execute() __TBB_override
The virtual task execution method.
 
void grow(size_type minimum_size)
Resize "array".
 
void recycle_as_continuation()
Change this to be a continuation of its former self.
 
friend class tbb::pipeline
 
result_type __TBB_EXPORTED_METHOD try_process_item()
If a data item is available, invoke operator() on that item.
 
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
 
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
 
internal::allocate_child_proxy & allocate_child()
Returns proxy for overloaded new that allocates a child task of *this.
 
bool end_of_input_tls_allocated
 
static const unsigned char exact_exception_propagation
7th bit defines exception propagation mode expected by the application.
 
bool is_bound
True for thread-bound filter, false otherwise.
 
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
 
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
 
This structure is used to store task information in a input buffer.
 
bool is_ordered() const
True if filter must receive stream in order.
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
 
Represents acquisition of a mutex.
 
bool is_ordered
True for ordered filter, false otherwise.
 
void __TBB_EXPORTED_METHOD set_end_of_input()
 
bool return_item(task_info &info, bool advance)
return an item, invalidate the queued item, but only advance if the filter
 
Base class for types that should not be copied or assigned.
 
void set_ref_count(int count)
Set reference count.
 
~input_buffer()
Destroy the buffer.
 
task * execute() __TBB_override
Should be overridden by derived classes.
 
auto first(Container &c) -> decltype(begin(c))
 
Token my_token
Invalid unless a task went through an ordered stage.
 
bool put_token(task_info &info_, bool force_put=false)
Put a token into the buffer.
 
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
 
bool has_item()
true if the current low_token is valid.
 
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
 
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
 
A stage in a pipeline served by a user thread.
 
bool my_at_start
True if this task has not yet read the input.
 
#define ITT_NOTIFY(name, obj)
 
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
 
task * parent() const
task on whose behalf this task is working, or NULL if this is a root.
 
void poison_pointer(T *__TBB_atomic &)
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
 
static const size_type initial_buffer_size
Initial size for "array".
 
stage_task(pipeline &pipeline, filter *filter_, const task_info &info)
Construct stage_task for a subsequent stage in a pipeline.
 
void reset()
Roughly equivalent to the constructor of input stage task.
 
basic_tls< intptr_t > end_of_input_tls_t
for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
 
virtual __TBB_EXPORTED_METHOD ~filter()
Destroy filter.
 
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
 
bool is_valid
True if my_object is valid.