00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include <iterator>
00031 #include <new>
00032
00033 namespace tbb {
00034
00035 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00036
00037
00038 namespace strict_ppl {
00039 template<typename T, typename A> class concurrent_queue;
00040 }
00041
00042 template<typename T, typename A> class concurrent_bounded_queue;
00043
00044 namespace deprecated {
00045 template<typename T, typename A> class concurrent_queue;
00046 }
00047 #endif
00048
00050 namespace strict_ppl {
00051
00053 namespace internal {
00054
00055 using namespace tbb::internal;
00056
00057 typedef size_t ticket;
00058
00059 static void* invalid_page;
00060
00061 template<typename T> class micro_queue ;
00062 template<typename T> class micro_queue_pop_finalizer ;
00063 template<typename T> class concurrent_queue_base_v3;
00064
00066
00069 struct concurrent_queue_rep_base : no_copy {
00070 template<typename T> friend class micro_queue;
00071 template<typename T> friend class concurrent_queue_base_v3;
00072
00073 protected:
00075 static const size_t phi = 3;
00076
00077 public:
00078
00079 static const size_t n_queue = 8;
00080
00082 struct page {
00083 page* next;
00084 uintptr_t mask;
00085 };
00086
00087 atomic<ticket> head_counter;
00088 char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00089 atomic<ticket> tail_counter;
00090 char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00091
00093 size_t items_per_page;
00094
00096 size_t item_size;
00097
00099 atomic<size_t> n_invalid_entries;
00100
00101 char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
00102 } ;
00103
00105
00108 class concurrent_queue_page_allocator
00109 {
00110 template<typename T> friend class micro_queue ;
00111 template<typename T> friend class micro_queue_pop_finalizer ;
00112 protected:
00113 virtual ~concurrent_queue_page_allocator() {}
00114 private:
00115 virtual concurrent_queue_rep_base::page* allocate_page() = 0;
00116 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
00117 } ;
00118
00119 #if _MSC_VER && !defined(__INTEL_COMPILER)
00120
00121 #pragma warning( push )
00122 #pragma warning( disable: 4146 )
00123 #endif
00124
00126
00128 template<typename T>
00129 class micro_queue : no_copy {
00130 typedef concurrent_queue_rep_base::page page;
00131
00133 class destroyer: no_copy {
00134 T& my_value;
00135 public:
00136 destroyer( T& value ) : my_value(value) {}
00137 ~destroyer() {my_value.~T();}
00138 };
00139
00140 T& get_ref( page& page, size_t index ) {
00141 return static_cast<T*>(static_cast<void*>(&page+1))[index];
00142 }
00143
00144 void copy_item( page& dst, size_t index, const void* src ) {
00145 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
00146 }
00147
00148 void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00149 new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00150 }
00151
00152 void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00153 T& from = get_ref(src,index);
00154 destroyer d(from);
00155 *static_cast<T*>(dst) = from;
00156 }
00157
00158 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
00159
00160 public:
00161 friend class micro_queue_pop_finalizer<T>;
00162
00163 atomic<page*> head_page;
00164 atomic<ticket> head_counter;
00165
00166 atomic<page*> tail_page;
00167 atomic<ticket> tail_counter;
00168
00169 spin_mutex page_mutex;
00170
00171 void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
00172
00173 bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
00174
00175 micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
00176
00177 page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
00178
00179 void make_invalid( ticket k ) ;
00180 };
00181
00182 template<typename T>
00183 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
00184 atomic_backoff backoff;
00185 do {
00186 backoff.pause();
00187 if( counter&0x1 ) {
00188 ++rb.n_invalid_entries;
00189 throw_bad_last_alloc_exception_v4();
00190 }
00191 } while( counter!=k ) ;
00192 }
00193
00194 template<typename T>
00195 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
00196 k &= -concurrent_queue_rep_base::n_queue;
00197 page* p = NULL;
00198 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00199 if( !index ) {
00200 try {
00201 concurrent_queue_page_allocator& pa = base;
00202 p = pa.allocate_page();
00203 } catch (...) {
00204 ++base.my_rep->n_invalid_entries;
00205 make_invalid( k );
00206 }
00207 p->mask = 0;
00208 p->next = NULL;
00209 }
00210
00211 if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
00212
00213 if( p ) {
00214 spin_mutex::scoped_lock lock( page_mutex );
00215 if( page* q = tail_page )
00216 q->next = p;
00217 else
00218 head_page = p;
00219 tail_page = p;
00220 } else {
00221 p = tail_page;
00222 }
00223
00224 try {
00225 copy_item( *p, index, item );
00226
00227 p->mask |= uintptr_t(1)<<index;
00228 tail_counter += concurrent_queue_rep_base::n_queue;
00229 } catch (...) {
00230 ++base.my_rep->n_invalid_entries;
00231 tail_counter += concurrent_queue_rep_base::n_queue;
00232 throw;
00233 }
00234 }
00235
00236 template<typename T>
00237 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
00238 k &= -concurrent_queue_rep_base::n_queue;
00239 if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
00240 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
00241 page& p = *head_page;
00242 __TBB_ASSERT( &p, NULL );
00243 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00244 bool success = false;
00245 {
00246 micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
00247 if( p.mask & uintptr_t(1)<<index ) {
00248 success = true;
00249 assign_and_destroy_item( dst, p, index );
00250 } else {
00251 --base.my_rep->n_invalid_entries;
00252 }
00253 }
00254 return success;
00255 }
00256
00257 template<typename T>
00258 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
00259 head_counter = src.head_counter;
00260 tail_counter = src.tail_counter;
00261 page_mutex = src.page_mutex;
00262
00263 const page* srcp = src.head_page;
00264 if( srcp ) {
00265 ticket g_index = head_counter;
00266 try {
00267 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
00268 size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00269 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
00270
00271 head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
00272 page* cur_page = head_page;
00273
00274 if( srcp != src.tail_page ) {
00275 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
00276 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
00277 cur_page = cur_page->next;
00278 }
00279
00280 __TBB_ASSERT( srcp==src.tail_page, NULL );
00281 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00282 if( last_index==0 ) last_index = base.my_rep->items_per_page;
00283
00284 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
00285 cur_page = cur_page->next;
00286 }
00287 tail_page = cur_page;
00288 } catch (...) {
00289 make_invalid( g_index );
00290 }
00291 } else {
00292 head_page = tail_page = NULL;
00293 }
00294 return *this;
00295 }
00296
00297 template<typename T>
00298 void micro_queue<T>::make_invalid( ticket k ) {
00299 static page dummy = {static_cast<page*>((void*)1), 0};
00300
00301 invalid_page = &dummy;
00302 {
00303 spin_mutex::scoped_lock lock( page_mutex );
00304 tail_counter = k+concurrent_queue_rep_base::n_queue+1;
00305 if( page* q = tail_page )
00306 q->next = static_cast<page*>(invalid_page);
00307 else
00308 head_page = static_cast<page*>(invalid_page);
00309 tail_page = static_cast<page*>(invalid_page);
00310 }
00311 throw;
00312 }
00313
00314 template<typename T>
00315 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00316 concurrent_queue_page_allocator& pa = base;
00317 page* new_page = pa.allocate_page();
00318 new_page->next = NULL;
00319 new_page->mask = src_page->mask;
00320 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00321 if( new_page->mask & uintptr_t(1)<<begin_in_page )
00322 copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00323 return new_page;
00324 }
00325
00326 template<typename T>
00327 class micro_queue_pop_finalizer: no_copy {
00328 typedef concurrent_queue_rep_base::page page;
00329 ticket my_ticket;
00330 micro_queue<T>& my_queue;
00331 page* my_page;
00332 concurrent_queue_page_allocator& allocator;
00333 public:
00334 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
00335 my_ticket(k), my_queue(queue), my_page(p), allocator(b)
00336 {}
00337 ~micro_queue_pop_finalizer() ;
00338 };
00339
00340 template<typename T>
00341 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
00342 page* p = my_page;
00343 if( p ) {
00344 spin_mutex::scoped_lock lock( my_queue.page_mutex );
00345 page* q = p->next;
00346 my_queue.head_page = q;
00347 if( !q ) {
00348 my_queue.tail_page = NULL;
00349 }
00350 }
00351 my_queue.head_counter = my_ticket;
00352 if( p ) {
00353 allocator.deallocate_page( p );
00354 }
00355 }
00356
00357 #if _MSC_VER && !defined(__INTEL_COMPILER)
00358 #pragma warning( pop )
00359 #endif // warning 4146 is back
00360
00361 template<typename T> class concurrent_queue_iterator_rep ;
00362 template<typename T> class concurrent_queue_iterator_base_v3;
00363
00365
00368 template<typename T>
00369 struct concurrent_queue_rep : public concurrent_queue_rep_base {
00370 micro_queue<T> array[n_queue];
00371
00373 static size_t index( ticket k ) {
00374 return k*phi%n_queue;
00375 }
00376
00377 micro_queue<T>& choose( ticket k ) {
00378
00379 return array[index(k)];
00380 }
00381 };
00382
00384
00388 template<typename T>
00389 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
00391 concurrent_queue_rep<T>* my_rep;
00392
00393 friend struct concurrent_queue_rep<T>;
00394 friend class micro_queue<T>;
00395 friend class concurrent_queue_iterator_rep<T>;
00396 friend class concurrent_queue_iterator_base_v3<T>;
00397
00398 protected:
00399 typedef typename concurrent_queue_rep<T>::page page;
00400
00401 private:
00402 virtual page *allocate_page() {
00403 concurrent_queue_rep<T>& r = *my_rep;
00404 size_t n = sizeof(page) + r.items_per_page*r.item_size;
00405 return reinterpret_cast<page*>(allocate_block ( n ));
00406 }
00407
00408 virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
00409 concurrent_queue_rep<T>& r = *my_rep;
00410 size_t n = sizeof(page) + r.items_per_page*r.item_size;
00411 deallocate_block( reinterpret_cast<void*>(p), n );
00412 }
00413
00415 virtual void *allocate_block( size_t n ) = 0;
00416
00418 virtual void deallocate_block( void *p, size_t n ) = 0;
00419
00420 protected:
00421 concurrent_queue_base_v3( size_t item_size ) ;
00422
00423 virtual ~concurrent_queue_base_v3() {
00424 size_t nq = my_rep->n_queue;
00425 for( size_t i=0; i<nq; i++ )
00426 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
00427 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
00428 }
00429
00431 void internal_push( const void* src ) {
00432 concurrent_queue_rep<T>& r = *my_rep;
00433 ticket k = r.tail_counter++;
00434 r.choose(k).push( src, k, *this );
00435 }
00436
00438
00439 bool internal_try_pop( void* dst ) ;
00440
00442 size_t internal_size() const ;
00443
00445 bool internal_empty() const ;
00446
00448
00449 void internal_finish_clear() ;
00450
00452 void internal_throw_exception() const {
00453 throw std::bad_alloc();
00454 }
00455
00457 void assign( const concurrent_queue_base_v3& src ) ;
00458 };
00459
00460 template<typename T>
00461 concurrent_queue_base_v3<T>::concurrent_queue_base_v3( size_t item_size ) {
00462 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
00463 __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
00464 __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
00465 __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
00466 __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
00467 memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
00468 my_rep->item_size = item_size;
00469 my_rep->items_per_page = item_size<=8 ? 32 :
00470 item_size<=16 ? 16 :
00471 item_size<=32 ? 8 :
00472 item_size<=64 ? 4 :
00473 item_size<=128 ? 2 :
00474 1;
00475 }
00476
00477 template<typename T>
00478 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
00479 concurrent_queue_rep<T>& r = *my_rep;
00480 ticket k;
00481 do {
00482 k = r.head_counter;
00483 for(;;) {
00484 if( r.tail_counter<=k ) {
00485
00486 return false;
00487 }
00488
00489 ticket tk=k;
00490 #if defined(_MSC_VER) && defined(_Wp64)
00491 #pragma warning (push)
00492 #pragma warning (disable: 4267)
00493 #endif
00494 k = r.head_counter.compare_and_swap( tk+1, tk );
00495 #if defined(_MSC_VER) && defined(_Wp64)
00496 #pragma warning (pop)
00497 #endif
00498 if( k==tk )
00499 break;
00500
00501 }
00502 } while( !r.choose( k ).pop( dst, k, *this ) );
00503 return true;
00504 }
00505
00506 template<typename T>
00507 size_t concurrent_queue_base_v3<T>::internal_size() const {
00508 concurrent_queue_rep<T>& r = *my_rep;
00509 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
00510 ticket hc = r.head_counter;
00511 size_t nie = r.n_invalid_entries;
00512 ticket tc = r.tail_counter;
00513 __TBB_ASSERT( hc!=tc || !nie, NULL );
00514 ptrdiff_t sz = tc-hc-nie;
00515 return sz<0 ? 0 : size_t(sz);
00516 }
00517
00518 template<typename T>
00519 bool concurrent_queue_base_v3<T>::internal_empty() const {
00520 concurrent_queue_rep<T>& r = *my_rep;
00521 ticket tc = r.tail_counter;
00522 ticket hc = r.head_counter;
00523
00524 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
00525 }
00526
00527 template<typename T>
00528 void concurrent_queue_base_v3<T>::internal_finish_clear() {
00529 concurrent_queue_rep<T>& r = *my_rep;
00530 size_t nq = r.n_queue;
00531 for( size_t i=0; i<nq; ++i ) {
00532 page* tp = r.array[i].tail_page;
00533 __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
00534 if( tp!=NULL) {
00535 if( tp!=invalid_page ) deallocate_page( tp );
00536 r.array[i].tail_page = NULL;
00537 }
00538 }
00539 }
00540
00541 template<typename T>
00542 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
00543 concurrent_queue_rep<T>& r = *my_rep;
00544 r.items_per_page = src.my_rep->items_per_page;
00545
00546
00547 r.head_counter = src.my_rep->head_counter;
00548 r.tail_counter = src.my_rep->tail_counter;
00549 r.n_invalid_entries = src.my_rep->n_invalid_entries;
00550
00551
00552 for( size_t i = 0; i<r.n_queue; ++i )
00553 r.array[i].assign( src.my_rep->array[i], *this);
00554
00555 __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
00556 "the source concurrent queue should not be concurrently modified." );
00557 }
00558
00559 template<typename Container, typename Value> class concurrent_queue_iterator;
00560
00561 template<typename T>
00562 class concurrent_queue_iterator_rep: no_assign {
00563 public:
00564 ticket head_counter;
00565 const concurrent_queue_base_v3<T>& my_queue;
00566 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
00567 concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
00568 head_counter(queue.my_rep->head_counter),
00569 my_queue(queue)
00570 {
00571 for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
00572 array[k] = queue.my_rep->array[k].head_page;
00573 }
00574
00576 bool get_item( void*& item, size_t k ) ;
00577 };
00578
00579 template<typename T>
00580 bool concurrent_queue_iterator_rep<T>::get_item( void*& item, size_t k ) {
00581 if( k==my_queue.my_rep->tail_counter ) {
00582 item = NULL;
00583 return true;
00584 } else {
00585 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00586 __TBB_ASSERT(p,NULL);
00587 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00588 item = static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue.my_rep->item_size*i;
00589 return (p->mask & uintptr_t(1)<<i)!=0;
00590 }
00591 }
00592
00594
00595 template<typename Value>
00596 class concurrent_queue_iterator_base_v3 : no_assign {
00598
00599 concurrent_queue_iterator_rep<Value>* my_rep;
00600
00601 template<typename C, typename T, typename U>
00602 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00603
00604 template<typename C, typename T, typename U>
00605 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00606 protected:
00608 mutable void* my_item;
00609
00610 public:
00612 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
00613 #if __GNUC__==4&&__GNUC_MINOR__==3
00614
00615 __asm__ __volatile__("": : :"memory");
00616 #endif
00617 }
00618
00620 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00621 assign(i);
00622 }
00623
00625 concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
00626
00627 protected:
00629 void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
00630
00632 void advance() ;
00633
00635 ~concurrent_queue_iterator_base_v3() {
00636 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00637 my_rep = NULL;
00638 }
00639 };
00640
00641 template<typename Value>
00642 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
00643 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00644 new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
00645 size_t k = my_rep->head_counter;
00646 if( !my_rep->get_item(my_item, k) ) advance();
00647 }
00648
00649 template<typename Value>
00650 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00651 if( my_rep!=other.my_rep ) {
00652 if( my_rep ) {
00653 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00654 my_rep = NULL;
00655 }
00656 if( other.my_rep ) {
00657 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00658 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00659 }
00660 }
00661 my_item = other.my_item;
00662 }
00663
00664 template<typename Value>
00665 void concurrent_queue_iterator_base_v3<Value>::advance() {
00666 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
00667 size_t k = my_rep->head_counter;
00668 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
00669 #if TBB_USE_ASSERT
00670 void* tmp;
00671 my_rep->get_item(tmp,k);
00672 __TBB_ASSERT( my_item==tmp, NULL );
00673 #endif
00674 size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
00675 if( i==queue.my_rep->items_per_page-1 ) {
00676 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
00677 root = root->next;
00678 }
00679
00680 my_rep->head_counter = ++k;
00681 if( !my_rep->get_item(my_item, k) ) advance();
00682 }
00683
00684 template<typename T>
00685 static inline const concurrent_queue_iterator_base_v3<const T>& add_constness( const concurrent_queue_iterator_base_v3<T>& q )
00686 {
00687 return *reinterpret_cast<const concurrent_queue_iterator_base_v3<const T> *>(&q) ;
00688 }
00689
00691
00693 template<typename Container, typename Value>
00694 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<Value>,
00695 public std::iterator<std::forward_iterator_tag,Value> {
00696 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00697 template<typename T, class A>
00698 friend class ::tbb::strict_ppl::concurrent_queue;
00699 #else
00700 public:
00701 #endif
00703 concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
00704 concurrent_queue_iterator_base_v3<Value>(queue)
00705 {
00706 }
00707
00708 public:
00709 concurrent_queue_iterator() {}
00710
00712 concurrent_queue_iterator( const concurrent_queue_iterator<Container,Value>& other ) :
00713 concurrent_queue_iterator_base_v3<Value>(other)
00714 {
00715 }
00716
00717 template<typename T>
00718 concurrent_queue_iterator( const concurrent_queue_iterator<Container,T>& other ) :
00719 concurrent_queue_iterator_base_v3<Value>(add_constness(other))
00720 {
00721 }
00722
00724 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00725 assign(other);
00726 return *this;
00727 }
00728
00730 Value& operator*() const {
00731 return *static_cast<Value*>(this->my_item);
00732 }
00733
00734 Value* operator->() const {return &operator*();}
00735
00737 concurrent_queue_iterator& operator++() {
00738 this->advance();
00739 return *this;
00740 }
00741
00743 Value* operator++(int) {
00744 Value* result = &operator*();
00745 operator++();
00746 return result;
00747 }
00748 };
00749
00750
00751 template<typename C, typename T, typename U>
00752 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00753 return i.my_item==j.my_item;
00754 }
00755
00756 template<typename C, typename T, typename U>
00757 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00758 return i.my_item!=j.my_item;
00759 }
00760
00761 }
00762
00764
00765 }
00766
00768 namespace internal {
00769
00770 class concurrent_queue_rep;
00771 class concurrent_queue_iterator_rep;
00772 class concurrent_queue_iterator_base_v3;
00773 template<typename Container, typename Value> class concurrent_queue_iterator;
00774
00776
00778 class concurrent_queue_base_v3: no_copy {
00780 concurrent_queue_rep* my_rep;
00781
00782 friend class concurrent_queue_rep;
00783 friend struct micro_queue;
00784 friend class micro_queue_pop_finalizer;
00785 friend class concurrent_queue_iterator_rep;
00786 friend class concurrent_queue_iterator_base_v3;
00787 protected:
00789 struct page {
00790 page* next;
00791 uintptr_t mask;
00792 };
00793
00795 ptrdiff_t my_capacity;
00796
00798 size_t items_per_page;
00799
00801 size_t item_size;
00802
00803 private:
00804 virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
00805 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00806 protected:
00807 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
00808 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00809
00811 void __TBB_EXPORTED_METHOD internal_push( const void* src );
00812
00814 void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00815
00817 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00818
00820
00821 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00822
00824 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00825
00827 bool __TBB_EXPORTED_METHOD internal_empty() const;
00828
00830 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00831
00833 virtual page *allocate_page() = 0;
00834
00836 virtual void deallocate_page( page *p ) = 0;
00837
00839
00840 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00841
00843 void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00844
00846 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00847
00848 private:
00849 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00850 };
00851
00853
00854 class concurrent_queue_iterator_base_v3 {
00856
00857 concurrent_queue_iterator_rep* my_rep;
00858
00859 template<typename C, typename T, typename U>
00860 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00861
00862 template<typename C, typename T, typename U>
00863 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00864 protected:
00866 mutable void* my_item;
00867
00869 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00870
00872 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00873 assign(i);
00874 }
00875
00877 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
00878
00880 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00881
00883 void __TBB_EXPORTED_METHOD advance();
00884
00886 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00887 };
00888
00889 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00890
00892
00894 template<typename Container, typename Value>
00895 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00896 public std::iterator<std::forward_iterator_tag,Value> {
00897 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00898 template<typename T, class A>
00899 friend class ::tbb::concurrent_bounded_queue;
00900
00901 template<typename T, class A>
00902 friend class ::tbb::deprecated::concurrent_queue;
00903 #else
00904 public:
00905 #endif
00907 concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
00908 concurrent_queue_iterator_base_v3(queue)
00909 {
00910 }
00911
00912 public:
00913 concurrent_queue_iterator() {}
00914
00917 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00918 concurrent_queue_iterator_base_v3(other)
00919 {}
00920
00922 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00923 assign(other);
00924 return *this;
00925 }
00926
00928 Value& operator*() const {
00929 return *static_cast<Value*>(my_item);
00930 }
00931
00932 Value* operator->() const {return &operator*();}
00933
00935 concurrent_queue_iterator& operator++() {
00936 advance();
00937 return *this;
00938 }
00939
00941 Value* operator++(int) {
00942 Value* result = &operator*();
00943 operator++();
00944 return result;
00945 }
00946 };
00947
00948
00949 template<typename C, typename T, typename U>
00950 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00951 return i.my_item==j.my_item;
00952 }
00953
00954 template<typename C, typename T, typename U>
00955 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00956 return i.my_item!=j.my_item;
00957 }
00958
00959 }
00960
00962
00963 }
00964
00965 #endif