17 #ifndef __TBB__concurrent_queue_impl_H    18 #define __TBB__concurrent_queue_impl_H    20 #ifndef __TBB_concurrent_queue_H    21 #error Do not #include this internal file directly; use public TBB headers instead.    24 #include "../tbb_stddef.h"    25 #include "../tbb_machine.h"    26 #include "../atomic.h"    27 #include "../spin_mutex.h"    28 #include "../cache_aligned_allocator.h"    29 #include "../tbb_exception.h"    30 #include "../tbb_profiling.h"    32 #include __TBB_STD_SWAP_HEADER    37 #if !__TBB_TEMPLATE_FRIENDS_BROKEN    40 namespace strict_ppl {
    41 template<
typename T, 
typename A> 
class concurrent_queue;
    44 template<
typename T, 
typename A> 
class concurrent_bounded_queue;
    49 namespace strict_ppl {
    73     static const size_t phi = 3;
    77     static const size_t n_queue = 8;
   103     return uintptr_t(
p)>1;
   121 #if _MSC_VER && !defined(__INTEL_COMPILER)   123 #pragma warning( push )   124 #pragma warning( disable: 4146 )   133     typedef void (*item_constructor_t)(T* location, 
const void* src);
   145     void copy_item( 
page& dst, 
size_t dindex, 
const void* src, item_constructor_t construct_item ) {
   146         construct_item( &get_ref(dst, dindex), src );
   150         item_constructor_t construct_item )
   152         T& src_item = get_ref( const_cast<page&>(src), sindex );
   153         construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
   157         T& from = get_ref(src,index);
   177         return (&static_cast<padded_page*>(static_cast<void*>(&
p))->
last)[index];
   189         item_constructor_t construct_item ) ;
   194         item_constructor_t construct_item ) ;
   197         size_t end_in_page, 
ticket& g_index, item_constructor_t construct_item ) ;
   199     void invalidate_page_and_rethrow( 
ticket k ) ;
   216     item_constructor_t construct_item )
   226             ++base.
my_rep->n_invalid_entries;
   227             invalidate_page_and_rethrow( k );
   233     if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.
my_rep );
   249         copy_item( *
p, index, item, construct_item );
   255         ++base.
my_rep->n_invalid_entries;
   272     bool success = 
false;
   275         if( 
p->mask & uintptr_t(1)<<index ) {
   277             assign_and_destroy_item( dst, *
p, index );
   279             --base.
my_rep->n_invalid_entries;
   287     item_constructor_t construct_item )
   294         ticket g_index = head_counter;
   298             size_t end_in_first_page = (index+n_items<base.
my_rep->items_per_page)?(index+n_items):base.
my_rep->items_per_page;
   300             head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
   301             page* cur_page = head_page;
   305                     cur_page->
next = make_copy( base, srcp, 0, base.
my_rep->items_per_page, g_index, construct_item );
   306                     cur_page = cur_page->
next;
   311                 if( last_index==0 ) last_index = base.
my_rep->items_per_page;
   313                 cur_page->
next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
   314                 cur_page = cur_page->
next;
   316             tail_page = cur_page;
   318             invalidate_page_and_rethrow( g_index );
   321         head_page = tail_page = NULL;
   329     page* invalid_page = (
page*)uintptr_t(1);
   335             q->
next = invalid_page;
   337             head_page = invalid_page;
   338         tail_page = invalid_page;
   346     ticket& g_index, item_constructor_t construct_item )
   350     new_page->
next = NULL;
   352     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
   353         if( new_page->
mask & uintptr_t(1)<<begin_in_page )
   354             copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
   367         my_ticket(k), my_queue(queue), my_page(
p), allocator(b)
   378         my_queue.head_page = q;
   380             my_queue.tail_page = NULL;
   385         allocator.deallocate_page( 
p );
   389 #if _MSC_VER && !defined(__INTEL_COMPILER)   390 #pragma warning( pop )   391 #endif // warning 4146 is back   406         return k*phi%n_queue;
   411         return array[index(k)];
   441         return reinterpret_cast<page*>(allocate_block ( n ));
   447         deallocate_block( reinterpret_cast<void*>(
p), n );
   451     virtual void *allocate_block( 
size_t n ) = 0;
   454     virtual void deallocate_block( 
void *
p, 
size_t n ) = 0;
   462         for( 
size_t i=0; i<nq; i++ )
   463             __TBB_ASSERT( my_rep->
array[i].tail_page==NULL, 
"pages were not freed properly" );
   472          r.
choose(k).push( src, k, *
this, construct_item );
   477     bool internal_try_pop( 
void* dst ) ;
   480     size_t internal_size() 
