Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
_concurrent_queue_impl.h
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>
#include __TBB_STD_SWAP_HEADER
#include <iterator>

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

#endif

namespace strict_ppl {

namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
template<typename T> struct concurrent_queue_rep;

//! Parts of concurrent_queue_rep that do not have references to micro_queue.
/** For internal use only. */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    // NULL and the sentinel address 1 both denote "no valid page".
    return uintptr_t(p)>1;
}

//! Abstract class to define interface for page allocation/deallocation.
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
public:
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception safety of method "pop".
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
        item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
               item_constructor_t construct_item ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
                         item_constructor_t construct_item ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
                     size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}

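// Added commentary (not in the original source): per-micro_queue counters
// advance in multiples of n_queue and are therefore always even; an odd value
// can only be left by invalidate_page_and_rethrow(), which stores k+n_queue+1.
// The c&1 test above thus detects a queue broken by an exception during a
// concurrent push, and reports bad_last_alloc to the waiter.
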
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
                           item_constructor_t construct_item )
{
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}

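// Added commentary (not in the original source): each page carries a bit mask
// with one bit per slot; push sets the bit only after the item is constructed
// successfully, and pop treats a clear bit as an invalid entry (tracked by
// n_invalid_entries). This is how exceptions thrown by T's copy constructor
// are tolerated without corrupting the queue.
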
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
    const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
    ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}

//! Class used to ensure exception safety of method "pop".
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

//! Internal representation of a ConcurrentQueue.
/** Dispatches tickets onto an array of micro_queues. */
template<typename T>
struct concurrent_queue_rep : concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index.
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};

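// Added commentary (not in the original source): with phi==3 and n_queue==8,
// consecutive tickets k=0,1,2,... map to indices 0,3,6,1,4,7,2,5,0,... so
// threads pushing back-to-back are spread across all eight micro_queues
// instead of contending on a single one.
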
//! Base class of concurrent_queue.
/** For internal use only. */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    virtual page *allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue.
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue item from queue.
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently.
    size_t internal_size() const ;

    //! Check if the queue is empty; thread safe.
    bool internal_empty() const ;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! Throw an exception.
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy or move internal representation.
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representation.
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ? 8 :
                             item_size<= 64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}

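// Added commentary (not in the original source): the table above keeps a
// page's item payload at 256 bytes (32*8 == 16*16 == 8*32 == 4*64 ==
// 2*128 == 256), degenerating to one item per page for items over 128 bytes.
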
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
    item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
        "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];

    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

//! Constness-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents the concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item.
    Value* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment.
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor.
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}

//! Similar to C++0x std::remove_cv.
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace strict_ppl

namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! For internal use only.
/** Type-independent portion of concurrent_queue. */
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue.
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue.
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations.
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue.
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty; thread safe.
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity.
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! Custom allocator.
    virtual page *allocate_page() = 0;

    //! Custom de-allocator.
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception.
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representation.
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Enqueues item at tail of queue using specified operation (copy or move).
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempts to enqueue at tail of queue using specified operation (copy or move).
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assigns one queue to another using specified operation (copy or move).
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

//! For internal use only.
/** Backward-compatible modification of concurrent_queue_base_v3. */
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! Move items.
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;

    //! Attempt to enqueue item onto queue using move operation.
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move operation.
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};

//! Type-independent portion of concurrent_queue_iterator.
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item.
    void* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor.
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */
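
For orientation, a minimal usage sketch of the public queues built on top of this internal header (the mappings noted in the comments are indicative, based on this listing, not an authoritative statement of the dispatch path):

#include "tbb/concurrent_queue.h"

int main() {
    // Unbounded queue: tbb::strict_ppl::concurrent_queue, exported as tbb::concurrent_queue.
    tbb::concurrent_queue<int> q;
    q.push(42);                      // enqueues via internal_push
    int v;
    if( q.try_pop(v) ) {             // dequeues via internal_try_pop; returns false if empty
        // v == 42
    }

    // Bounded, blocking variant: tbb::concurrent_bounded_queue.
    tbb::concurrent_bounded_queue<int> bq;
    bq.set_capacity(2);              // see internal_set_capacity above
    bq.push(1);                      // blocks while the queue is full
    bq.pop(v);                       // blocks while the queue is empty
    return 0;
}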