pipeline.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include <cstddef>
00027 
00028 namespace tbb {
00029 
00030 class pipeline;
00031 class filter;
00032 
00034 namespace internal {
00035 
00036 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
00037 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00038 
00039 typedef unsigned long Token;
00040 typedef long tokendiff_t;
00041 class stage_task;
00042 class input_buffer;
00043 class pipeline_root_task;
00044 class pipeline_cleaner;
00045 
00046 } // namespace internal
00048 
00050 
00051 class filter: internal::no_copy {
00052 private:
00054     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(internal::intptr(-1));}
00055     
00057     static const unsigned char filter_is_serial = 0x1; 
00058 
00060 
00062     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00063 
00065     static const unsigned char filter_is_bound = 0x1<<5;  
00066 
00067     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00068     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00069 public:
00070     enum mode {
00072         parallel = current_version | filter_is_out_of_order, 
00074         serial_in_order = current_version | filter_is_serial,
00076         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00078         serial = serial_in_order
00079     };
00080 protected:
00081     filter( bool is_serial_ ) : 
00082         next_filter_in_pipeline(not_in_pipeline()),
00083         my_input_buffer(NULL),
00084         my_filter_mode(static_cast<unsigned char>(is_serial_ ? serial : parallel)),
00085         prev_filter_in_pipeline(not_in_pipeline()),
00086         my_pipeline(NULL),
00087         next_segment(NULL)
00088     {}
00089     
00090     filter( mode filter_mode ) :
00091         next_filter_in_pipeline(not_in_pipeline()),
00092         my_input_buffer(NULL),
00093         my_filter_mode(static_cast<unsigned char>(filter_mode)),
00094         prev_filter_in_pipeline(not_in_pipeline()),
00095         my_pipeline(NULL),
00096         next_segment(NULL)
00097     {}
00098 
00099 public:
00101     bool is_serial() const {
00102         return bool( my_filter_mode & filter_is_serial );
00103     }  
00104     
00106     bool is_ordered() const {
00107         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00108     }
00109 
00111     bool is_bound() const {
00112         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00113     }
00114 
00116 
00117     virtual void* operator()( void* item ) = 0;
00118 
00120 
00121     virtual __TBB_EXPORTED_METHOD ~filter();
00122 
00123 #if __TBB_EXCEPTIONS
00125 
00127     virtual void finalize( void* /*item*/ ) {};
00128 #endif
00129 
00130 private:
00132     filter* next_filter_in_pipeline;
00133 
00135 
00136     internal::input_buffer* my_input_buffer;
00137 
00138     friend class internal::stage_task;
00139     friend class internal::pipeline_root_task;
00140     friend class pipeline;
00141     friend class thread_bound_filter;
00142 
00144     const unsigned char my_filter_mode;
00145 
00147     filter* prev_filter_in_pipeline;
00148 
00150     pipeline* my_pipeline;
00151 
00153 
00154     filter* next_segment;
00155 };
00156 
00158 
00159 class thread_bound_filter: public filter {
00160 public:
00161     enum result_type {
00162         // item was processed
00163         success,
00164         // item is currently not available
00165         item_not_available,
00166         // there are no more items to process
00167         end_of_stream
00168     };
00169 protected:
00170     thread_bound_filter(mode filter_mode): 
00171          filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00172     {}
00173 public:
00175 
00180     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00181 
00183 
00187     result_type __TBB_EXPORTED_METHOD process_item();
00188 
00189 private:
00191     result_type internal_process_item(bool is_blocking);
00192 };
00193 
00195 
00196 class pipeline {
00197 public:
00199     __TBB_EXPORTED_METHOD pipeline();
00200 
00203     virtual __TBB_EXPORTED_METHOD ~pipeline();
00204 
00206     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00207 
00209     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00210 
00211 #if __TBB_EXCEPTIONS
00213     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00214 #endif
00215 
00217     void __TBB_EXPORTED_METHOD clear();
00218 
00219 private:
00220     friend class internal::stage_task;
00221     friend class internal::pipeline_root_task;
00222     friend class filter;
00223     friend class thread_bound_filter;
00224     friend class internal::pipeline_cleaner;
00225 
00227     filter* filter_list;
00228 
00230     filter* filter_end;
00231 
00233     task* end_counter;
00234 
00236     atomic<internal::Token> input_tokens;
00237 
00239     atomic<internal::Token> token_counter;
00240 
00242     bool end_of_input;
00243 
00245     bool has_thread_bound_filters;
00246 
00248     void remove_filter( filter& filter_ );
00249 
00251     void __TBB_EXPORTED_METHOD inject_token( task& self );
00252 
00253 #if __TBB_EXCEPTIONS
00255     void clear_filters();
00256 #endif
00257 };
00258 
00259 } // tbb
00260 
00261 #endif /* __TBB_pipeline_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.