concurrent_queue.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023 
00024 #include "_concurrent_queue_internal.h"
00025 
00026 namespace tbb {
00027 
00028 namespace strict_ppl {
00029 
00031 
/** Queue of T layered on internal::concurrent_queue_base_v3<T>.
    This class supplies the block allocation hooks (through an allocator of
    type A rebound to char) and the public push/try_pop/clear interface; the
    queue mechanics themselves live in the base class.

    The base class depends on T, so a conforming compiler does not search it
    when it looks up an unqualified name.  Every base-class member is therefore
    reached through this-> (internal_push, internal_try_pop, assign, ...). */
00034 template<typename T, typename A = cache_aligned_allocator<T> > 
00035 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
00036     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00037 
          //! Allocator A rebound to char; used by allocate_block/deallocate_block.
00039     typedef typename A::template rebind<char>::other page_allocator_type;
00040     page_allocator_type my_allocator;
00041 
          //! Allocates n chars through my_allocator.
          /** Calls internal_throw_exception() of the base class if the allocator
              returns a null pointer; otherwise returns the block. */
00043     /*override*/ virtual void *allocate_block( size_t n ) {
00044         void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
00045         if( !b ) this->internal_throw_exception(); 
00046         return b;
00047     }
00048 
          //! Returns block b of n chars to my_allocator.
00050     /*override*/ virtual void deallocate_block( void *b, size_t n ) {
00051         my_allocator.deallocate( reinterpret_cast<char*>(b), n );
00052     }
00053 
00054 public:
          //! Element type.
00056     typedef T value_type;
00057 
          //! Reference type.
00059     typedef T& reference;
00060 
          //! Const reference type.
00062     typedef const T& const_reference;
00063 
          //! Type returned by unsafe_size().
00065     typedef size_t size_type;
00066 
          //! Difference type.
00068     typedef ptrdiff_t difference_type;
00069 
          //! Allocator type as supplied by the user (before rebinding to char).
00071     typedef A allocator_type;
00072 
          //! Constructs a queue that uses a copy of allocator a; no items are pushed.
00074     explicit concurrent_queue(const allocator_type& a = allocator_type()) : 
00075         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00076     {
00077     }
00078 
          //! Constructs a queue and pushes every element of [begin,end), in iteration order.
00080     template<typename InputIterator>
00081     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00082         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00083     {
00084         for( ; begin != end; ++begin )
                  // this-> is required: internal_push is a member of the dependent base.
00085             this->internal_push(&*begin);
00086     }
00087     
          //! Constructs a queue with allocator a and fills it via the base class's assign(src).
00089     concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) : 
00090         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00091     {
              // this-> is required: assign is a member of the dependent base.
00092         this->assign( src );
00093     }
00094     
          //! Destructor; defined after the class (clears the queue, then internal_finish_clear()).
00096     ~concurrent_queue();
00097 
          //! Passes the address of source to the base class's internal_push().
          /** NOTE(review): presumably the base copies the item into the queue --
              confirm in _concurrent_queue_internal.h. */
00099     void push( const T& source ) {
00100         this->internal_push( &source );
00101     }
00102 
00104 
          //! Returns the result of the base class's internal_try_pop(&result).
          /** NOTE(review): expected to be true when an item was stored in result and
              false when no item was available -- confirm against the base class. */
00106     bool try_pop( T& result ) {
00107         return this->internal_try_pop( &result );
00108     }
00109 
          //! Returns internal_size() of the base class.
          /** NOTE(review): the unsafe_ prefix suggests the value is not reliable while
              other threads modify the queue -- confirm. */
00111     size_type unsafe_size() const {return this->internal_size();}
00112 
          //! Returns internal_empty() of the base class.
00114     bool empty() const {return this->internal_empty();}
00115 
          //! Pops items until empty() reports true; defined after the class.
00117     void clear() ;
00118 
          //! Returns a copy of the stored allocator, converted back to allocator_type.
00120     allocator_type get_allocator() const { return this->my_allocator; }
00121 
00122     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00123     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00124 
00125     //------------------------------------------------------------------------
00126     // The iterators are intended only for debugging.  They are slow and not thread safe.
00127     //------------------------------------------------------------------------
00128     iterator unsafe_begin() {return iterator(*this);}
00129     iterator unsafe_end() {return iterator();}
00130     const_iterator unsafe_begin() const {return const_iterator(*this);}
00131     const_iterator unsafe_end() const {return const_iterator();}
00132 } ;
00133 
/** Destructor: first drains the queue with clear(), then calls the base
    class's internal_finish_clear(). */
00134 template<typename T, class A>
00135 concurrent_queue<T,A>::~concurrent_queue() {
00136     this->clear();
00137     this->internal_finish_clear();
00138 }
00139 
/** Removes items by popping until empty() reports true.
    Each iteration default-constructs a temporary T that receives the popped
    item and is destroyed at the end of the iteration, so T must be default
    constructible.  internal_try_pop is a member of the T-dependent base class
    and is therefore called through this->. */
