_concurrent_queue_internal.h

/*
    Copyright 2005-2009 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/

#ifndef __TBB_concurrent_queue_internal_H
#define __TBB_concurrent_queue_internal_H

#include "tbb_stddef.h"
#include "tbb_machine.h"
#include "atomic.h"
#include "spin_mutex.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include <iterator>
#include <new>

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// Forward declarations
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

namespace deprecated {
template<typename T, typename A> class concurrent_queue;
}
#endif
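
// This header contains the internal machinery behind the public containers
// tbb::concurrent_queue (defined in namespace strict_ppl) and
// tbb::concurrent_bounded_queue; user code should include
// "tbb/concurrent_queue.h" instead.  A minimal usage sketch of the public
// wrappers built on these internals (TBB 2.2-era API):
//
//     tbb::concurrent_queue<int> q;
//     q.push(42);                 // concurrent enqueue at the tail
//     int v;
//     if( q.try_pop(v) )          // non-blocking dequeue; false if empty
//         /* v now holds 42 */;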

namespace strict_ppl {

namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

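//! Points to a dummy page used to mark a micro_queue as invalid after a
//! failed push; see micro_queue::make_invalid.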
static void* invalid_page;

template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;

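//! Type-independent part of the internal queue representation.
/** Holds the global head/tail tickets and the page geometry; members are
    padded to cache-line size to avoid false sharing.  For internal use only. */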
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
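    //! Multiplier used to scatter consecutive tickets across the micro-queues;
    //! coprime to n_queue, so every lane is visited.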
    static const size_t phi = 3;

public:
    // Must be a power of 2.
    static const size_t n_queue = 8;

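    //! Header at the front of a page of items; mask has one bit per slot,
    //! set when the slot holds a successfully constructed item.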
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

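    //! Number of item slots per page; always a power of 2.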
    size_t items_per_page;

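    //! Size of an item in bytes.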
    size_t item_size;

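    //! Number of entries whose construction failed and that have not yet been
    //! skipped over by pop.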
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
};

class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

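    //! Destroys its referent on scope exit; keeps pop exception-safe.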
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    T& get_ref( page& page, size_t index ) {
        return static_cast<T*>(static_cast<void*>(&page+1))[index];
    }

    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void make_invalid( ticket k ) ;
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&0x1 ) {
            ++rb.n_invalid_entries;
            throw_bad_last_alloc_exception_v4();
        }
    } while( counter!=k ) ;
}

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        try {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } catch (...) {
            ++base.my_rep->n_invalid_entries;
            make_invalid( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        if( page* q = tail_page )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    try {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        p->mask |= uintptr_t(1)<<index;
        tail_counter += concurrent_queue_rep_base::n_queue;
    } catch (...) {
        ++base.my_rep->n_invalid_entries;
        tail_counter += concurrent_queue_rep_base::n_queue;
        throw;
    }
}

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex   = src.page_mutex;

    const page* srcp = src.head_page;
    if( srcp ) {
        ticket g_index = head_counter;
        try {
            size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } catch (...) {
            make_invalid( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::make_invalid( ticket k ) {
    static page dummy = {static_cast<page*>((void*)1), 0};
    // mark it so that no more pushes are allowed.
    invalid_page = &dummy;
    {
        spin_mutex::scoped_lock lock( page_mutex );
        tail_counter = k+concurrent_queue_rep_base::n_queue+1;
        if( page* q = tail_page )
            q->next = static_cast<page*>(invalid_page);
        else
            head_page = static_cast<page*>(invalid_page);
        tail_page = static_cast<page*>(invalid_page);
    }
    throw;
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( p ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !q ) {
            my_queue.tail_page = NULL;
        }
    }
    my_queue.head_counter = my_ticket;
    if( p ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};

template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
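    //! Internal representation.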
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(page) + r.items_per_page*r.item_size;
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(page) + r.items_per_page*r.item_size;
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    virtual void *allocate_block( size_t n ) = 0;

    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3( size_t item_size ) ;

    /* override */ virtual ~concurrent_queue_base_v3() {
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    bool internal_try_pop( void* dst ) ;

    size_t internal_size() const ;

    bool internal_empty() const ;

    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    void internal_throw_exception() const {
        throw std::bad_alloc();
    }

    void assign( const concurrent_queue_base_v3& src ) ;
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3( size_t item_size ) {
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked.  Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
        if( tp!=NULL) {
            if( tp!=invalid_page ) deallocate_page( tp );
            r.array[i].tail_page = NULL;
        }
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    bool get_item( void*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( void*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
        item = static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue.my_rep->item_size*i;
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    mutable void* my_item;

public:
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __GNUC__==4&&__GNUC_MINOR__==3
        // to get around a possible gcc 4.3 bug
        __asm__ __volatile__("": : :"memory");
#endif
    }

    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

protected:
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    void advance() ;

    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    void* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename T>
static inline const concurrent_queue_iterator_base_v3<const T>& add_constness( const concurrent_queue_iterator_base_v3<T>& q )
{
    return *reinterpret_cast<const concurrent_queue_iterator_base_v3<const T> *>(&q) ;
}

template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<Value>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<Value>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    concurrent_queue_iterator( const concurrent_queue_iterator<Container,Value>& other ) :
        concurrent_queue_iterator_base_v3<Value>(other)
    {
    }

    template<typename T>
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,T>& other ) :
        concurrent_queue_iterator_base_v3<Value>(add_constness(other))
    {
    }

    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        // "this->" is required for correct two-phase name lookup of the
        // dependent base-class member.
        this->assign(other);
        return *this;
    }

    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace strict_ppl

namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

class concurrent_queue_base_v3: no_copy {
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
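    //! Header at the front of a page of items.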
    struct page {
        page* next;
        uintptr_t mask;
    };

    ptrdiff_t my_capacity;

    size_t items_per_page;

    size_t item_size;

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    bool __TBB_EXPORTED_METHOD internal_empty() const;

    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    virtual page *allocate_page() = 0;

    virtual void deallocate_page( page *p ) = 0;

    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

class concurrent_queue_iterator_base_v3 {
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    mutable void* my_item;

    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    void __TBB_EXPORTED_METHOD advance();

    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace tbb

#endif /* __TBB_concurrent_queue_internal_H */
