00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_parallel_while
00022 #define __TBB_parallel_while
00023
00024 #include "task.h"
00025 #include <new>
00026
00027 namespace tbb {
00028
00029 template<typename Body>
00030 class parallel_while;
00031
00033 namespace internal {
00034
00035 template<typename Stream, typename Body> class while_task;
00036
00038
00040 template<typename Body>
00041 class while_iteration_task: public task {
00042 const Body& my_body;
00043 typename Body::argument_type my_value;
00044 task* execute() {
00045 my_body(my_value);
00046 return NULL;
00047 }
00048 while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
00049 my_body(body), my_value(value)
00050 {}
00051 template<typename Body_> friend class while_group_task;
00052 friend class tbb::parallel_while<Body>;
00053 };
00054
00056
00058 template<typename Body>
00059 class while_group_task: public task {
00060 static const size_t max_arg_size = 4;
00061 const Body& my_body;
00062 size_t size;
00063 typename Body::argument_type my_arg[max_arg_size];
00064 while_group_task( const Body& body ) : my_body(body), size(0) {}
00065 task* execute() {
00066 typedef while_iteration_task<Body> iteration_type;
00067 __TBB_ASSERT( size>0, NULL );
00068 task_list list;
00069 task* t;
00070 size_t k=0;
00071 for(;;) {
00072 t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
00073 if( ++k==size ) break;
00074 list.push_back(*t);
00075 }
00076 set_ref_count(int(k+1));
00077 spawn(list);
00078 spawn_and_wait_for_all(*t);
00079 return NULL;
00080 }
00081 template<typename Stream, typename Body_> friend class while_task;
00082 };
00083
00085
00087 template<typename Stream, typename Body>
00088 class while_task: public task {
00089 Stream& my_stream;
00090 const Body& my_body;
00091 empty_task& my_barrier;
00092 task* execute() {
00093 typedef while_group_task<Body> block_type;
00094 block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
00095 size_t k=0;
00096 while( my_stream.pop_if_present(t.my_arg[k]) ) {
00097 if( ++k==block_type::max_arg_size ) {
00098
00099 recycle_to_reexecute();
00100 break;
00101 }
00102 }
00103 if( k==0 ) {
00104 destroy(t);
00105 return NULL;
00106 } else {
00107 t.size = k;
00108 return &t;
00109 }
00110 }
00111 while_task( Stream& stream, const Body& body, empty_task& barrier ) :
00112 my_stream(stream),
00113 my_body(body),
00114 my_barrier(barrier)
00115 {}
00116 friend class tbb::parallel_while<Body>;
00117 };
00118
00119 }
00121
00123
00128 template<typename Body>
00129 class parallel_while: internal::no_copy {
00130 public:
00132 parallel_while() : my_body(NULL), my_barrier(NULL) {}
00133
00135 ~parallel_while() {
00136 if( my_barrier ) {
00137 my_barrier->destroy(*my_barrier);
00138 my_barrier = NULL;
00139 }
00140 }
00141
00143 typedef typename Body::argument_type value_type;
00144
00146
00149 template<typename Stream>
00150 void run( Stream& stream, const Body& body );
00151
00153
00154 void add( const value_type& item );
00155
00156 private:
00157 const Body* my_body;
00158 empty_task* my_barrier;
00159 };
00160
00161 template<typename Body>
00162 template<typename Stream>
00163 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
00164 using namespace internal;
00165 empty_task& barrier = *new( task::allocate_root() ) empty_task();
00166 my_body = &body;
00167 my_barrier = &barrier;
00168 my_barrier->set_ref_count(2);
00169 while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
00170 my_barrier->spawn_and_wait_for_all(w);
00171 my_barrier->destroy(*my_barrier);
00172 my_barrier = NULL;
00173 my_body = NULL;
00174 }
00175
00176 template<typename Body>
00177 void parallel_while<Body>::add( const value_type& item ) {
00178 __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
00179 typedef internal::while_iteration_task<Body> iteration_type;
00180 iteration_type& i = *new( task::self().allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
00181 task::self().spawn( i );
00182 }
00183
00184 }
00185
00186 #endif