GetFEM  5.5
getfem_omp.cc
1 /*===========================================================================
2 
3  Copyright (C) 2012-2026 Andriy Andreykiv.
4 
5  This file is a part of GetFEM
6 
7  GetFEM is free software; you can redistribute it and/or modify it
8  under the terms of the GNU Lesser General Public License as published
9  by the Free Software Foundation; either version 3 of the License, or
10  (at your option) any later version along with the GCC Runtime Library
11  Exception either version 3.1 or (at your option) any later version.
12  This program is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14  or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15  License and GCC Runtime Library Exception for more details.
16  You should have received a copy of the GNU Lesser General Public License
17  along with this program. If not, see https://www.gnu.org/licenses/.
18 
19 ===========================================================================*/
20 
21 #include "getfem/dal_singleton.h"
22 #include "getfem/getfem_locale.h"
23 #include "getfem/getfem_omp.h"
24 
25 #ifdef GETFEM_HAS_OPENMP
26  #include <thread>
27  #include <omp.h>
28 #endif
29 
30 using bgeot::scalar_type;
31 
32 namespace getfem{
33 
34 #ifdef GETFEM_HAS_OPENMP
35 
36  std::recursive_mutex omp_guard::mutex;
37 
38  omp_guard::omp_guard()
39  : plock{me_is_multithreaded_now() ?
40  std::make_unique<std::lock_guard<std::recursive_mutex>>(mutex)
41  : nullptr}
42  {}
43 
44  local_guard::local_guard(std::recursive_mutex& m) :
45  mutex{m},
46  plock{me_is_multithreaded_now() ?
47  std::make_shared<std::lock_guard<std::recursive_mutex>>(m)
48  : nullptr}
49  {}
50 
51  local_guard lock_factory::get_lock() const{
52  return local_guard{mutex};
53  }
54 
55  size_type global_thread_policy::this_thread() {
56  return partition_master::get().get_current_partition();
57  }
58 
59  size_type global_thread_policy::num_threads(){
60  return partition_master::get().get_nb_partitions();
61  }
62 
63  size_type true_thread_policy::this_thread() {
64  return omp_get_thread_num();
65  }
66 
67  size_type true_thread_policy::num_threads(){
68  return omp_get_max_threads();
69  }
70 
71  void set_num_threads(int n){
72  omp_set_num_threads(n);
73  partition_master::get().check_threads();
74  }
75 
77  // serial region
78  if(omp_get_num_threads() == 1 && omp_get_level() == 0) return false;
79  // parallel region with one thread
80  if(omp_get_num_threads() == 1 && omp_get_level() == 1) return true;
81  // parallel region with more than one thread
82  if(omp_in_parallel() == 1) return true;
83 
84  return false;
85  }
86 
87  bool not_multithreaded(){
88  return omp_get_max_threads() == 1;
89  }
90 
92  return std::thread::hardware_concurrency();
93  }
94 
95 #else
96 
97  size_type global_thread_policy::this_thread() {return 0;}
98 
99  size_type global_thread_policy::num_threads(){return 1;}
100 
101  size_type true_thread_policy::this_thread() {return 0;}
102 
103  size_type true_thread_policy::num_threads(){return 1;}
104 
105  bool me_is_multithreaded_now(){return false;}
106 
107  void set_num_threads(int /*n*/){}
108 
109  bool not_multithreaded(){return true;}
110 
112 
113 #endif
114 
115  /** Allows to re-throw exceptions, generated in OpemMP parallel section.
116  Collects exceptions from all threads and on destruction re-throws
117  the first one, so that
118  it can be again caught in the master thread. */
120  std::vector<std::exception_ptr> exceptions;
121 
122  void captureException(){
123  exceptions[true_thread_policy::this_thread()] = std::current_exception();
124  }
125 
126  public:
128  : exceptions(true_thread_policy::num_threads(), nullptr)
129  {}
130 
131  template <typename function, typename... parameters>
132  void run(function f, parameters... params){
133  try {f(params...);} catch (...) {captureException();}
134  }
135 
136  std::vector<std::exception_ptr> caughtExceptions() const{
137  std::vector<std::exception_ptr> non_empty_exceptions;
138  for (auto &&pException : exceptions){
139  if (pException != nullptr) non_empty_exceptions.push_back(pException);
140  }
141  return non_empty_exceptions;
142  }
143 
144  void rethrow() {
145  for (auto &&pException : exceptions){
146  if (pException != nullptr) std::rethrow_exception(pException);
147  }
148  }
149  };
150 
151  partition_iterator::partition_iterator(
152  partition_master &m, std::set<size_type>::const_iterator it_from_set)
153  : master{m}, it{it_from_set}
154  {}
155 
156  partition_iterator partition_iterator::operator++(){
157  ++it;
158  if (*this != master.end()) master.set_current_partition(*it);
159  return *this;
160  }
161 
162  bool partition_iterator::operator==(const partition_iterator &it1) const {
163  return it == it1.it;
164  }
165 
166  bool partition_iterator::operator!=(const partition_iterator &it1) const {
167  return !(*this == it1);
168  }
169 
170  size_type partition_iterator::operator*() const{
171  return *it;
172  }
173 
174  partition_master partition_master::instance;
175 
176  partition_master& partition_master::get(){
177  return instance;
178  }
179 
180  void partition_master::check_threads(){
181  GLOBAL_OMP_GUARD
182  auto must_update = false;
183  if (nb_user_threads != true_thread_policy::num_threads()){
184  nb_user_threads = true_thread_policy::num_threads();
185  must_update = true;
186  }
187  if (nb_partitions < nb_user_threads && !partitions_set_by_user){
188  nb_partitions = nb_user_threads;
189  must_update = true;
190  }
191  if (must_update){
192  update_partitions();
193  dal::singletons_manager::on_partitions_change();
194  }
195  }
196 
198  GMM_ASSERT1 (!partitions_set_by_user,
199  "Number of partitions can be set only once.");
200  if (n > nb_partitions){
201  nb_partitions = n;
202  nb_user_threads = true_thread_policy::num_threads();
203  update_partitions();
204  dal::singletons_manager::on_partitions_change();
205  }
206  else if (n < nb_partitions){
207  GMM_WARNING1("Not reducing number of partitions from "
208  << nb_partitions <<" to " << n <<
209  " as it might invalidate global storage.");
210  }
211  partitions_set_by_user = true;
212  }
213 
215  GMM_ASSERT1(nb_user_threads == true_thread_policy::num_threads(),
216  "The number of omp threads was changed outside partition_master."
217  "Please use getfem::set_num_threads for this.");
218  current_partition = *(std::begin(partitions.thrd_cast()));
219  return partition_iterator{*this, std::begin(partitions.thrd_cast())};
220  }
221 
223  return partition_iterator{*this, std::end(partitions.thrd_cast())};
224  }
225 
226  void partition_master::set_behaviour(thread_behaviour b){
227  if (b != behaviour){
228  GMM_ASSERT1(!me_is_multithreaded_now(),
229  "Cannot change thread policy in parallel section.");
230  behaviour = b;
231  check_threads();
232  }
233  }
234 
235  partition_master::partition_master()
236  : nb_user_threads{1}, nb_partitions{1} {
237  partitions_updated = false;
238  set_num_threads(1);
239  update_partitions();
240  }
241 
243  GMM_ASSERT2(behaviour == thread_behaviour::partition_threads ?
244  true_thread_policy::this_thread() < nb_partitions : true,
245  "Requesting current partition for thread " <<
246  true_thread_policy::this_thread() <<
247  " while number of partitions is " << nb_partitions
248  << ".");
249  return behaviour == thread_behaviour::partition_threads ?
250  current_partition : true_thread_policy::this_thread();
251  }
252 
254  return behaviour == thread_behaviour::partition_threads ?
255  nb_partitions : true_thread_policy::num_threads();
256  }
257 
258  void partition_master::set_current_partition(size_type p){
259  if (behaviour == thread_behaviour::partition_threads){
260  GMM_ASSERT2(partitions.thrd_cast().count(p) != 0, "Internal error: "
261  << p << " is not a valid partitions for thread "
262  << true_thread_policy::this_thread()
263  << ".");
264  current_partition = p;
265  }
266  }
267 
268  void partition_master::rewind_partitions(){
270  current_partition = *(std::begin(partitions.thrd_cast()));
271  }
272  else{
273  for (size_type t = 0; t != partitions.num_threads(); ++t){
274  current_partition(t) = *(std::begin(partitions(t)));
275  }
276  }
277  }
278 
279  void partition_master::update_partitions(){
280  partitions_updated = false;
281 
282  GLOBAL_OMP_GUARD
283 
284  if (partitions_updated) return;
285 
286  partitions = decltype(partitions){};
287  current_partition = decltype(current_partition){};
288 
289  auto n_threads = true_thread_policy::num_threads();
290  if(n_threads > nb_partitions){
291  GMM_WARNING0("Using " << n_threads <<
292  " threads which is above the maximum number of partitions :" <<
293  nb_partitions
294  << ".");
295  }
296  if (behaviour == thread_behaviour::partition_threads){
297  for (size_type t = 0; t != n_threads; ++t){
298  auto partition_size = static_cast<size_type>
299  (std::ceil(static_cast<scalar_type>(nb_partitions) /
300  static_cast<scalar_type >(n_threads)));
301  auto partition_begin = partition_size * t;
302  if (partition_begin >= nb_partitions) break;
303  auto partition_end = std::min(partition_size * (t + 1), nb_partitions);
304  auto hint_it = std::begin(partitions(t));
305  for (size_type i = partition_begin; i != partition_end; ++i){
306  hint_it = partitions(t).insert(hint_it, i);
307  }
308  current_partition(t) = partition_begin;
309  }
310  }
311  else{
312  for (size_type t = 0; t != n_threads; ++t){
313  partitions(t).insert(t);
314  current_partition(t) = t;
315  }
316  }
317 
318  partitions_updated = true;
319  }
320 
321  #if defined _WIN32 && !defined (__GNUC__)
322  #define GETFEM_ON_WIN
323  #endif
324 
325  parallel_boilerplate::
326  parallel_boilerplate()
327  : plocale{std::make_unique<standard_locale>()},
328  pexception{std::make_unique<thread_exception>()} {
329  #ifdef GETFEM_ON_WIN
330  _configthreadlocale(_ENABLE_PER_THREAD_LOCALE);
331  #endif
332  }
333 
334  void parallel_boilerplate::run_lambda(std::function<void(void)> lambda){
335  pexception->run(lambda);
336  }
337 
338  parallel_boilerplate::~parallel_boilerplate(){
339  #ifdef GETFEM_ON_WIN
340  _configthreadlocale(_DISABLE_PER_THREAD_LOCALE);
341  #endif
342  pexception->rethrow();
343  }
344 
345  void parallel_execution(std::function<void(void)> lambda,
346  bool iterate_over_partitions){
347  if (me_is_multithreaded_now()) {
348  lambda();
349  return;
350  }
351  parallel_boilerplate boilerplate;
352  auto &pm = partition_master::get();
353  if (pm.get_nb_partitions() < true_thread_policy::num_threads()){
354  pm.set_nb_partitions(true_thread_policy::num_threads());
355  }
356  #pragma omp parallel default(shared)
357  {
358  if (iterate_over_partitions) {
359  for (auto &&partitions : partition_master::get()) {
360  (void)partitions;
361  boilerplate.run_lambda(lambda);
362  }
363  }
364  else {
365  boilerplate.run_lambda(lambda);
366  }
367  }
368  if (iterate_over_partitions) partition_master::get().rewind_partitions();
369  }
370 
371 
372 #ifdef GETFEM_FORCE_SINGLE_THREAD_BLAS
373 # include <dlfcn.h>
374 # define BLAS_FORCE_SINGLE_THREAD \
375  int openblas_get_num_threads_res = 1; \
376  { \
377  typedef int (* ptrfunc1)(); \
378  ptrfunc1 func1 = ptrfunc1(dlsym(NULL, "openblas_get_num_threads")); \
379  if (func1) openblas_get_num_threads_res = (*func1)(); \
380  typedef void (* ptrfunc2)(int); \
381  ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
382  if (func2) (*func2)(1); \
383  }
384 # define BLAS_RESTORE_NUM_THREAD \
385  { \
386  typedef void (* ptrfunc2)(int); \
387  ptrfunc func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
388  if (func2) (*func)(openblas_get_num_threads_res); \
389  }
390 #else
391 # define BLAS_FORCE_SINGLE_THREAD
392 # define BLAS_RESTORE_NUM_THREAD
393 #endif
394 
395 
396  struct dummy_class_for_blas_nbthread_init {
397  dummy_class_for_blas_nbthread_init(void)
398  { BLAS_FORCE_SINGLE_THREAD; }
399  };
400 
401  static dummy_class_for_blas_nbthread_init dcfbnti;
402 
403 
404 } /* end of namespace getfem. */
Iterator that runs over partitions on the current thread and sets the global (but thread-specific) pa...
Definition: getfem_omp.h:355
A singleton that Manages partitions on individual threads.
Definition: getfem_omp.h:381
partition_iterator begin()
beginning of the partitions for the current thread
Definition: getfem_omp.cc:214
size_type get_current_partition() const
active partition on the thread.
Definition: getfem_omp.cc:242
size_type get_nb_partitions() const
number of partitions or threads, depending on thread policy
Definition: getfem_omp.cc:253
partition_iterator end()
end of the partitions for the current thread
Definition: getfem_omp.cc:222
void set_nb_partitions(size_type)
for thread_behaviour::partition_threads set the total number of partitions.
Definition: getfem_omp.cc:197
void set_behaviour(thread_behaviour)
Sets the behaviour for the full program: either partitioning parallel loops according to the number o...
Definition: getfem_omp.cc:226
Allows to re-throw exceptions, generated in OpemMP parallel section.
Definition: getfem_omp.cc:119
A simple singleton implementation.
thread safe standard locale with RAII semantics
Tools for multithreaded, OpenMP and Boost based parallelization.
size_t size_type
used as the common size type in the library
Definition: bgeot_poly.h:48
GEneric Tool for Finite Element Methods.
bool not_multithreaded()
is the program is running on a single thread
Definition: getfem_omp.cc:109
size_type max_concurrency()
Maximum number of threads that can run concurrently.
Definition: getfem_omp.cc:111
bool me_is_multithreaded_now()
is the program running in the parallel section
Definition: getfem_omp.cc:105
void set_num_threads(int n)
set maximum number of OpenMP threads
Definition: getfem_omp.cc:107