workstealing.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
00041 #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
00042
00043 #include <parallel/parallel.h>
00044 #include <parallel/random_number.h>
00045 #include <parallel/compatibility.h>
00046
00047 namespace __gnu_parallel
00048 {
00049
/// Qualifier applied to every Job member.  Job slots are read by other
/// threads (victim selection) while their owner updates them, so the
/// compiler must not cache the fields in registers.
/// NOTE: volatile gives no atomicity; atomic updates of @c first are done
/// through fetch_and_add by the work-stealing driver.
#define _GLIBCXX_JOB_VOLATILE volatile

/** @brief One thread's work queue: the inclusive index range
 *  [first, last] that the thread still has to process.
 *
 *  @c load is kept alongside the bounds so that another thread can take
 *  an unsynchronized snapshot of all three fields and reject it when
 *  first + load - 1 != last (i.e. the snapshot was torn).
 *
 *  @param _DifferenceType Index type; presumably the iterator's signed
 *         difference_type (values <= 0 are used to mean "no work").
 */
template<typename _DifferenceType>
  struct Job
  {
    typedef _DifferenceType difference_type;

    /// Next index to process (inclusive lower bound).
    _GLIBCXX_JOB_VOLATILE difference_type first;

    /// Last index to process (inclusive upper bound).
    _GLIBCXX_JOB_VOLATILE difference_type last;

    /// Remaining size, maintained as last - first + 1 (may be briefly
    /// stale while the owner is updating the slot).
    _GLIBCXX_JOB_VOLATILE difference_type load;
  };
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 template<typename RandomAccessIterator,
00094 typename Op,
00095 typename Fu,
00096 typename Red,
00097 typename Result>
00098 Op
00099 for_each_template_random_access_workstealing(RandomAccessIterator begin,
00100 RandomAccessIterator end,
00101 Op op, Fu& f, Red r,
00102 Result base, Result& output,
00103 typename std::iterator_traits
00104 <RandomAccessIterator>::
00105 difference_type bound)
00106 {
00107 _GLIBCXX_CALL(end - begin)
00108
00109 typedef std::iterator_traits<RandomAccessIterator> traits_type;
00110 typedef typename traits_type::difference_type difference_type;
00111
00112 const _Settings& __s = _Settings::get();
00113
00114 difference_type chunk_size = static_cast<difference_type>(__s.workstealing_chunk_size);
00115
00116
00117 difference_type length = (bound < 0) ? (end - begin) : bound;
00118
00119
00120 const int stride = __s.cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
00121
00122
00123 thread_index_t busy = 0;
00124
00125 Job<difference_type> *job;
00126
00127 omp_lock_t output_lock;
00128 omp_init_lock(&output_lock);
00129
00130
00131 output = base;
00132
00133
00134 thread_index_t num_threads =
00135 __gnu_parallel::max<thread_index_t>(1,
00136 __gnu_parallel::min<difference_type>(length, get_max_threads()));
00137
00138 # pragma omp parallel shared(busy) num_threads(num_threads)
00139 {
00140
00141 # pragma omp single
00142 {
00143 num_threads = omp_get_num_threads();
00144
00145
00146 job = new Job<difference_type>[num_threads * stride];
00147 }
00148
00149
00150
00151
00152 bool iam_working = false;
00153
00154
00155 thread_index_t iam = omp_get_thread_num();
00156
00157
00158 Job<difference_type>& my_job = job[iam * stride];
00159
00160
00161 thread_index_t victim;
00162
00163
00164 Result result = Result();
00165
00166
00167 difference_type steal;
00168
00169
00170
00171 random_number rand_gen(iam, num_threads);
00172
00173
00174 # pragma omp atomic
00175 ++busy;
00176
00177 iam_working = true;
00178
00179
00180 my_job.first =
00181 static_cast<difference_type>(iam * (length / num_threads));
00182
00183 my_job.last = (iam == (num_threads - 1)) ?
00184 (length - 1) : ((iam + 1) * (length / num_threads) - 1);
00185 my_job.load = my_job.last - my_job.first + 1;
00186
00187
00188 if (my_job.first <= my_job.last)
00189 {
00190
00191 difference_type my_first = my_job.first;
00192 result = f(op, begin + my_first);
00193 ++my_job.first;
00194 --my_job.load;
00195 }
00196
00197 RandomAccessIterator current;
00198
00199 # pragma omp barrier
00200
00201
00202
00203 while (busy > 0)
00204 {
00205
00206 # pragma omp flush(busy)
00207
00208
00209 while (my_job.first <= my_job.last)
00210 {
00211
00212
00213 difference_type current_job =
00214 fetch_and_add<difference_type>(&(my_job.first), chunk_size);
00215
00216
00217
00218 my_job.load = my_job.last - my_job.first + 1;
00219 for (difference_type job_counter = 0;
00220 job_counter < chunk_size && current_job <= my_job.last;
00221 ++job_counter)
00222 {
00223
00224 current = begin + current_job;
00225 ++current_job;
00226
00227
00228 result = r(result, f(op, current));
00229 }
00230
00231 # pragma omp flush(busy)
00232 }
00233
00234
00235 if (iam_working)
00236 {
00237
00238 # pragma omp atomic
00239 --busy;
00240
00241 iam_working = false;
00242 }
00243
00244 difference_type supposed_first, supposed_last, supposed_load;
00245 do
00246 {
00247
00248 yield();
00249 # pragma omp flush(busy)
00250 victim = rand_gen();
00251 supposed_first = job[victim * stride].first;
00252 supposed_last = job[victim * stride].last;
00253 supposed_load = job[victim * stride].load;
00254 }
00255 while (busy > 0
00256 && ((supposed_load <= 0)
00257 || ((supposed_first + supposed_load - 1) != supposed_last)));
00258
00259 if (busy == 0)
00260 break;
00261
00262 if (supposed_load > 0)
00263 {
00264
00265
00266 steal = (supposed_load < 2) ? 1 : supposed_load / 2;
00267
00268
00269 difference_type stolen_first =
00270 fetch_and_add<difference_type>(
00271 &(job[victim * stride].first), steal);
00272 difference_type stolen_try =
00273 stolen_first + steal - difference_type(1);
00274
00275 my_job.first = stolen_first;
00276 my_job.last = __gnu_parallel::min(stolen_try, supposed_last);
00277 my_job.load = my_job.last - my_job.first + 1;
00278
00279
00280 # pragma omp atomic
00281 ++busy;
00282 iam_working = true;
00283
00284 # pragma omp flush(busy)
00285 }
00286 # pragma omp flush(busy)
00287 }
00288
00289 omp_set_lock(&output_lock);
00290 output = r(output, result);
00291 omp_unset_lock(&output_lock);
00292 }
00293
00294 delete[] job;
00295
00296
00297
00298 f.finish_iterator = begin + length;
00299
00300 omp_destroy_lock(&output_lock);
00301
00302 return op;
00303 }
00304 }
00305
00306 #endif