00140 template<typename T, class A>
00141 void concurrent_queue<T,A>::clear() {
00142     while( !empty() ) {
00143         T value;
00144         this->internal_try_pop(&value);
00145     }
00146 }
00147 
00148 } // namespace strict_ppl
00149     
00151 
/** Queue of T with a capacity setting (capacity()/set_capacity()), layered on
    the non-template internal::concurrent_queue_base_v3.
    This class supplies the type-specific hooks the base needs (item copy,
    assign-and-destroy, page allocation through an allocator of type A rebound
    to char) and a public interface that forwards to the base:
    push/pop -> internal_push/internal_pop,
    try_push/try_pop -> internal_push_if_not_full/internal_pop_if_present.
    NOTE(review): push() presumably waits while the queue is full and pop()
    while it is empty -- confirm against the base class implementation. */
00156 template<typename T, class A = cache_aligned_allocator<T> >
00157 class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
00158     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00159 
          //! Allocator A rebound to char; used by allocate_page/deallocate_page.
00161     typedef typename A::template rebind<char>::other page_allocator_type;
00162     page_allocator_type my_allocator;
00163 
          //! Scope guard whose destructor runs ~T() on the referenced item.
00165     class destroyer: internal::no_copy {
00166         T& my_value;
00167     public:
00168         destroyer( T& value ) : my_value(value) {}
00169         ~destroyer() {my_value.~T();}          
00170     };
00171 
          //! Returns the item in slot index of the given page.
          /** Items are addressed as an array of T that starts directly behind the
              page object (&page+1).  Asserts index<items_per_page. */
00172     T& get_ref( page& page, size_t index ) {
00173         __TBB_ASSERT( index<items_per_page, NULL );
00174         return static_cast<T*>(static_cast<void*>(&page+1))[index];
00175     }
00176 
          //! Copy-constructs (placement new) a T from *src into slot index of page dst.
00177     /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
00178         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
00179     }
00180 
          //! Copy-constructs slot dindex of page dst from slot sindex of page src.
00181     /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00182         new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00183     }
00184 
          //! Copy-assigns slot index of page src to *dst, then destroys the item in the page.
00185     /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00186         T& from = get_ref(src,index);
              // Declared before the assignment so ~T() runs on the slot even if operator= throws.
00187         destroyer d(from);
00188         *static_cast<T*>(dst) = from;
00189     }
00190 
          //! Allocates one page: sizeof(page) plus room for items_per_page items of item_size bytes.
          /** Calls internal_throw_exception() if the allocator returns a null pointer. */
00191     /*override*/ virtual page *allocate_page() {
00192         size_t n = sizeof(page) + items_per_page*item_size;
00193         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00194         if( !p ) internal_throw_exception(); 
00195         return p;
00196     }
00197 
          //! Returns page p to my_allocator, using the same size formula as allocate_page().
00198     /*override*/ virtual void deallocate_page( page *p ) {
00199         size_t n = sizeof(page) + items_per_page*item_size;
00200         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00201     }
00202 
00203 public:
          //! Element type.
00205     typedef T value_type;
00206 
          //! Allocator type as supplied by the user (before rebinding to char).
00208     typedef A allocator_type;
00209 
          //! Reference type.
00211     typedef T& reference;
00212 
          //! Const reference type.
00214     typedef const T& const_reference;
00215 
00217 
          //! Type used for size() and capacity(); note that it is a signed type.
00219     typedef std::ptrdiff_t size_type;
00220 
          //! Difference type.
00222     typedef std::ptrdiff_t difference_type;
00223 
          //! Constructs a queue that uses a copy of allocator a; no items are pushed.
00225     explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) : 
00226         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00227     {
00228     }
00229 
          //! Constructs a queue with allocator a and fills it via the base class's assign(src).
00231     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) : 
00232         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00233     {
00234         assign( src );
00235     }
00236 
          //! Constructs a queue and offers every element of [begin,end) to internal_push_if_not_full().
          /** The result of each call is ignored, so an element the base refuses is
              dropped without notice. */
00238     template<typename InputIterator>
00239     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00240         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00241     {
00242         for( ; begin != end; ++begin )
00243             internal_push_if_not_full(&*begin);
00244     }
00245 
          //! Destructor; defined after the class (clears the queue, then internal_finish_clear()).
00247     ~concurrent_bounded_queue();
00248 
          //! Passes the address of source to the base class's internal_push().
00250     void push( const T& source ) {
00251         internal_push( &source );
00252     }
00253 
00255 
          //! Passes the address of destination to the base class's internal_pop().
00256     void pop( T& destination ) {
00257         internal_pop( &destination );
00258     }
00259 
00261 
          //! Returns the result of internal_push_if_not_full(&source).
          /** NOTE(review): expected to be true when the item was enqueued -- confirm. */