const ;
   483     bool internal_empty() 
const ;
   487     void internal_finish_clear() ;
   497 #if __TBB_CPP11_RVALUE_REF_PRESENT   507     const size_t item_size = 
sizeof(T);
   514     my_rep->item_size = item_size;
   515     my_rep->items_per_page = item_size<=  8 ? 32 :
   516                              item_size<= 16 ? 16 :
   536 #if defined(_MSC_VER) && defined(_Wp64)   537     #pragma warning (push)   538     #pragma warning (disable: 4267)   541 #if defined(_MSC_VER) && defined(_Wp64)   542     #pragma warning (pop)   548     } 
while( !r.
choose( k ).pop( dst, k, *
this ) );
   555     __TBB_ASSERT( 
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
   560     ptrdiff_t sz = tc-hc-nie;
   561     return sz<0 ? 0 :  size_t(sz);
   577     for( 
size_t i=0; i<nq; ++i ) {
   581             deallocate_page( tp );
   582             r.
array[i].tail_page = NULL;
   601     for( 
size_t i = 0; i < r.
n_queue; ++i )
   602         r.
array[i].assign( src.
my_rep->array[i], *
this, construct_item);
   605             "the source concurrent queue should not be concurrently modified." );
   618         head_counter(queue.my_rep->head_counter),
   621         for( 
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
   622             array[k] = queue.
my_rep->array[k].head_page;
   626     bool get_item( T*& item, 
size_t k ) ;
   631     if( k==my_queue.my_rep->tail_counter ) {
   639         return (
p->mask & uintptr_t(1)<<i)!=0;
   645 template<
typename Value>
   651     template<
typename C, 
typename T, 
typename U>
   654     template<
typename C, 
typename T, 
typename U>
   662 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN   669     : my_rep(NULL), my_item(NULL) {
   694 template<
typename Value>
   699     if( !my_rep->get_item(my_item, k) ) advance();
   702 template<
typename Value>
   704     if( my_rep!=other.
my_rep ) {
   717 template<
typename Value>
   719     __TBB_ASSERT( my_item, 
"attempt to increment iterator past end of queue" );
   720     size_t k = my_rep->head_counter;
   724     my_rep->get_item(tmp,k);
   733     my_rep->head_counter = ++k;
   734     if( !my_rep->get_item(my_item, k) ) advance();
   747 template<
typename Container, 
typename Value>
   749         public std::iterator<std::forward_iterator_tag,Value> {
   750 #if !__TBB_TEMPLATE_FRIENDS_BROKEN   751     template<
typename T, 
class A>
   752     friend class ::tbb::strict_ppl::concurrent_queue;
   778     Value& operator*()
 const {
   779         return *static_cast<Value*>(this->my_item);
   792         Value* result = &operator*();
   799 template<
typename C, 
typename T, 
typename U>
   804 template<
typename C, 
typename T, 
typename U>
   854 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN   868     virtual void copy_item( 
page& dst, 
size_t index, 
const void* src ) = 0;
   869     virtual void assign_and_destroy_item( 
void* dst, 
page& src, 
size_t index ) = 0;
   915 #if __TBB_CPP11_RVALUE_REF_PRESENT   926     void internal_insert_item( 
const void* src, copy_specifics op_type );
   929     bool internal_insert_if_not_full( 
const void* src, copy_specifics op_type );
   934     virtual void copy_page_item( 
page& dst, 
size_t dindex, 
const page& src, 
size_t sindex ) = 0;
   955     virtual void move_item( 
page& dst, 
size_t index, 
const void* src ) = 0;
   965     template<
typename C, 
typename T, 
typename U>
   968     template<
typename C, 
typename T, 
typename U>
  1011 template<
typename Container, 
typename Value>
  1013         public std::iterator<std::forward_iterator_tag,Value> {
  1015 #if !__TBB_TEMPLATE_FRIENDS_BROKEN  1016     template<
typename T, 
class A>
  1017     friend class ::tbb::concurrent_bounded_queue;
  1045         return *static_cast<Value*>(
my_item);
  1065 template<
typename C, 
typename T, 
typename U>
  1070 template<
typename C, 
typename T, 
typename U>
 base class of concurrent_queue
 
concurrent_queue_rep< T >::page page
 
concurrent_queue_page_allocator & allocator
 
micro_queue< T >::padded_page padded_page
 
void internal_throw_exception() const
Obsolete.
 
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
 
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
 
micro_queue< T > & my_queue
 
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false other...
 
static size_t index(ticket k)
Map ticket to an array index.
 
parts of concurrent_queue_rep that do not have references to micro_queue
 
atomic< ticket > head_counter
 
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
 
Value * operator->() const
 
Value * operator++(int)
Post increment.
 
atomic< ticket > tail_counter
 
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
 
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
 
T last
Must be last field.
 
concurrent_queue_rep_base::page page
 
void call_itt_notify(notify_type, void *)
 
concurrent_queue_iterator()
 
void invalidate_page_and_rethrow(ticket k)
 
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
 
Abstract class to define interface for page allocation/deallocation.
 
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
 
A lock that occupies a single byte.
 
micro_queue< T >::item_constructor_t item_constructor_t
 
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
 
Value * operator++(int)
Post increment.
 
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
 
concurrent_queue_rep< T > * my_rep
Internal representation.
 
Base class for types that should not be assigned.
 
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
 
Value * operator->() const
 
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
 
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
 
size_t item_size
Size of an item.
 
virtual void move_item(page &dst, size_t index, const void *src)=0
 
void pause()
Pause for a while.
 
Meets requirements of a forward iterator for STL.
 
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
 
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
 
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
 
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
 
void * my_item
Pointer to current item.
 
static T & get_ref(page &p, size_t index)
 
auto last(Container &c) -> decltype(begin(c))
 
const concurrent_queue_base_v3< T > & my_queue
 
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
 
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
 
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
 
concurrent_queue_iterator()
 
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
 
virtual page * allocate_page() __TBB_override
 
void advance()
Advance iterator one step towards tail of queue.
 
size_t items_per_page
Always a power of 2.
 
bool is_valid_page(const concurrent_queue_rep_base::page *p)
 
ptrdiff_t my_capacity
Capacity of the queue.
 
Meets requirements of a forward iterator for STL.
 
size_t item_size
Size of an item.
 
micro_queue< T >::padded_page padded_page
 
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
 
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
 
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
 
Value & operator *() const
Reference to current item.
 
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
 
Internal representation of a ConcurrentQueue.
 
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
 
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
 
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
 
concurrent_queue_iterator & operator++()
Advance to next item in queue.
 
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
 
virtual ~concurrent_queue_base_v3()
 
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
 
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
 
Type-independent portion of concurrent_queue_iterator.
 
atomic< page * > head_page
 
#define __TBB_compiler_fence()
 
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
 
concurrent_queue_base_v3()
 
void const char const char int ITT_FORMAT __itt_group_sync p
 
concurrent_queue_rep * my_rep
Internal representation.
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
 
concurrent_queue_iterator_base_v3()
Default constructor.
 
Constness-independent portion of concurrent_queue_iterator.
 
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
 
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
 
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
 
Class used to ensure exception-safety of method "pop".
 
A queue using simple locking.
 
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
 
atomic< page * > tail_page
 
virtual ~concurrent_queue_page_allocator()
 
Represents acquisition of a mutex.
 
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
 
Base class for types that should not be copied or assigned.
 
T last
Must be last field.
 
void internal_finish_clear()
free any remaining pages
 
virtual concurrent_queue_rep_base::page * allocate_page()=0
 
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
 
Value * my_item
Pointer to current item.
 
Class that implements exponential backoff.
 
concurrent_queue_iterator_base_v3()
Default constructor.
 
Similar to C++0x std::remove_cv.
 
bool internal_empty() const
check if the queue is empty; thread safe
 
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
 
void swap(atomic< T > &lhs, atomic< T > &rhs)
 
void assign_and_destroy_item(void *dst, page &src, size_t index)
 
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
 
#define __TBB_EXPORTED_METHOD
 
micro_queue< T > array[n_queue]
 
static const size_t n_queue
 
~concurrent_queue_iterator_base_v3()
Destructor.
 
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
 
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
 
void itt_hide_store_word(T &dst, T src)
 
concurrent_queue_iterator & operator++()
Advance to next item in queue.
 
micro_queue< T > & choose(ticket k)
 
size_t items_per_page
Always a power of 2.
 
atomic< ticket > head_counter
 
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
 
concurrent_queue_rep_base::page page
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
 
representation of concurrent_queue_base
 
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
 
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
 
atomic< ticket > tail_counter
 
A queue using simple locking.
 
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
 
padded_page()
Not defined anywhere - exists to quiet warnings.
 
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
 
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
 
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
 
concurrent_queue_base_v8(size_t item_sz)
 
Identifiers declared inside namespace internal should never be used directly by client code.
 
~micro_queue_pop_finalizer()
 
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
 
void move(tbb_thread &t1, tbb_thread &t2)