00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include <cstddef>
00027
00028 namespace tbb {
00029
// Forward declarations: both classes are defined further down in this header,
// and each one refers to the other (pointer members and friend declarations).
00030 class pipeline;
00031 class filter;
00032
//! Helper declarations used by the filter and pipeline classes in this header.
namespace internal {

//! Encodes interface version x as the bit field ((x-2) << 1).
/** Both the argument and the complete expansion are parenthesized so that the
    macro acts as a single value: an argument such as 4|1, or a use inside a
    larger expression such as __TBB_PIPELINE_VERSION(5) + 1, no longer
    re-associates with the subtraction or the shift. The value produced for a
    plain integer argument is unchanged (5 still yields 6). */
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)

//! Unsigned integer type; class pipeline uses it for its atomic token counters.
typedef unsigned long Token;
//! Signed integer type. NOTE(review): presumably holds differences between Token values; it is not used in this header.
typedef long tokendiff_t;

// Forward declarations of implementation classes. They are only named here
// (as friends or as pointer targets); their definitions live elsewhere.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;

} // namespace internal
00048
00050
//! Abstract base class for one stage of a pipeline.
/** Derived classes implement operator() to handle an item. The class inherits
    privately from internal::no_copy (declared outside this header). Each filter
    carries a one-byte mode value (my_filter_mode) composed of the flag bits and
    the version field declared below, plus link pointers that start out as
    not_in_pipeline() or NULL. */
00051 class filter: internal::no_copy {
00052 private:
//! Sentinel pointer value: internal::intptr(-1) reinterpreted as filter*.
/** The constructors use it as the initial value of both link pointers. */
00054 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(internal::intptr(-1));}
00055
//! Bit 0 of the mode byte; set in the serial modes, tested by is_serial() and is_ordered().
00057 static const unsigned char filter_is_serial = 0x1;
00058
00060
//! Bit 4 of the mode byte; set in parallel and serial_out_of_order, tested by is_ordered().
00062 static const unsigned char filter_is_out_of_order = 0x1<<4;
00063
//! Bit 5 of the mode byte; OR-ed in by thread_bound_filter's constructor, tested by is_bound().
00065 static const unsigned char filter_is_bound = 0x1<<5;
00066
//! Version field included in every enum mode value: __TBB_PIPELINE_VERSION(5), i.e. (5-2)<<1 == 6.
00067 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
//! Mask 0x0E covering bits 1-3, which is where current_version sits. Not used elsewhere in this header.
00068 static const unsigned char version_mask = 0x7<<1;
00069 public:
//! Mode values accepted by the filter(mode) constructor.
00070 enum mode {
//! Version field plus the out-of-order bit; the serial bit is clear.
00072 parallel = current_version | filter_is_out_of_order,
//! Version field plus the serial bit only; the one mode for which is_ordered() is true.
00074 serial_in_order = current_version | filter_is_serial,
//! Version field plus both the serial and the out-of-order bits.
00076 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
//! Alias with the same value as serial_in_order.
00078 serial = serial_in_order
00079 };
00080 protected:
//! Constructs a filter from a flag: true selects mode serial, false selects mode parallel.
/** Both link pointers start as not_in_pipeline(); the buffer, pipeline and
    segment pointers start as NULL. */
00081 filter( bool is_serial_ ) :
00082 next_filter_in_pipeline(not_in_pipeline()),
00083 my_input_buffer(NULL),
00084 my_filter_mode(static_cast<unsigned char>(is_serial_ ? serial : parallel)),
00085 prev_filter_in_pipeline(not_in_pipeline()),
00086 my_pipeline(NULL),
00087 next_segment(NULL)
00088 {}
00089
//! Constructs a filter whose mode byte is filter_mode, narrowed to unsigned char.
/** Pointer members are initialized exactly as in the bool constructor. */
00090 filter( mode filter_mode ) :
00091 next_filter_in_pipeline(not_in_pipeline()),
00092 my_input_buffer(NULL),
00093 my_filter_mode(static_cast<unsigned char>(filter_mode)),
00094 prev_filter_in_pipeline(not_in_pipeline()),
00095 my_pipeline(NULL),
00096 next_segment(NULL)
00097 {}
00098
00099 public:
//! True if the serial bit is set in the mode byte.
00101 bool is_serial() const {
00102 return bool( my_filter_mode & filter_is_serial );
00103 }
00104
//! True if the serial bit is set and the out-of-order bit is clear.
00106 bool is_ordered() const {
00107 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00108 }
00109
//! True if the thread-bound bit is set in the mode byte.
00111 bool is_bound() const {
00112 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00113 }
00114
00116
//! Pure virtual item handler that every concrete filter must implement.
/** Takes an untyped item pointer and returns an untyped pointer.
    NOTE(review): the meaning of the returned pointer (including NULL) is set by
    the pipeline implementation, which is not visible here - confirm there. */
00117 virtual void* operator()( void* item ) = 0;
00118
00120
//! Virtual destructor; declared here and defined out of line.
00121 virtual __TBB_EXPORTED_METHOD ~filter();
00122
00123 #if __TBB_EXCEPTIONS
00125
//! Overridable hook with an empty default body; present only when __TBB_EXCEPTIONS is enabled.
/** NOTE(review): presumably invoked to dispose of an item that will not be
    processed further - confirm against the pipeline implementation. */
