tlx
Loading...
Searching...
No Matches
parallel_multiway_merge.hpp
Go to the documentation of this file.
1/*******************************************************************************
2 * tlx/algorithm/parallel_multiway_merge.hpp
3 *
4 * Parallel multiway merge.
5 *
6 * Copied and modified from STXXL, see http://stxxl.org, which itself extracted
7 * it from MCSTL http://algo2.iti.uni-karlsruhe.de/singler/mcstl/. Both are
8 * distributed under the Boost Software License, Version 1.0.
9 *
10 * Part of tlx - http://panthema.net/tlx
11 *
12 * Copyright (C) 2007 Johannes Singler <singler@ira.uka.de>
13 * Copyright (C) 2014-2018 Timo Bingmann <tb@panthema.net>
14 *
15 * All rights reserved. Published under the Boost Software License, Version 1.0
16 ******************************************************************************/
17
18#ifndef TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
19#define TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
20
21#include <algorithm>
22#include <functional>
23#include <thread>
24#include <vector>
25
26#if defined(_OPENMP)
27#include <omp.h>
28#endif
29
32#include <tlx/simple_vector.hpp>
33
34namespace tlx {
35
36//! \addtogroup tlx_algorithm
37//! \{
38
39//! default oversampling factor for parallel_multiway_merge
41
42/*!
43 * Parallel multi-way merge routine.
44 *
45 * Implemented either using OpenMP or with std::threads, depending on if
46 * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
47 * pool, which is faster when using this method often.
48 *
49 * \param seqs_begin Begin iterator of iterator pair input sequence.
50 * \param seqs_end End iterator of iterator pair input sequence.
51 * \param target Begin iterator out output sequence.
52 * \param size Maximum size to merge.
53 * \param comp Comparator.
54 * \param mwma MultiwayMergeAlgorithm set to use.
55 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
56 * \param num_threads Number of threads to use (defaults to all cores)
57 * \tparam Stable Stable merging incurs a performance penalty.
58 * \return End iterator of output sequence.
59 */
60template <
61 bool Stable,
62 typename RandomAccessIteratorIterator,
63 typename RandomAccessIterator3,
64 typename Comparator = std::less<
65 typename std::iterator_traits<
66 typename std::iterator_traits<RandomAccessIteratorIterator>
67 ::value_type::first_type>::value_type> >
68RandomAccessIterator3 parallel_multiway_merge_base(
69 RandomAccessIteratorIterator seqs_begin,
70 RandomAccessIteratorIterator seqs_end,
71 RandomAccessIterator3 target,
72 const typename std::iterator_traits<
73 typename std::iterator_traits<
74 RandomAccessIteratorIterator>::value_type::first_type>::
75 difference_type size,
76 Comparator comp = Comparator(),
79 size_t num_threads = std::thread::hardware_concurrency()) {
80
81 using RandomAccessIteratorPair =
82 typename std::iterator_traits<RandomAccessIteratorIterator>
83 ::value_type;
84 using RandomAccessIterator =
85 typename RandomAccessIteratorPair::first_type;
86 using DiffType = typename std::iterator_traits<RandomAccessIterator>
87 ::difference_type;
88
89 // leave only non-empty sequences
90 std::vector<RandomAccessIteratorPair> seqs_ne;
91 seqs_ne.reserve(static_cast<size_t>(seqs_end - seqs_begin));
92 DiffType total_size = 0;
93
94 for (RandomAccessIteratorIterator ii = seqs_begin; ii != seqs_end; ++ii)
95 {
96 if (ii->first != ii->second) {
97 total_size += ii->second - ii->first;
98 seqs_ne.push_back(*ii);
99 }
100 }
101
102 size_t num_seqs = seqs_ne.size();
103
104 if (total_size == 0 || num_seqs == 0)
105 return target;
106
107 if (static_cast<DiffType>(num_threads) > total_size)
108 num_threads = total_size;
109
110 // thread t will have to merge chunks[iam][0..k - 1]
111
113
114 for (size_t s = 0; s < num_threads; ++s)
115 chunks[s].resize(num_seqs);
116
117 if (mwmsa == MWMSA_SAMPLING)
118 {
120 seqs_ne.begin(), seqs_ne.end(),
121 static_cast<DiffType>(size), total_size, comp,
122 chunks.data(), num_threads,
124 }
125 else // (mwmsa == MWMSA_EXACT)
126 {
128 seqs_ne.begin(), seqs_ne.end(),
129 static_cast<DiffType>(size), total_size, comp,
130 chunks.data(), num_threads);
131 }
132
133#if defined(_OPENMP)
134#pragma omp parallel num_threads(num_threads)
135 {
136 size_t iam = omp_get_thread_num();
137
138 DiffType target_position = 0, local_size = 0;
139
140 for (size_t s = 0; s < num_seqs; ++s)
141 {
142 target_position += chunks[iam][s].first - seqs_ne[s].first;
143 local_size += chunks[iam][s].second - chunks[iam][s].first;
144 }
145
147 chunks[iam].begin(), chunks[iam].end(),
148 target + target_position,
149 std::min(local_size, static_cast<DiffType>(size) - target_position),
150 comp, mwma);
151 }
152#else
153 std::vector<std::thread> threads(num_threads);
154
155 for (size_t iam = 0; iam < num_threads; ++iam) {
156 threads[iam] = std::thread(
157 [&, iam]() {
158 DiffType target_position = 0, local_size = 0;
159
160 for (size_t s = 0; s < num_seqs; ++s)
161 {
162 target_position += chunks[iam][s].first - seqs_ne[s].first;
163 local_size += chunks[iam][s].second - chunks[iam][s].first;
164 }
165
167 chunks[iam].begin(), chunks[iam].end(),
168 target + target_position,
169 std::min(local_size, static_cast<DiffType>(size) - target_position),
170 comp, mwma);
171 });
172 }
173
174 for (size_t i = 0; i < num_threads; ++i)
175 threads[i].join();
176#endif
177
178 // update ends of sequences
179 size_t count_seqs = 0;
180 for (RandomAccessIteratorIterator ii = seqs_begin; ii != seqs_end; ++ii)
181 {
182 if (ii->first != ii->second)
183 ii->first = chunks[num_threads - 1][count_seqs++].second;
184 }
185
186 return target + size;
187}
188
189/******************************************************************************/
190// parallel_multiway_merge() Frontends
191
192//! setting to force all parallel_multiway_merge() calls to run sequentially
194
195//! setting to force parallel_multiway_merge() calls to run with parallel code
197
198//! minimal number of sequences for switching to parallel merging
200
201//! minimal number of items for switching to parallel merging
203
204/*!
205 * Parallel multi-way merge routine.
206 *
207 * Implemented either using OpenMP or with std::threads, depending on if
208 * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
209 * pool, which is faster when using this method often.
210 *
211 * \param seqs_begin Begin iterator of iterator pair input sequence.
212 * \param seqs_end End iterator of iterator pair input sequence.
213 * \param target Begin iterator out output sequence.
214 * \param size Maximum size to merge.
215 * \param comp Comparator.
216 * \param mwma MultiwayMergeAlgorithm set to use.
217 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
218 * \param num_threads Number of threads to use (defaults to all cores)
219 * \tparam Stable Stable merging incurs a performance penalty.
220 * \return End iterator of output sequence.
221 */
222template <
223 typename RandomAccessIteratorIterator,
224 typename RandomAccessIterator3,
225 typename Comparator = std::less<
226 typename std::iterator_traits<
227 typename std::iterator_traits<RandomAccessIteratorIterator>
228 ::value_type::first_type>::value_type> >
229RandomAccessIterator3 parallel_multiway_merge(
230 RandomAccessIteratorIterator seqs_begin,
231 RandomAccessIteratorIterator seqs_end,
232 RandomAccessIterator3 target,
233 const typename std::iterator_traits<
234 typename std::iterator_traits<
235 RandomAccessIteratorIterator>::value_type::first_type>::
236 difference_type size,
237 Comparator comp = Comparator(),
240 size_t num_threads = std::thread::hardware_concurrency()) {
241
242 if (seqs_begin == seqs_end)
243 return target;
244
247 (num_threads > 1 &&
248 (static_cast<size_t>(seqs_end - seqs_begin)
250 static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
251 return parallel_multiway_merge_base</* Stable */ false>(
252 seqs_begin, seqs_end, target, size, comp,
253 mwma, mwmsa, num_threads);
254 }
255 else {
256 return multiway_merge_base</* Stable */ false, /* Sentinels */ false>(
257 seqs_begin, seqs_end, target, size, comp, mwma);
258 }
259}
260
261/*!
262 * Stable parallel multi-way merge routine.
263 *
264 * Implemented either using OpenMP or with std::threads, depending on if
265 * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
266 * pool, which is faster when using this method often.
267 *
268 * \param seqs_begin Begin iterator of iterator pair input sequence.
269 * \param seqs_end End iterator of iterator pair input sequence.
270 * \param target Begin iterator out output sequence.
271 * \param size Maximum size to merge.
272 * \param comp Comparator.
273 * \param mwma MultiwayMergeAlgorithm set to use.
274 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
275 * \param num_threads Number of threads to use (defaults to all cores)
276 * \tparam Stable Stable merging incurs a performance penalty.
277 * \return End iterator of output sequence.
278 */
279template <
280 typename RandomAccessIteratorIterator,
281 typename RandomAccessIterator3,
282 typename Comparator = std::less<
283 typename std::iterator_traits<
284 typename std::iterator_traits<RandomAccessIteratorIterator>
285 ::value_type::first_type>::value_type> >
286RandomAccessIterator3 stable_parallel_multiway_merge(
287 RandomAccessIteratorIterator seqs_begin,
288 RandomAccessIteratorIterator seqs_end,
289 RandomAccessIterator3 target,
290 const typename std::iterator_traits<
291 typename std::iterator_traits<
292 RandomAccessIteratorIterator>::value_type::first_type>::
293 difference_type size,
294 Comparator comp = Comparator(),
297 size_t num_threads = std::thread::hardware_concurrency()) {
298
299 if (seqs_begin == seqs_end)
300 return target;
301
304 (num_threads > 1 &&
305 (static_cast<size_t>(seqs_end - seqs_begin)
307 static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
308 return parallel_multiway_merge_base</* Stable */ true>(
309 seqs_begin, seqs_end, target, size, comp,
310 mwma, mwmsa, num_threads);
311 }
312 else {
313 return multiway_merge_base</* Stable */ true, /* Sentinels */ false>(
314 seqs_begin, seqs_end, target, size, comp, mwma);
315 }
316}
317
318/*!
319 * Parallel multi-way merge routine with sentinels.
320 *
321 * Implemented either using OpenMP or with std::threads, depending on if
322 * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
323 * pool, which is faster when using this method often.
324 *
325 * \param seqs_begin Begin iterator of iterator pair input sequence.
326 * \param seqs_end End iterator of iterator pair input sequence.
327 * \param target Begin iterator out output sequence.
328 * \param size Maximum size to merge.
329 * \param comp Comparator.
330 * \param mwma MultiwayMergeAlgorithm set to use.
331 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
332 * \param num_threads Number of threads to use (defaults to all cores)
333 * \tparam Stable Stable merging incurs a performance penalty.
334 * \return End iterator of output sequence.
335 */
336template <
337 typename RandomAccessIteratorIterator,
338 typename RandomAccessIterator3,
339 typename Comparator = std::less<
340 typename std::iterator_traits<
341 typename std::iterator_traits<RandomAccessIteratorIterator>
342 ::value_type::first_type>::value_type> >
344 RandomAccessIteratorIterator seqs_begin,
345 RandomAccessIteratorIterator seqs_end,
346 RandomAccessIterator3 target,
347 const typename std::iterator_traits<
348 typename std::iterator_traits<
349 RandomAccessIteratorIterator>::value_type::first_type>::
350 difference_type size,
351 Comparator comp = Comparator(),
354 size_t num_threads = std::thread::hardware_concurrency()) {
355
356 if (seqs_begin == seqs_end)
357 return target;
358
361 (num_threads > 1 &&
362 (static_cast<size_t>(seqs_end - seqs_begin)
364 static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
365 return parallel_multiway_merge_base</* Stable */ false>(
366 seqs_begin, seqs_end, target, size, comp,
367 mwma, mwmsa, num_threads);
368 }
369 else {
370 return multiway_merge_base</* Stable */ false, /* Sentinels */ true>(
371 seqs_begin, seqs_end, target, size, comp, mwma);
372 }
373}
374
375/*!
376 * Stable parallel multi-way merge routine with sentinels.
377 *
378 * Implemented either using OpenMP or with std::threads, depending on if
379 * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
380 * pool, which is faster when using this method often.
381 *
382 * \param seqs_begin Begin iterator of iterator pair input sequence.
383 * \param seqs_end End iterator of iterator pair input sequence.
384 * \param target Begin iterator out output sequence.
385 * \param size Maximum size to merge.
386 * \param comp Comparator.
387 * \param mwma MultiwayMergeAlgorithm set to use.
388 * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
389 * \param num_threads Number of threads to use (defaults to all cores)
390 * \tparam Stable Stable merging incurs a performance penalty.
391 * \return End iterator of output sequence.
392 */
393template <
394 typename RandomAccessIteratorIterator,
395 typename RandomAccessIterator3,
396 typename Comparator = std::less<
397 typename std::iterator_traits<
398 typename std::iterator_traits<RandomAccessIteratorIterator>
399 ::value_type::first_type>::value_type> >
401 RandomAccessIteratorIterator seqs_begin,
402 RandomAccessIteratorIterator seqs_end,
403 RandomAccessIterator3 target,
404 const typename std::iterator_traits<
405 typename std::iterator_traits<
406 RandomAccessIteratorIterator>::value_type::first_type>::
407 difference_type size,
408 Comparator comp = Comparator(),
411 size_t num_threads = std::thread::hardware_concurrency()) {
412
413 if (seqs_begin == seqs_end)
414 return target;
415
418 (num_threads > 1 &&
419 (static_cast<size_t>(seqs_end - seqs_begin)
421 static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
422 return parallel_multiway_merge_base</* Stable */ true>(
423 seqs_begin, seqs_end, target, size, comp,
424 mwma, mwmsa, num_threads);
425 }
426 else {
427 return multiway_merge_base</* Stable */ true, /* Sentinels */ true>(
428 seqs_begin, seqs_end, target, size, comp, mwma);
429 }
430}
431
432//! \}
433
434} // namespace tlx
435
436#endif // !TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
437
438/******************************************************************************/
Simpler non-growing vector without initialization.
iterator data() noexcept
return iterator to beginning of vector
size_t parallel_multiway_merge_oversampling
default oversampling factor for parallel_multiway_merge
RandomAccessIterator3 parallel_multiway_merge(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine.
size_t parallel_multiway_merge_minimal_n
minimal number of items for switching to parallel merging
void multiway_merge_sampling_splitting(const RandomAccessIteratorIterator &seqs_begin, const RandomAccessIteratorIterator &seqs_end, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type total_size, Comparator comp, std::vector< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type > *chunks, const size_t num_threads, const size_t merge_oversampling)
Splitting method for parallel multi-way merge routine: use sampling and binary search for in-exact sp...
RandomAccessIterator3 multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT)
Sequential multi-way merging switch.
size_t parallel_multiway_merge_minimal_k
minimal number of sequences for switching to parallel merging
RandomAccessIterator3 parallel_multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine.
RandomAccessIterator3 stable_parallel_multiway_merge(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Stable parallel multi-way merge routine.
RandomAccessIterator3 parallel_multiway_merge_sentinels(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine with sentinels.
bool parallel_multiway_merge_force_parallel
setting to force parallel_multiway_merge() calls to run with parallel code
MultiwayMergeSplittingAlgorithm
Different splitting strategies for sorting/merging: by sampling, exact.
MultiwayMergeAlgorithm
Different merging algorithms: bubblesort-alike, loser-tree variants, enum sentinel.
RandomAccessIterator3 stable_parallel_multiway_merge_sentinels(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Stable parallel multi-way merge routine with sentinels.
void multiway_merge_exact_splitting(const RandomAccessIteratorIterator &seqs_begin, const RandomAccessIteratorIterator &seqs_end, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type total_size, Comparator comp, std::vector< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type > *chunks, const size_t num_threads)
Splitting method for parallel multi-way merge routine: use multisequence selection for exact splittin...
bool parallel_multiway_merge_force_sequential
setting to force all parallel_multiway_merge() calls to run sequentially
@ MWMA_ALGORITHM_DEFAULT
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition join.cpp:16
STL namespace.