00263     bool try_push( const T& source ) {
00264         return internal_push_if_not_full( &source );
00265     }
00266 
00268 
          //! Returns the result of internal_pop_if_present(&destination).
          /** NOTE(review): expected to be true when an item was stored in destination -- confirm. */
00270     bool try_pop( T& destination ) {
00271         return internal_pop_if_present( &destination );
00272     }
00273 
00275 
          //! Returns internal_size() of the base class.
          /** NOTE(review): size_type is signed; whether the value can be negative
              depends on the base class -- confirm. */
00278     size_type size() const {return internal_size();}
00279 
          //! Returns internal_empty() of the base class.
00281     bool empty() const {return internal_empty();}
00282 
          //! Returns the base class member my_capacity.
00284     size_type capacity() const {
00285         return my_capacity;
00286     }
00287 
00289 
          //! Forwards the new capacity, together with sizeof(T), to internal_set_capacity().
00291     void set_capacity( size_type capacity ) {
00292         internal_set_capacity( capacity, sizeof(T) );
00293     }
00294 
          //! Returns a copy of the stored allocator, converted back to allocator_type.
00296     allocator_type get_allocator() const { return this->my_allocator; }
00297 
          //! Pops items until empty() reports true; defined after the class.
00299     void clear() ;
00300 
00301     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
00302     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
00303 
00304     //------------------------------------------------------------------------
00305     // The iterators are intended only for debugging.  They are slow and not thread safe.
00306     //------------------------------------------------------------------------
00307     iterator unsafe_begin() {return iterator(*this);}
00308     iterator unsafe_end() {return iterator();}
00309     const_iterator unsafe_begin() const {return const_iterator(*this);}
00310     const_iterator unsafe_end() const {return const_iterator();}
00311 
00312 }; 
00313 
/** Destructor: first drains the queue with clear(), then calls the base
    class's internal_finish_clear(). */
00314 template<typename T, class A>
00315 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
00316     this->clear();
00317     this->internal_finish_clear();
00318 }
00319 
/** Removes items by calling internal_pop_if_present() until empty() reports true.
    Each iteration default-constructs a scratch T that receives the popped item
    and is destroyed at the end of the iteration, so T must be default
    constructible. */
00320 template<typename T, class A>
00321 void concurrent_bounded_queue<T,A>::clear() {
00322     while( !this->empty() ) {
00323         T discarded;
00324         this->internal_pop_if_present( &discarded );
00325     }
00326 }
00327 
00328 namespace deprecated {
00329 
00331 
/** Adapter in namespace deprecated that derives from concurrent_bounded_queue<T,A>
    and adds the member names push_if_not_full, pop_if_present, begin and end,
    each forwarding to the corresponding base-class member.

    The base class depends on T and A, so a conforming compiler does not search
    it when it looks up an unqualified name.  Base members (try_push, try_pop,
    unsafe_begin, unsafe_end) are therefore reached through this->. */
00336 template<typename T, class A = cache_aligned_allocator<T> > 
00337 class concurrent_queue: public concurrent_bounded_queue<T,A> {
00338 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00339     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00340 #endif 
00341 
00342 public:
          //! Constructs a queue that uses a copy of allocator a; no items are pushed.
00344     explicit concurrent_queue(const A& a = A()) : 
00345         concurrent_bounded_queue<T,A>( a )
00346     {
00347     }
00348 
          //! Forwards src and a to the corresponding concurrent_bounded_queue constructor.
00350     concurrent_queue( const concurrent_queue& src, const A& a = A()) : 
00351         concurrent_bounded_queue<T,A>( src, a )
00352     {
00353     }
00354 
          //! Forwards [begin,end) and a to the corresponding concurrent_bounded_queue constructor.
00356     template<typename InputIterator>
00357     concurrent_queue( InputIterator begin, InputIterator end, const A& a = A()) :
00358         concurrent_bounded_queue<T,A>( begin, end, a )
00359     {
00360     }
00361 
00363 
          //! Older name for try_push(): returns the result of try_push(source).
00365     bool push_if_not_full( const T& source ) {
00366         return this->try_push( source );
00367     }
00368 
00370 
          //! Older name for try_pop(): returns the result of try_pop(destination).
00374     bool pop_if_present( T& destination ) {
00375         return this->try_pop( destination );
00376     }
00377 
00378     typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
00379     typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
00380     //
00381     //------------------------------------------------------------------------
00382     // The iterators are intended only for debugging.  They are slow and not thread safe.
00383     //------------------------------------------------------------------------
00384     iterator begin() {return this->unsafe_begin();}
00385     iterator end() {return this->unsafe_end();}
00386     const_iterator begin() const {return this->unsafe_begin();}
00387     const_iterator end() const {return this->unsafe_end();}
00388 }; 
00389 
00390 }
00391     
00392 
00393 #if TBB_DEPRECATED
00394 using deprecated::concurrent_queue;
00395 #else
00396 using strict_ppl::concurrent_queue;    
00397 #endif
00398 
00399 } // namespace tbb
00400 
00401 #endif /* __TBB_concurrent_queue_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.