parallel_do.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_parallel_do_H
00022 #define __TBB_parallel_do_H
00023 
00024 #include "task.h"
00025 #include "aligned_space.h"
00026 #include <iterator>
00027 
00028 namespace tbb {
00029 
00031 namespace internal {
00032     template<typename Body, typename Item> class parallel_do_feeder_impl;
00033     template<typename Body> class do_group_task;
00034 
00036     template<typename T>
00037     struct strip { typedef T type; };
00038     template<typename T>
00039     struct strip<T&> { typedef T type; };
00040     template<typename T>
00041     struct strip<const T&> { typedef T type; };
00042     template<typename T>
00043     struct strip<volatile T&> { typedef T type; };
00044     template<typename T>
00045     struct strip<const volatile T&> { typedef T type; };
00046     // Most of the compilers remove cv-qualifiers from non-reference function argument types. 
00047     // But unfortunately there are those that don't.
00048     template<typename T>
00049     struct strip<const T> { typedef T type; };
00050     template<typename T>
00051     struct strip<volatile T> { typedef T type; };
00052     template<typename T>
00053     struct strip<const volatile T> { typedef T type; };
00054 } // namespace internal
00056 
00058 
00059 template<typename Item>
00060 class parallel_do_feeder: internal::no_copy
00061 {
00062     parallel_do_feeder() {}
00063     virtual ~parallel_do_feeder () {}
00064     virtual void internal_add( const Item& item ) = 0;
00065     template<typename Body_, typename Item_> friend class internal::parallel_do_feeder_impl;
00066 public:
00068     void add( const Item& item ) {internal_add(item);}
00069 };
00070 
00072 namespace internal {
00074 
00076     template<class Body, typename Item>
00077     class parallel_do_operator_selector
00078     {
00079         typedef parallel_do_feeder<Item> Feeder;
00080         template<typename A1, typename A2, typename CvItem >
00081         static void internal_call( const Body& obj, A1& arg1, A2&, void (Body::*)(CvItem) const ) {
00082             obj(arg1);
00083         }
00084         template<typename A1, typename A2, typename CvItem >
00085         static void internal_call( const Body& obj, A1& arg1, A2& arg2, void (Body::*)(CvItem, parallel_do_feeder<Item>&) const ) {
00086             obj(arg1, arg2);
00087         }
00088 
00089     public:
00090         template<typename A1, typename A2 >
00091         static void call( const Body& obj, A1& arg1, A2& arg2 )
00092         {
00093             internal_call( obj, arg1, arg2, &Body::operator() );
00094         }
00095     };
00096 
00098 
00100     template<typename Body, typename Item>
00101     class do_iteration_task: public task
00102     {
00103         typedef parallel_do_feeder_impl<Body, Item> feeder_type;
00104 
00105         Item my_value;
00106         feeder_type& my_feeder;
00107 
00108         do_iteration_task( const Item& value, feeder_type& feeder ) : 
00109             my_value(value), my_feeder(feeder)
00110         {}
00111 
00112         /*override*/ 
00113         task* execute()
00114         {
00115             parallel_do_operator_selector<Body, Item>::call(*my_feeder.my_body, my_value, my_feeder);
00116             return NULL;
00117         }
00118 
00119         template<typename Body_, typename Item_> friend class parallel_do_feeder_impl;
00120     }; // class do_iteration_task
00121 
00122     template<typename Iterator, typename Body, typename Item>
00123     class do_iteration_task_iter: public task
00124     {
00125         typedef parallel_do_feeder_impl<Body, Item> feeder_type;
00126 
00127         Iterator my_iter;
00128         feeder_type& my_feeder;
00129 
00130         do_iteration_task_iter( const Iterator& iter, feeder_type& feeder ) : 
00131             my_iter(iter), my_feeder(feeder)
00132         {}
00133 
00134         /*override*/ 
00135         task* execute()
00136         {
00137             parallel_do_operator_selector<Body, Item>::call(*my_feeder.my_body, *my_iter, my_feeder);
00138             return NULL;
00139         }
00140 
00141         template<typename Iterator_, typename Body_, typename Item_> friend class do_group_task_forward;    
00142         template<typename Body_, typename Item_> friend class do_group_task_input;    
00143         template<typename Iterator_, typename Body_, typename Item_> friend class do_task_iter;    
00144     }; // class do_iteration_task_iter
00145 
00147 
00149     template<class Body, typename Item>
00150     class parallel_do_feeder_impl : public parallel_do_feeder<Item>
00151     {
00152         /*override*/ 
00153         void internal_add( const Item& item )
00154         {
00155             typedef do_iteration_task<Body, Item> iteration_type;
00156 
00157             iteration_type& t = *new (task::self().allocate_additional_child_of(*my_barrier)) iteration_type(item, *this);
00158 
00159             t.spawn( t );
00160         }
00161     public:
00162         const Body* my_body;
00163         empty_task* my_barrier;
00164 
00165         parallel_do_feeder_impl()
00166         {
00167             my_barrier = new( task::allocate_root() ) empty_task();
00168             __TBB_ASSERT(my_barrier, "root task allocation failed");
00169         }
00170 
00171 #if __TBB_EXCEPTIONS
00172         parallel_do_feeder_impl(tbb::task_group_context &context)
00173         {
00174             my_barrier = new( task::allocate_root(context) ) empty_task();
00175             __TBB_ASSERT(my_barrier, "root task allocation failed");
00176         }
00177 #endif
00178 
00179         ~parallel_do_feeder_impl()
00180         {
00181             my_barrier->destroy(*my_barrier);
00182         }
00183     }; // class parallel_do_feeder_impl
00184 
00185 
00187 
00190     template<typename Iterator, typename Body, typename Item>
00191     class do_group_task_forward: public task
00192     {
00193         static const size_t max_arg_size = 4;         
00194 
00195         typedef parallel_do_feeder_impl<Body, Item> feeder_type;
00196 
00197         feeder_type& my_feeder;
00198         Iterator my_first;
00199         size_t my_size;
00200         
00201         do_group_task_forward( Iterator first, size_t size, feeder_type& feeder ) 
00202             : my_feeder(feeder), my_first(first), my_size(size)
00203         {}
00204 
00205         /*override*/ task* execute()
00206         {
00207             typedef do_iteration_task_iter<Iterator, Body, Item> iteration_type;
00208             __TBB_ASSERT( my_size>0, NULL );
00209             task_list list;
00210             task* t; 
00211             size_t k=0; 
00212             for(;;) {
00213                 t = new( allocate_child() ) iteration_type( my_first, my_feeder );
00214                 ++my_first;
00215                 if( ++k==my_size ) break;
00216                 list.push_back(*t);
00217             }
00218             set_ref_count(int(k+1));
00219             spawn(list);
00220             spawn_and_wait_for_all(*t);
00221             return NULL;
00222         }
00223 
00224         template<typename Iterator_, typename Body_, typename _Item> friend class do_task_iter;
00225     }; // class do_group_task_forward
00226 
00227     template<typename Body, typename Item>
00228     class do_group_task_input: public task
00229     {
00230         static const size_t max_arg_size = 4;         
00231         
00232         typedef parallel_do_feeder_impl<Body, Item> feeder_type;
00233 
00234         feeder_type& my_feeder;
00235         size_t my_size;
00236         aligned_space<Item, max_arg_size> my_arg;
00237 
00238         do_group_task_input( feeder_type& feeder ) 
00239             : my_feeder(feeder), my_size(0)
00240         {}
00241 
00242         /*override*/ task* execute()
00243         {
00244             typedef do_iteration_task_iter<Item*, Body, Item> iteration_type;
00245             __TBB_ASSERT( my_size>0, NULL );
00246             task_list list;
00247             task* t; 
00248             size_t k=0; 
00249             for(;;) {
00250                 t = new( allocate_child() ) iteration_type( my_arg.begin() + k, my_feeder );
00251                 if( ++k==my_size ) break;
00252                 list.push_back(*t);
00253             }
00254             set_ref_count(int(k+1));
00255             spawn(list);
00256             spawn_and_wait_for_all(*t);
00257             return NULL;
00258         }
00259 
00260         ~do_group_task_input(){
00261             for( size_t k=0; k<my_size; ++k)
00262                 (my_arg.begin() + k)->~Item();
00263         }
00264 
00265         template<typename Iterator_, typename Body_, typename Item_> friend class do_task_iter;
00266     }; // class do_group_task_input
00267     
00269 
00271     template<typename Iterator, typename Body, typename Item>
00272     class do_task_iter: public task
00273     {
00274         typedef parallel_do_feeder_impl<Body, Item> feeder_type;
00275 
00276     public:
00277         do_task_iter( Iterator first, Iterator last , feeder_type& feeder ) : 
00278             my_first(first), my_last(last), my_feeder(feeder)
00279         {}
00280 
00281     private:
00282         Iterator my_first;
00283         Iterator my_last;
00284         feeder_type& my_feeder;
00285 
00286         /* Do not merge run(xxx) and run_xxx() methods. They are separated in order
00287             to make sure that compilers will eliminate unused argument of type xxx
00288             (that is will not put it on stack). The sole purpose of this argument 
00289             is overload resolution.
00290             
00291             An alternative could be using template functions, but explicit specialization 
00292             of member function templates is not supported for non specialized class 
00293             templates. Besides template functions would always fall back to the least 
00294             efficient variant (the one for input iterators) in case of iterators having 
00295             custom tags derived from basic ones. */
00296         /*override*/ task* execute()
00297         {
00298             typedef typename std::iterator_traits<Iterator>::iterator_category iterator_tag;
00299             return run( (iterator_tag*)NULL );
00300         }
00301 
00304         inline task* run( void* ) { return run_for_input_iterator(); }
00305         
00306         task* run_for_input_iterator() {
00307             typedef do_group_task_input<Body, Item> block_type;
00308 
00309             block_type& t = *new( allocate_additional_child_of(*my_feeder.my_barrier) ) block_type(my_feeder);
00310             size_t k=0; 
00311             while( !(my_first == my_last) ) {
00312                 new (t.my_arg.begin() + k) Item(*my_first);
00313                 ++my_first;
00314                 if( ++k==block_type::max_arg_size ) {
00315                     if ( !(my_first == my_last) )
00316                         recycle_to_reexecute();
00317                     break;
00318                 }
00319             }
00320             if( k==0 ) {
00321                 destroy(t);
00322                 return NULL;
00323             } else {
00324                 t.my_size = k;
00325                 return &t;
00326             }
00327         }
00328 
00329         inline task* run( std::forward_iterator_tag* ) { return run_for_forward_iterator(); }
00330 
00331         task* run_for_forward_iterator() {
00332             typedef do_group_task_forward<Iterator, Body, Item> block_type;
00333 
00334             Iterator first = my_first;
00335             size_t k=0; 
00336             while( !(my_first==my_last) ) {
00337                 ++my_first;
00338                 if( ++k==block_type::max_arg_size ) {
00339                     if ( !(my_first==my_last) )
00340                         recycle_to_reexecute();
00341                     break;
00342                 }
00343             }
00344             return k==0 ? NULL : new( allocate_additional_child_of(*my_feeder.my_barrier) ) block_type(first, k, my_feeder);
00345         }
00346         
00347         inline task* run( std::random_access_iterator_tag* ) { return run_for_random_access_iterator(); }
00348 
00349         task* run_for_random_access_iterator() {
00350             typedef do_group_task_forward<Iterator, Body, Item> block_type;
00351             typedef do_iteration_task_iter<Iterator, Body, Item> iteration_type;
00352             
00353             size_t k = static_cast<size_t>(my_last-my_first); 
00354             if( k > block_type::max_arg_size ) {
00355                 Iterator middle = my_first + k/2;
00356 
00357                 empty_task& c = *new( allocate_continuation() ) empty_task;
00358                 do_task_iter& b = *new( c.allocate_child() ) do_task_iter(middle, my_last, my_feeder);
00359                 recycle_as_child_of(c);
00360 
00361                 my_last = middle;
00362                 c.set_ref_count(2);
00363                 c.spawn(b);
00364                 return this;
00365             }else if( k != 0 ) {
00366                 task_list list;
00367                 task* t; 
00368                 size_t k1=0; 
00369                 for(;;) {
00370                     t = new( allocate_child() ) iteration_type(my_first, my_feeder);
00371                     ++my_first;
00372                     if( ++k1==k ) break;
00373                     list.push_back(*t);
00374                 }
00375                 set_ref_count(int(k+1));
00376                 spawn(list);
00377                 spawn_and_wait_for_all(*t);
00378             }
00379             return NULL;
00380         }
00381     }; // class do_task_iter
00382 
00384 
00386     template<typename Iterator, typename Body, typename Item> 
00387     void run_parallel_do( Iterator first, Iterator last, const Body& body
00388 #if __TBB_EXCEPTIONS
00389         , task_group_context& context
00390 #endif
00391         )
00392     {
00393         typedef do_task_iter<Iterator, Body, Item> root_iteration_task;
00394 #if __TBB_EXCEPTIONS
00395         parallel_do_feeder_impl<Body, Item> feeder(context);
00396 #else
00397         parallel_do_feeder_impl<Body, Item> feeder;
00398 #endif
00399         feeder.my_body = &body;
00400 
00401         root_iteration_task &t = *new( feeder.my_barrier->allocate_child() ) root_iteration_task(first, last, feeder);
00402 
00403         feeder.my_barrier->set_ref_count(2);
00404         feeder.my_barrier->spawn_and_wait_for_all(t);
00405     }
00406 
00408 
00410     template<typename Iterator, typename Body, typename Item> 
00411     void select_parallel_do( Iterator first, Iterator last, const Body& body, void (Body::*)(Item) const
00412 #if __TBB_EXCEPTIONS
00413         , task_group_context& context 
00414 #endif // __TBB_EXCEPTIONS 
00415         )
00416     {
00417         run_parallel_do<Iterator, Body, typename strip<Item>::type>( first, last, body
00418 #if __TBB_EXCEPTIONS
00419             , context
00420 #endif // __TBB_EXCEPTIONS 
00421             );
00422     }
00423 
00425 
00427     template<typename Iterator, typename Body, typename Item, typename _Item> 
00428     void select_parallel_do( Iterator first, Iterator last, const Body& body, void (Body::*)(Item, parallel_do_feeder<_Item>&) const
00429 #if __TBB_EXCEPTIONS
00430         , task_group_context& context 
00431 #endif // __TBB_EXCEPTIONS
00432         )
00433     {
00434         run_parallel_do<Iterator, Body, typename strip<Item>::type>( first, last, body
00435 #if __TBB_EXCEPTIONS
00436             , context
00437 #endif // __TBB_EXCEPTIONS
00438             );
00439     }
00440 
00441 } // namespace internal
00443 
00444 
00467 
00468 
00469 template<typename Iterator, typename Body> 
00470 void parallel_do( Iterator first, Iterator last, const Body& body )
00471 {
00472     if ( first == last )
00473         return;
00474 #if __TBB_EXCEPTIONS
00475     task_group_context context;
00476 #endif // __TBB_EXCEPTIONS
00477     internal::select_parallel_do( first, last, body, &Body::operator()
00478 #if __TBB_EXCEPTIONS
00479         , context
00480 #endif // __TBB_EXCEPTIONS
00481         );
00482 }
00483 
00484 #if __TBB_EXCEPTIONS
00486 
00487 template<typename Iterator, typename Body> 
00488 void parallel_do( Iterator first, Iterator last, const Body& body, task_group_context& context  )
00489 {
00490     if ( first == last )
00491         return;
00492     internal::select_parallel_do( first, last, body, &Body::operator(), context );
00493 }
00494 #endif // __TBB_EXCEPTIONS
00495 
00497 
00498 } // namespace 
00499 
00500 #endif /* __TBB_parallel_do_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.