00127 virtual void finalize( void* ) {};
00128 #endif
00129
00130 private:
//! Link to the following filter; holds not_in_pipeline() right after construction.
00132 filter* next_filter_in_pipeline;
00133
00135
//! Pointer to an internal::input_buffer; NULL right after construction.
/** NOTE(review): which filters get a buffer is decided outside this header. */
00136 internal::input_buffer* my_input_buffer;
00137
// These classes are granted access to the private link and mode members.
00138 friend class internal::stage_task;
00139 friend class internal::pipeline_root_task;
00140 friend class pipeline;
00141 friend class thread_bound_filter;
00142
//! Mode byte (flag bits plus version field); const, so fixed at construction.
00144 const unsigned char my_filter_mode;
00145
//! Link to the preceding filter; holds not_in_pipeline() right after construction.
00147 filter* prev_filter_in_pipeline;
00148
//! Pointer to a pipeline; NULL right after construction.
/** NOTE(review): presumably set when the filter is added to a pipeline - confirm. */
00150 pipeline* my_pipeline;
00151
00153
//! Pointer to another filter; NULL right after construction.
/** NOTE(review): what constitutes a "segment" is not visible in this header. */
00154 filter* next_segment;
00155 };
00156
00158
//! A filter whose mode byte always has filter::filter_is_bound set.
/** NOTE(review): the name suggests its items are processed by a thread that
    calls process_item()/try_process_item() explicitly - confirm against the
    out-of-line implementation, which is not visible here. */
00159 class thread_bound_filter: public filter {
00160 public:
//! Status returned by try_process_item() and process_item().
/** NOTE(review): the conditions producing each value are defined in the
    implementation; the one-line descriptions below follow the names only. */
00161 enum result_type {
00162
// presumably: an item was processed
00163 success,
00164
// presumably: no item was ready at the time of the call
00165 item_not_available,
00166
// presumably: there will be no more items
00167 end_of_stream
00168 };
00169 protected:
//! Constructs the base filter with filter_mode combined with the thread-bound bit.
00170 thread_bound_filter(mode filter_mode):
00171 filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00172 {}
00173 public:
00175
//! Exported method returning a result_type; defined out of line.
/** NOTE(review): presumably the non-blocking variant - confirm. */
00180 result_type __TBB_EXPORTED_METHOD try_process_item();
00181
00183
//! Exported method returning a result_type; defined out of line.
/** NOTE(review): presumably the blocking variant - confirm. */
00187 result_type __TBB_EXPORTED_METHOD process_item();
00188
00189 private:
//! Private helper taking a blocking flag; defined out of line.
/** NOTE(review): presumably shared by the two public methods above - confirm. */
00191 result_type internal_process_item(bool is_blocking);
00192 };
00193
00195
//! Container and driver for a sequence of filter objects.
/** All member functions are declared here and defined out of line. The class
    holds raw pointers and declares a virtual destructor.
    NOTE(review): no copy constructor or copy assignment is declared in this
    header - confirm that copying a pipeline is not intended. */
00196 class pipeline {
00197 public:
//! Default constructor; defined out of line.
00199 __TBB_EXPORTED_METHOD pipeline();
00200
//! Virtual destructor; defined out of line.
00203 virtual __TBB_EXPORTED_METHOD ~pipeline();
00204
//! Takes a filter by reference.
/** NOTE(review): presumably appends it to the filter list - confirm. */
00206 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00207
//! Runs the pipeline with the given token limit.
/** NOTE(review): presumably max_number_of_live_tokens bounds the number of
    items in flight at once - confirm against the implementation. */
00209 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00210
00211 #if __TBB_EXCEPTIONS
//! Overload of run() that additionally takes a task_group_context; present only when __TBB_EXCEPTIONS is enabled.
00213 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00214 #endif
00215
//! Exported method taking no arguments.
/** NOTE(review): presumably removes all filters from the pipeline - confirm. */
00217 void __TBB_EXPORTED_METHOD clear();
00218
00219 private:
// These classes are granted access to the private state below.
00220 friend class internal::stage_task;
00221 friend class internal::pipeline_root_task;
00222 friend class filter;
00223 friend class thread_bound_filter;
00224 friend class internal::pipeline_cleaner;
00225
//! Pointer to a filter. NOTE(review): presumably the first filter in the pipeline - confirm.
00227 filter* filter_list;
00228
//! Pointer to a filter. NOTE(review): presumably the last filter in the pipeline - confirm.
00230 filter* filter_end;
00231
//! Pointer to a task. NOTE(review): its role is not visible in this header.
00233 task* end_counter;
00234
//! Atomic counter of type internal::Token. NOTE(review): presumably tokens still available for input - confirm.
00236 atomic<internal::Token> input_tokens;
00237
//! Atomic counter of type internal::Token. NOTE(review): presumably a running token sequence number - confirm.
00239 atomic<internal::Token> token_counter;
00240
//! Boolean flag. NOTE(review): presumably set once the input is exhausted - confirm.
00242 bool end_of_input;
00243
//! Boolean flag. NOTE(review): presumably true if any added filter is thread-bound - confirm.
00245 bool has_thread_bound_filters;
00246
//! Private, non-exported helper taking a filter by reference; defined out of line.
00248 void remove_filter( filter& filter_ );
00249
//! Private exported helper taking a task by reference; defined out of line.
00251 void __TBB_EXPORTED_METHOD inject_token( task& self );
00252
00253 #if __TBB_EXCEPTIONS
//! Private helper present only when __TBB_EXCEPTIONS is enabled; defined out of line.
00255 void clear_filters();
00256 #endif
00257 };
00258
00259 }
00260
00261 #endif