concurrent_queue.h

00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023 
00024 #include "internal/_concurrent_queue_impl.h"
00025 
00026 namespace tbb {
00027 
00028 namespace strict_ppl {
00029 
00031 
00034 template<typename T, typename A = cache_aligned_allocator<T> > 
00035 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
00036     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00037 
00039     typedef typename A::template rebind<char>::other page_allocator_type;
00040     page_allocator_type my_allocator;
00041 
00043     /*override*/ virtual void *allocate_block( size_t n ) {
00044         void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
00045         if( !b )
00046             internal::throw_exception(internal::eid_bad_alloc); 
00047         return b;
00048     }
00049 
00051     /*override*/ virtual void deallocate_block( void *b, size_t n ) {
00052         my_allocator.deallocate( reinterpret_cast<char*>(b), n );
00053     }
00054 
00055 public:
00057     typedef T value_type;
00058 
00060     typedef T& reference;
00061 
00063     typedef const T& const_reference;
00064 
00066     typedef size_t size_type;
00067 
00069     typedef ptrdiff_t difference_type;
00070 
00072     typedef A allocator_type;
00073 
00075     explicit concurrent_queue(const allocator_type& a = allocator_type()) : 
00076         my_allocator( a )
00077     {
00078     }
00079 
00081     template<typename InputIterator>
00082     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00083         my_allocator( a )
00084     {
00085         for( ; begin != end; ++begin )
00086             this->internal_push(&*begin);
00087     }
00088     
00090     concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) : 
00091         internal::concurrent_queue_base_v3<T>(), my_allocator( a )
00092     {
00093         this->assign( src );
00094     }
00095     
00097     ~concurrent_queue();
00098 
00100     void push( const T& source ) {
00101         this->internal_push( &source );
00102     }
00103 
00105 
00107     bool try_pop( T& result ) {
00108         return this->internal_try_pop( &result );
00109     }
00110 
00112     size_type unsafe_size() const {return this->internal_size();}
00113 
00115     bool empty() const {return this->internal_empty();}
00116 
00118     void clear() ;
00119 
00121     allocator_type get_allocator() const { return this->my_allocator; }
00122 
00123     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00124     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00125 
00126     //------------------------------------------------------------------------
00127     // The iterators are intended only for debugging.  They are slow and not thread safe.
00128     //------------------------------------------------------------------------
00129     iterator unsafe_begin() {return iterator(*this);}
00130     iterator unsafe_end() {return iterator();}
00131     const_iterator unsafe_begin() const {return const_iterator(*this);}
00132     const_iterator unsafe_end() const {return const_iterator();}
00133 } ;
00134 
00135 template<typename T, class A>
00136 concurrent_queue<T,A>::~concurrent_queue() {
00137     clear();
00138     this->internal_finish_clear();
00139 }
00140 
00141 template<typename T, class A>
00142 void concurrent_queue<T,A>::clear() {
00143     while( !empty() ) {
00144         T value;
00145         this->internal_try_pop(&value);
00146     }
00147 }
00148 
00149 } // namespace strict_ppl
00150     
00152 
00157 template<typename T, class A = cache_aligned_allocator<T> >
00158 class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
00159     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00160 
00162     typedef typename A::template rebind<char>::other page_allocator_type;
00163     page_allocator_type my_allocator;
00164 
00165     typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
00166  
00168     class destroyer: internal::no_copy {
00169         T& my_value;
00170     public:
00171         destroyer( T& value ) : my_value(value) {}
00172         ~destroyer() {my_value.~T();}          
00173     };
00174 
00175     T& get_ref( page& p, size_t index ) {
00176         __TBB_ASSERT( index<items_per_page, NULL );
00177         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
00178     }
00179 
00180     /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
00181         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
00182     }
00183 
00184     /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00185         new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
00186     }
00187 
00188     /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00189         T& from = get_ref(src,index);
00190         destroyer d(from);
00191         *static_cast<T*>(dst) = from;
00192     }
00193 
00194     /*override*/ virtual page *allocate_page() {
00195         size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
00196         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00197         if( !p )
00198             internal::throw_exception(internal::eid_bad_alloc); 
00199         return p;
00200     }
00201 
00202     /*override*/ virtual void deallocate_page( page *p ) {
00203         size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
00204         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00205     }
00206 
00207 public:
00209     typedef T value_type;
00210 
00212     typedef A allocator_type;
00213 
00215     typedef T& reference;
00216 
00218     typedef const T& const_reference;
00219 
00221 
00223     typedef std::ptrdiff_t size_type;
00224 
00226     typedef std::ptrdiff_t difference_type;
00227 
00229     explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) : 
00230         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00231     {
00232     }
00233 
00235     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) : 
00236         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00237     {
00238         assign( src );
00239     }
00240 
00242     template<typename InputIterator>
00243     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00244         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00245     {
00246         for( ; begin != end; ++begin )
00247             internal_push_if_not_full(&*begin);
00248     }
00249 
00251     ~concurrent_bounded_queue();
00252 
00254     void push( const T& source ) {
00255         internal_push( &source );
00256     }
00257 
00259 
00260     void pop( T& destination ) {
00261         internal_pop( &destination );
00262     }
00263 
00264 #if TBB_USE_EXCEPTIONS
00266     void abort() {
00267         internal_abort();
00268     }
00269 #endif
00270 
00272 
00274     bool try_push( const T& source ) {
00275         return internal_push_if_not_full( &source );
00276     }
00277 
00279 
00281     bool try_pop( T& destination ) {
00282         return internal_pop_if_present( &destination );
00283     }
00284 
00286 
00289     size_type size() const {return internal_size();}
00290 
00292     bool empty() const {return internal_empty();}
00293 
00295     size_type capacity() const {
00296         return my_capacity;
00297     }
00298 
00300 
00302     void set_capacity( size_type new_capacity ) {
00303         internal_set_capacity( new_capacity, sizeof(T) );
00304     }
00305 
00307     allocator_type get_allocator() const { return this->my_allocator; }
00308 
00310     void clear() ;
00311 
00312     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
00313     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
00314 
00315     //------------------------------------------------------------------------
00316     // The iterators are intended only for debugging.  They are slow and not thread safe.
00317     //------------------------------------------------------------------------
00318     iterator unsafe_begin() {return iterator(*this);}
00319     iterator unsafe_end() {return iterator();}
00320     const_iterator unsafe_begin() const {return const_iterator(*this);}
00321     const_iterator unsafe_end() const {return const_iterator();}
00322 
00323 }; 
00324 
00325 template<typename T, class A>
00326 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
00327     clear();
00328     internal_finish_clear();
00329 }
00330 
00331 template<typename T, class A>
00332 void concurrent_bounded_queue<T,A>::clear() {
00333     while( !empty() ) {
00334         T value;
00335         internal_pop_if_present(&value);
00336     }
00337 }
00338 
00339 namespace deprecated {
00340 
00342 
00347 template<typename T, class A = cache_aligned_allocator<T> > 
00348 class concurrent_queue: public concurrent_bounded_queue<T,A> {
00349 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00350     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00351 #endif 
00352 
00353 public:
00355     explicit concurrent_queue(const A& a = A()) : 
00356         concurrent_bounded_queue<T,A>( a )
00357     {
00358     }
00359 
00361     concurrent_queue( const concurrent_queue& src, const A& a = A()) : 
00362         concurrent_bounded_queue<T,A>( src, a )
00363     {
00364     }
00365 
00367     template<typename InputIterator>
00368     concurrent_queue( InputIterator b /*begin*/, InputIterator e /*end*/, const A& a = A()) :
00369         concurrent_bounded_queue<T,A>( b, e, a )
00370     {
00371     }
00372 
00374 
00376     bool push_if_not_full( const T& source ) {
00377         return this->try_push( source );
00378     }
00379 
00381 
00385     bool pop_if_present( T& destination ) {
00386         return this->try_pop( destination );
00387     }
00388 
00389     typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
00390     typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
00391     //
00392     //------------------------------------------------------------------------
00393     // The iterators are intended only for debugging.  They are slow and not thread safe.
00394     //------------------------------------------------------------------------
00395     iterator begin() {return this->unsafe_begin();}
00396     iterator end() {return this->unsafe_end();}
00397     const_iterator begin() const {return this->unsafe_begin();}
00398     const_iterator end() const {return this->unsafe_end();}
00399 }; 
00400 
00401 }
00402     
00403 
00404 #if TBB_DEPRECATED
00405 using deprecated::concurrent_queue;
00406 #else
00407 using strict_ppl::concurrent_queue;    
00408 #endif
00409 
00410 } // namespace tbb
00411 
00412 #endif /* __TBB_concurrent_queue_H */

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.