parallel_while.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_parallel_while
00022 #define __TBB_parallel_while
00023 
00024 #include "task.h"
00025 #include <new>
00026 
00027 namespace tbb {
00028 
00029 template<typename Body>
00030 class parallel_while;
00031 
00033 namespace internal {
00034 
00035     template<typename Stream, typename Body> class while_task;
00036 
00038 
00040     template<typename Body>
00041     class while_iteration_task: public task {
00042         const Body& my_body;
00043         typename Body::argument_type my_value;
00044         /*override*/ task* execute() {
00045             my_body(my_value); 
00046             return NULL;
00047         }
00048         while_iteration_task( const typename Body::argument_type& value, const Body& body ) : 
00049             my_body(body), my_value(value)
00050         {}
00051         template<typename Body_> friend class while_group_task;
00052         friend class tbb::parallel_while<Body>;
00053     };
00054 
00056 
00058     template<typename Body>
00059     class while_group_task: public task {
00060         static const size_t max_arg_size = 4;         
00061         const Body& my_body;
00062         size_t size;
00063         typename Body::argument_type my_arg[max_arg_size];
00064         while_group_task( const Body& body ) : my_body(body), size(0) {} 
00065         /*override*/ task* execute() {
00066             typedef while_iteration_task<Body> iteration_type;
00067             __TBB_ASSERT( size>0, NULL );
00068             task_list list;
00069             task* t; 
00070             size_t k=0; 
00071             for(;;) {
00072                 t = new( allocate_child() ) iteration_type(my_arg[k],my_body); 
00073                 if( ++k==size ) break;
00074                 list.push_back(*t);
00075             }
00076             set_ref_count(int(k+1));
00077             spawn(list);
00078             spawn_and_wait_for_all(*t);
00079             return NULL;
00080         }
00081         template<typename Stream, typename Body_> friend class while_task;
00082     };
00083     
00085 
00087     template<typename Stream, typename Body>
00088     class while_task: public task {
00089         Stream& my_stream;
00090         const Body& my_body;
00091         empty_task& my_barrier;
00092         /*override*/ task* execute() {
00093             typedef while_group_task<Body> block_type;
00094             block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
00095             size_t k=0; 
00096             while( my_stream.pop_if_present(t.my_arg[k]) ) {
00097                 if( ++k==block_type::max_arg_size ) {
00098                     // There might be more iterations.
00099                     recycle_to_reexecute();
00100                     break;
00101                 }
00102             }
00103             if( k==0 ) {
00104                 destroy(t);
00105                 return NULL;
00106             } else {
00107                 t.size = k;
00108                 return &t;
00109             }
00110         }
00111         while_task( Stream& stream, const Body& body, empty_task& barrier ) : 
00112             my_stream(stream),
00113             my_body(body),
00114             my_barrier(barrier)
00115         {} 
00116         friend class tbb::parallel_while<Body>;
00117     };
00118 
00119 } // namespace internal
00121 
00123 
00128 template<typename Body>
00129 class parallel_while: internal::no_copy {
00130 public:
00132     parallel_while() : my_body(NULL), my_barrier(NULL) {}
00133 
00135     ~parallel_while() {
00136         if( my_barrier ) {
00137             my_barrier->destroy(*my_barrier);    
00138             my_barrier = NULL;
00139         }
00140     }
00141 
00143     typedef typename Body::argument_type value_type;
00144 
00146 
00149     template<typename Stream>
00150     void run( Stream& stream, const Body& body );
00151 
00153 
00154     void add( const value_type& item );
00155 
00156 private:
00157     const Body* my_body;
00158     empty_task* my_barrier;
00159 };
00160 
00161 template<typename Body>
00162 template<typename Stream>
00163 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
00164     using namespace internal;
00165     empty_task& barrier = *new( task::allocate_root() ) empty_task();
00166     my_body = &body;
00167     my_barrier = &barrier;
00168     my_barrier->set_ref_count(2);
00169     while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
00170     my_barrier->spawn_and_wait_for_all(w);
00171     my_barrier->destroy(*my_barrier);
00172     my_barrier = NULL;
00173     my_body = NULL;
00174 }
00175 
00176 template<typename Body>
00177 void parallel_while<Body>::add( const value_type& item ) {
00178     __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
00179     typedef internal::while_iteration_task<Body> iteration_type;
00180     iteration_type& i = *new( task::self().allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
00181     task::self().spawn( i );
00182 }
00183 
00184 } // namespace 
00185 
00186 #endif /* __TBB_parallel_while */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.