parallel_reduce.h

/*
    Copyright 2005-2009 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/

#ifndef __TBB_parallel_reduce_H
#define __TBB_parallel_reduce_H

#include "task.h"
#include "aligned_space.h"
#include "partitioner.h"
#include <new>

namespace tbb {

namespace internal {

    //! ITT-instrumented routine that stores src into the location pointed to by dst with release semantics.
    void __TBB_EXPORTED_FUNC itt_store_pointer_with_release_v3( void* dst, void* src );

    //! ITT-instrumented routine that loads a pointer from the location pointed to by src with acquire semantics.
    void* __TBB_EXPORTED_FUNC itt_load_pointer_with_acquire_v3( const void* src );

    //! Publish a body pointer with release semantics; uses the ITT routine when threading tools are enabled.
    template<typename T> inline void parallel_reduce_store_body( T*& dst, T* src ) {
#if TBB_USE_THREADING_TOOLS
        itt_store_pointer_with_release_v3(&dst,src);
#else
        __TBB_store_with_release(dst,src);
#endif /* TBB_USE_THREADING_TOOLS */
    }

    //! Read a body pointer with acquire semantics; uses the ITT routine when threading tools are enabled.
    template<typename T> inline T* parallel_reduce_load_body( T*& src ) {
#if TBB_USE_THREADING_TOOLS
        return static_cast<T*>(itt_load_pointer_with_acquire_v3(&src));
#else
        return __TBB_load_with_acquire(src);
#endif /* TBB_USE_THREADING_TOOLS */
    }

    //! 0 if root, 1 if a left child, 2 if a right child.
    /** Represented as a char, not an enum, for compactness. */
    typedef char reduction_context;

    //! Task type used to combine the partial results of parallel_reduce.
    template<typename Body>
    class finish_reduce: public task {
        //! Pointer to the body whose result is propagated upward; NULL until a child publishes it.
        Body* my_body;
        //! True if the right child was stolen and constructed its own Body copy in zombie_space.
        bool has_right_zombie;
        const reduction_context my_context;
        //! Storage for the right child's Body copy when that child is stolen.
        aligned_space<Body,1> zombie_space;
        finish_reduce( char context ) :
            my_body(NULL),
            has_right_zombie(false),
            my_context(context)
        {
        }
        task* execute() {
            if( has_right_zombie ) {
                // Right child was stolen: fold its result into the left body and destroy the copy.
                Body* s = zombie_space.begin();
                my_body->join( *s );
                s->~Body();
            }
            if( my_context==1 )
                parallel_reduce_store_body( static_cast<finish_reduce*>(parent())->my_body, my_body );
            return NULL;
        }
        template<typename Range,typename Body_, typename Partitioner>
        friend class start_reduce;
    };

    //! Task type used to split the work of parallel_reduce.
    template<typename Range, typename Body, typename Partitioner>
    class start_reduce: public task {
        typedef finish_reduce<Body> finish_type;
        Body* my_body;
        Range my_range;
        typename Partitioner::partition_type my_partition;
        reduction_context my_context;
        /*override*/ task* execute();
        template<typename Body_>
        friend class finish_reduce;

        //! Constructor used for the root task.
        start_reduce( const Range& range, Body* body, Partitioner& partitioner ) :
            my_body(body),
            my_range(range),
            my_partition(partitioner),
            my_context(0)
        {
        }
        //! Splitting constructor; *this becomes the left child and the new object the right child.
        start_reduce( start_reduce& parent, split ) :
            my_body(parent.my_body),
            my_range(parent.my_range,split()),
            my_partition(parent.my_partition,split()),
            my_context(2)
        {
            my_partition.set_affinity(*this);
            parent.my_context = 1;
        }
        //! Update affinity info, if any.
        /*override*/ void note_affinity( affinity_id id ) {
            my_partition.note_affinity( id );
        }

public:
        static void run( const Range& range, Body& body, Partitioner& partitioner ) {
            if( !range.empty() ) {
#if !__TBB_EXCEPTIONS || TBB_JOIN_OUTER_TASK_GROUP
                task::spawn_root_and_wait( *new(task::allocate_root()) start_reduce(range,&body,partitioner) );
#else
                // A bound context prevents exceptions thrown from the body from affecting nested or
                // sibling algorithms, and lets users handle such exceptions safely by wrapping the
                // call to parallel_reduce in a try-block.
                task_group_context context;
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce(range,&body,partitioner) );
#endif /* !__TBB_EXCEPTIONS || TBB_JOIN_OUTER_TASK_GROUP */
            }
        }
#if __TBB_EXCEPTIONS
        static void run( const Range& range, Body& body, Partitioner& partitioner, task_group_context& context ) {
            if( !range.empty() )
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce(range,&body,partitioner) );
        }
#endif /* __TBB_EXCEPTIONS */
    };

    template<typename Range, typename Body, typename Partitioner>
    task* start_reduce<Range,Body,Partitioner>::execute() {
        if( my_context==2 ) {
            // This task is a right child.  If the left sibling has not yet published its body
            // pointer, the two children may run concurrently, so construct a private Body copy in
            // the continuation's zombie_space and accumulate into that instead.
            finish_type* p = static_cast<finish_type*>(parent() );
            if( !parallel_reduce_load_body(p->my_body) ) {
                my_body = new( p->zombie_space.begin() ) Body(*my_body,split());
                p->has_right_zombie = true;
            }
        }
        if( !my_range.is_divisible() || my_partition.should_execute_range(*this) ) {
            // Leaf case: apply the body to the whole range serially.
            (*my_body)( my_range );
            if( my_context==1 )
                parallel_reduce_store_body(static_cast<finish_type*>(parent())->my_body, my_body );
            return my_partition.continue_after_execute_range(*this);
        } else {
            // Split: recycle this task as the left child of a finish_reduce continuation and
            // spawn (or delay) the right child.
            finish_type& c = *new( allocate_continuation()) finish_type(my_context);
            recycle_as_child_of(c);
            c.set_ref_count(2);
            bool delay = my_partition.decide_whether_to_delay();
            start_reduce& b = *new( c.allocate_child() ) start_reduce(*this,split());
            my_partition.spawn_or_delay(delay,*this,b);
            return this;
        }
    }

    //! Auxiliary body class for the functional form of parallel_reduce.
    template<typename Range, typename Value, typename RealBody, typename Reduction>
    class lambda_reduce_body {

//FIXME: decide if my_real_body, my_reduction, and identity_element should be copied or referenced
//       (might require some performance measurements)

        const Value&     identity_element;
        const RealBody&  my_real_body;
        const Reduction& my_reduction;
        Value            my_value;
        lambda_reduce_body& operator= ( const lambda_reduce_body& other );
    public:
        lambda_reduce_body( const Value& identity, const RealBody& body, const Reduction& reduction )
            : identity_element(identity)
            , my_real_body(body)
            , my_reduction(reduction)
            , my_value(identity)
        { }
        lambda_reduce_body( const lambda_reduce_body& other )
            : identity_element(other.identity_element)
            , my_real_body(other.my_real_body)
            , my_reduction(other.my_reduction)
            , my_value(other.my_value)
        { }
        lambda_reduce_body( lambda_reduce_body& other, tbb::split )
            : identity_element(other.identity_element)
            , my_real_body(other.my_real_body)
            , my_reduction(other.my_reduction)
            , my_value(other.identity_element)
        { }
        void operator()(Range& range) {
            my_value = my_real_body(range, const_cast<const Value&>(my_value));
        }
        void join( lambda_reduce_body& rhs ) {
            my_value = my_reduction(const_cast<const Value&>(my_value), const_cast<const Value&>(rhs.my_value));
        }
        Value result() const {
            return my_value;
        }
    };

} // namespace internal

// Requirements on the Range concept are documented in blocked_range.h.
//
// A Body type used with the imperative form of parallel_reduce must define:
//  - Body::Body( Body&, split );        splitting constructor; the copy must be able to run
//                                       concurrently with the original's operator() and join
//  - Body::~Body();                     destructor
//  - void Body::operator()( Range& r ); accumulate the result over subrange r
//  - void Body::join( Body& rhs );      merge rhs's result into *this

//! Parallel iteration with reduction, using the default partitioner.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body ) {
    internal::start_reduce<Range,Body, const __TBB_DEFAULT_PARTITIONER>::run( range, body, __TBB_DEFAULT_PARTITIONER() );
}
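
// Usage sketch (illustrative only, not part of the original header): the imperative form of
// parallel_reduce expects a user-defined Body with a splitting constructor, operator(), and join().
// Names such as SumBody and ParallelSum below are hypothetical, and tbb/blocked_range.h is assumed
// to be included.
//
//     struct SumBody {
//         float sum;
//         SumBody() : sum(0) {}
//         SumBody( SumBody& other, tbb::split ) : sum(0) {}           // start a fresh partial sum
//         void operator()( const tbb::blocked_range<const float*>& r ) {
//             for( const float* p=r.begin(); p!=r.end(); ++p )
//                 sum += *p;                                          // accumulate over the subrange
//         }
//         void join( SumBody& rhs ) { sum += rhs.sum; }               // merge a stolen child's result
//     };
//
//     float ParallelSum( const float* a, size_t n ) {
//         SumBody body;
//         tbb::parallel_reduce( tbb::blocked_range<const float*>(a,a+n), body );
//         return body.sum;
//     }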

//! Parallel iteration with reduction and simple_partitioner.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const simple_partitioner& partitioner ) {
    internal::start_reduce<Range,Body,const simple_partitioner>::run( range, body, partitioner );
}

//! Parallel iteration with reduction and auto_partitioner.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const auto_partitioner& partitioner ) {
    internal::start_reduce<Range,Body,const auto_partitioner>::run( range, body, partitioner );
}

//! Parallel iteration with reduction and affinity_partitioner.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, affinity_partitioner& partitioner ) {
    internal::start_reduce<Range,Body,affinity_partitioner>::run( range, body, partitioner );
}
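
// Usage note (illustrative, not from the original header): affinity_partitioner is passed by
// non-const reference because it records which thread ran each chunk; reusing the same object
// across repeated calls over the same data can improve cache affinity.  The loop below is a
// hypothetical sketch; UpdateBody and the iteration count are assumptions.
//
//     tbb::affinity_partitioner ap;
//     for( int iter=0; iter<100; ++iter ) {
//         UpdateBody body;                      // some Body over the same data each iteration
//         tbb::parallel_reduce( tbb::blocked_range<size_t>(0,N), body, ap );
//     }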

#if __TBB_EXCEPTIONS
//! Parallel iteration with reduction, simple_partitioner and user-supplied context.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const simple_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce<Range,Body,const simple_partitioner>::run( range, body, partitioner, context );
}

//! Parallel iteration with reduction, auto_partitioner and user-supplied context.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const auto_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce<Range,Body,const auto_partitioner>::run( range, body, partitioner, context );
}

//! Parallel iteration with reduction, affinity_partitioner and user-supplied context.
template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, affinity_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce<Range,Body,affinity_partitioner>::run( range, body, partitioner, context );
}
#endif /* __TBB_EXCEPTIONS */
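
// Usage sketch (illustrative, not from the original header): a user-supplied task_group_context
// lets the reduction be cancelled as a group and confines exception propagation.  SumBody is the
// hypothetical Body sketched above; the other names are assumptions as well.
//
//     tbb::task_group_context ctx;
//     SumBody body;
//     try {
//         tbb::parallel_reduce( tbb::blocked_range<const float*>(a,a+n), body,
//                               tbb::simple_partitioner(), ctx );
//     } catch( ... ) {
//         // An exception thrown by the body reaches this handler once the algorithm has stopped;
//         // ctx.cancel_group_execution() could likewise be called from another thread to cancel it.
//     }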

//! Parallel iteration with reduction (functional form) and default partitioner.
/** Partial results start from the identity value; real_body(subrange, value) accumulates over a
    subrange, and reduction(left, right) combines two partial results. **/
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,const __TBB_DEFAULT_PARTITIONER>
                          ::run(range, body, __TBB_DEFAULT_PARTITIONER() );
    return body.result();
}
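
// Usage sketch (illustrative, not from the original header; assumes a compiler with lambda support
// and that tbb/blocked_range.h is included).  The functional form carries the running value
// explicitly, so no user-defined Body class is needed:
//
//     float ParallelSum( const float* a, size_t n ) {
//         return tbb::parallel_reduce(
//             tbb::blocked_range<const float*>(a,a+n),
//             0.f,                                                     // identity element
//             []( const tbb::blocked_range<const float*>& r, float running ) -> float {
//                 for( const float* p=r.begin(); p!=r.end(); ++p )
//                     running += *p;                                   // accumulate over the subrange
//                 return running;
//             },
//             []( float x, float y ) { return x + y; } );              // combine partial results
//     }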

//! Parallel iteration with reduction (functional form) and simple_partitioner.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const simple_partitioner& partitioner ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,const simple_partitioner>
                          ::run(range, body, partitioner );
    return body.result();
}

//! Parallel iteration with reduction (functional form) and auto_partitioner.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const auto_partitioner& partitioner ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,const auto_partitioner>
                          ::run( range, body, partitioner );
    return body.result();
}

//! Parallel iteration with reduction (functional form) and affinity_partitioner.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       affinity_partitioner& partitioner ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,affinity_partitioner>
                          ::run( range, body, partitioner );
    return body.result();
}

#if __TBB_EXCEPTIONS
//! Parallel iteration with reduction (functional form), simple_partitioner and user-supplied context.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const simple_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,const simple_partitioner>
                          ::run( range, body, partitioner, context );
    return body.result();
}

//! Parallel iteration with reduction (functional form), auto_partitioner and user-supplied context.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const auto_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,const auto_partitioner>
                          ::run( range, body, partitioner, context );
    return body.result();
}

//! Parallel iteration with reduction (functional form), affinity_partitioner and user-supplied context.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       affinity_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,affinity_partitioner>
                          ::run( range, body, partitioner, context );
    return body.result();
}
#endif /* __TBB_EXCEPTIONS */

} // namespace tbb

#endif /* __TBB_parallel_reduce_H */
