00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023
00024 #include "_concurrent_queue_internal.h"
00025
00026 namespace tbb {
00027
00028 namespace strict_ppl {
00029
00031
00034 template<typename T, typename A = cache_aligned_allocator<T> >
00035 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
00036 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00037
00039 typedef typename A::template rebind<char>::other page_allocator_type;
00040 page_allocator_type my_allocator;
00041
00043 virtual void *allocate_block( size_t n ) {
00044 void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
00045 if( !b ) this->internal_throw_exception();
00046 return b;
00047 }
00048
00050 virtual void deallocate_block( void *b, size_t n ) {
00051 my_allocator.deallocate( reinterpret_cast<char*>(b), n );
00052 }
00053
00054 public:
00056 typedef T value_type;
00057
00059 typedef T& reference;
00060
00062 typedef const T& const_reference;
00063
00065 typedef size_t size_type;
00066
00068 typedef ptrdiff_t difference_type;
00069
00071 typedef A allocator_type;
00072
00074 explicit concurrent_queue(const allocator_type& a = allocator_type()) :
00075 internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00076 {
00077 }
00078
00080 template<typename InputIterator>
00081 concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00082 internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00083 {
00084 for( ; begin != end; ++begin )
00085 internal_push(&*begin);
00086 }
00087
00089 concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
00090 internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00091 {
00092 assign( src );
00093 }
00094
00096 ~concurrent_queue();
00097
00099 void push( const T& source ) {
00100 internal_push( &source );
00101 }
00102
00104
00106 bool try_pop( T& result ) {
00107 return internal_try_pop( &result );
00108 }
00109
00111 size_type unsafe_size() const {return this->internal_size();}
00112
00114 bool empty() const {return this->internal_empty();}
00115
00117 void clear() ;
00118
00120 allocator_type get_allocator() const { return this->my_allocator; }
00121
00122 typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00123 typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00124
00125
00126
00127
00128 iterator unsafe_begin() {return iterator(*this);}
00129 iterator unsafe_end() {return iterator();}
00130 const_iterator unsafe_begin() const {return const_iterator(*this);}
00131 const_iterator unsafe_end() const {return const_iterator();}
00132 } ;
00133
00134 template<typename T, class A>
00135 concurrent_queue<T,A>::~concurrent_queue() {
00136 clear();
00137 this->internal_finish_clear();
00138 }
00139
00140 template<typename T, class A>
00141 void concurrent_queue<T,A>::clear() {
00142 while( !empty() ) {
00143 T value;
00144 internal_try_pop(&value);
00145 }
00146 }
00147
00148 }
00149
00151
00156 template<typename T, class A = cache_aligned_allocator<T> >
00157 class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
00158 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00159
00161 typedef typename A::template rebind<char>::other page_allocator_type;
00162 page_allocator_type my_allocator;
00163
00165 class destroyer: internal::no_copy {
00166 T& my_value;
00167 public:
00168 destroyer( T& value ) : my_value(value) {}
00169 ~destroyer() {my_value.~T();}
00170 };
00171
00172 T& get_ref( page& page, size_t index ) {
00173 __TBB_ASSERT( index<items_per_page, NULL );
00174 return static_cast<T*>(static_cast<void*>(&page+1))[index];
00175 }
00176
00177 virtual void copy_item( page& dst, size_t index, const void* src ) {
00178 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
00179 }
00180
00181 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00182 new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00183 }
00184
00185 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00186 T& from = get_ref(src,index);
00187 destroyer d(from);
00188 *static_cast<T*>(dst) = from;
00189 }
00190
00191 virtual page *allocate_page() {
00192 size_t n = sizeof(page) + items_per_page*item_size;
00193 page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00194 if( !p ) internal_throw_exception();
00195 return p;
00196 }
00197
00198 virtual void deallocate_page( page *p ) {
00199 size_t n = sizeof(page) + items_per_page*item_size;
00200 my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00201 }
00202
00203 public:
00205 typedef T value_type;
00206
00208 typedef A allocator_type;
00209
00211 typedef T& reference;
00212
00214 typedef const T& const_reference;
00215
00217
00219 typedef std::ptrdiff_t size_type;
00220
00222 typedef std::ptrdiff_t difference_type;
00223
00225 explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
00226 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00227 {
00228 }
00229
00231 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) :
00232 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00233 {
00234 assign( src );
00235 }
00236
00238 template<typename InputIterator>
00239 concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00240 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00241 {
00242 for( ; begin != end; ++begin )
00243 internal_push_if_not_full(&*begin);
00244 }
00245
00247 ~concurrent_bounded_queue();
00248
00250 void push( const T& source ) {
00251 internal_push( &source );
00252 }
00253
00255
00256 void pop( T& destination ) {
00257 internal_pop( &destination );
00258 }
00259
00261
00263 bool try_push( const T& source ) {
00264 return internal_push_if_not_full( &source );
00265 }
00266
00268
00270 bool try_pop( T& destination ) {
00271 return internal_pop_if_present( &destination );
00272 }
00273
00275
00278 size_type size() const {return internal_size();}
00279
00281 bool empty() const {return internal_empty();}
00282
00284 size_type capacity() const {
00285 return my_capacity;
00286 }
00287
00289
00291 void set_capacity( size_type capacity ) {
00292 internal_set_capacity( capacity, sizeof(T) );
00293 }
00294
00296 allocator_type get_allocator() const { return this->my_allocator; }
00297
00299 void clear() ;
00300
00301 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
00302 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
00303
00304
00305
00306
00307 iterator unsafe_begin() {return iterator(*this);}
00308 iterator unsafe_end() {return iterator();}
00309 const_iterator unsafe_begin() const {return const_iterator(*this);}
00310 const_iterator unsafe_end() const {return const_iterator();}
00311
00312 };
00313
00314 template<typename T, class A>
00315 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
00316 clear();
00317 internal_finish_clear();
00318 }
00319
00320 template<typename T, class A>
00321 void concurrent_bounded_queue<T,A>::clear() {
00322 while( !empty() ) {
00323 T value;
00324 internal_pop_if_present(&value);
00325 }
00326 }
00327
00328 namespace deprecated {
00329
00331
00336 template<typename T, class A = cache_aligned_allocator<T> >
00337 class concurrent_queue: public concurrent_bounded_queue<T,A> {
00338 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00339 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00340 #endif
00341
00342 public:
00344 explicit concurrent_queue(const A& a = A()) :
00345 concurrent_bounded_queue<T,A>( a )
00346 {
00347 }
00348
00350 concurrent_queue( const concurrent_queue& src, const A& a = A()) :
00351 concurrent_bounded_queue<T,A>( src, a )
00352 {
00353 }
00354
00356 template<typename InputIterator>
00357 concurrent_queue( InputIterator begin, InputIterator end, const A& a = A()) :
00358 concurrent_bounded_queue<T,A>( begin, end, a )
00359 {
00360 }
00361
00363
00365 bool push_if_not_full( const T& source ) {
00366 return try_push( source );
00367 }
00368
00370
00374 bool pop_if_present( T& destination ) {
00375 return try_pop( destination );
00376 }
00377
00378 typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
00379 typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
00380
00381
00382
00383
00384 iterator begin() {return this->unsafe_begin();}
00385 iterator end() {return this->unsafe_end();}
00386 const_iterator begin() const {return this->unsafe_begin();}
00387 const_iterator end() const {return this->unsafe_end();}
00388 };
00389
00390 }
00391
00392
00393 #if TBB_DEPRECATED
00394 using deprecated::concurrent_queue;
00395 #else
00396 using strict_ppl::concurrent_queue;
00397 #endif
00398
00399 }
00400
00401 #endif