StarPU Internal Handbook
workers.h
1 /* StarPU --- Runtime system for heterogeneous multicore architectures.
2  *
3  * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
4  * Copyright (C) 2013 Thibaut Lambert
5  * Copyright (C) 2016 Uppsala University
6  *
7  * StarPU is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU Lesser General Public License as published by
9  * the Free Software Foundation; either version 2.1 of the License, or (at
10  * your option) any later version.
11  *
12  * StarPU is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15  *
16  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
17  */
18 
19 #ifndef __WORKERS_H__
20 #define __WORKERS_H__
21 
23 /* @{ */
24 
25 #include <limits.h>
26 
27 #include <starpu.h>
28 #include <common/config.h>
29 #include <common/timing.h>
30 #include <common/fxt.h>
31 #include <common/thread.h>
32 #include <common/utils.h>
33 #include <core/jobs.h>
35 #include <core/sched_policy.h>
36 #include <core/topology.h>
37 #include <core/errorcheck.h>
38 #include <core/sched_ctx.h>
39 #include <core/sched_ctx_list.h>
40 #include <core/simgrid.h>
41 #ifdef STARPU_HAVE_HWLOC
42 #include <hwloc.h>
43 #endif
44 
45 #include <core/drivers.h>
48 
49 #ifdef STARPU_USE_MIC
51 #endif /* STARPU_USE_MIC */
52 
53 #ifdef STARPU_USE_MPI_MASTER_SLAVE
55 #endif
56 
57 #include <drivers/cpu/driver_cpu.h>
58 
59 #include <datawizard/datawizard.h>
60 
61 #include <starpu_parameters.h>
62 
/* Capacity of the per-worker current_tasks[] pipeline array. */
#define STARPU_MAX_PIPELINE 4

/* Three-state initialization marker (NOTE(review): its users are not in
 * this header; CHANGING presumably means "initialization in progress"). */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING = 1,
	INITIALIZED = 2
};

/* Forward declaration; the full type comes from another header. */
struct _starpu_ctx_change_list;
68 
70 LIST_TYPE(_starpu_worker,
71  struct _starpu_machine_config *config;
72  starpu_pthread_mutex_t mutex;
73  enum starpu_worker_archtype arch;
74  uint32_t worker_mask;
75  struct starpu_perfmodel_arch perf_arch;
76  starpu_pthread_t worker_thread;
77  unsigned devid;
78  unsigned subworkerid;
79  int bindid;
80  int workerid;
84  starpu_pthread_cond_t started_cond;
85  starpu_pthread_cond_t ready_cond;
86  unsigned memory_node;
87  unsigned numa_memory_node;
92  starpu_pthread_cond_t sched_cond;
93  starpu_pthread_mutex_t sched_mutex;
94  unsigned state_relax_refcnt;
95 #ifdef STARPU_SPINLOCK_CHECK
96  const char *relax_on_file;
97  int relax_on_line;
98  const char *relax_on_func;
99  const char *relax_off_file;
100  int relax_off_line;
101  const char *relax_off_func;
102 #endif
120  starpu_pthread_t thread_changing_ctx;
128  struct _starpu_ctx_change_list ctx_change_list;
129  struct starpu_task_list local_tasks;
130  struct starpu_task **local_ordered_tasks;
134  struct starpu_task *current_task;
135  struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
136 #ifdef STARPU_SIMGRID
137  starpu_pthread_wait_t wait;
138 #endif
139 
140  struct timespec cl_start;
141  struct timespec cl_end;
142  unsigned char first_task;
143  unsigned char ntasks;
144  unsigned char pipeline_length;
145  unsigned char pipeline_stuck;
147  unsigned worker_is_running;
148  unsigned worker_is_initialized;
149  enum _starpu_worker_status status;
150  unsigned state_keep_awake;
151  char name[128];
152  char short_name[32];
153  unsigned run_by_starpu;
154  struct _starpu_driver_ops *driver_ops;
155 
156  struct _starpu_sched_ctx_list *sched_ctx_list;
157  int tmp_sched_ctx;
158  unsigned nsched_ctxs;
159  struct _starpu_barrier_counter tasks_barrier;
161  unsigned has_prev_init;
163  unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
164 
165  unsigned spinning_backoff ;
169  struct starpu_task *task_transferring;
175  unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
176 
177  unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
183  unsigned reverse_phase[2];
184 
185  unsigned pop_ctx_priority;
188  struct _starpu_sched_ctx *stream_ctx;
189 
190 #ifdef __GLIBC__
191  cpu_set_t cpu_set;
192 #endif /* __GLIBC__ */
193 #ifdef STARPU_HAVE_HWLOC
194  hwloc_bitmap_t hwloc_cpu_set;
195  hwloc_obj_t hwloc_obj;
196 #endif
197 
198  /* Keep this last, to make sure to separate worker data in separate
199  cache lines. */
200  char padding[STARPU_CACHELINE_SIZE];
201 );
202 
204 {
205  struct starpu_perfmodel_arch perf_arch;
206  uint32_t worker_mask;
207  int worker_size;
208  unsigned memory_node;
209  int combined_workerid[STARPU_NMAXWORKERS];
210 #ifdef STARPU_USE_MP
211  int count;
212  starpu_pthread_mutex_t count_mutex;
213 #endif
214 
215 #ifdef __GLIBC__
216  cpu_set_t cpu_set;
217 #endif /* __GLIBC__ */
218 #ifdef STARPU_HAVE_HWLOC
219  hwloc_bitmap_t hwloc_cpu_set;
220 #endif
221 
222  /* Keep this last, to make sure to separate worker data in separate
223  cache lines. */
224  char padding[STARPU_CACHELINE_SIZE];
225 };
226 
232 {
233  starpu_pthread_mutex_t mutex;
234  starpu_pthread_t worker_thread;
235  unsigned nworkers;
236  unsigned started;
237  void *retval;
238  struct _starpu_worker *workers;
239  starpu_pthread_cond_t ready_cond;
240  unsigned set_is_initialized;
241 };
242 
243 #ifdef STARPU_USE_MPI_MASTER_SLAVE
244 extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
245 #endif
246 
248 {
250  unsigned nworkers;
251 
254 
255  unsigned nsched_ctxs;
256 
257 #ifdef STARPU_HAVE_HWLOC
258 
259  hwloc_topology_t hwtopology;
260 #endif
261 
262  struct starpu_tree *tree;
263 
267  unsigned nhwcpus;
268 
272  unsigned nhwpus;
273 
277  unsigned nhwcudagpus;
278 
282  unsigned nhwopenclgpus;
283 
287  unsigned nhwmpi;
288 
290  unsigned ncpus;
291 
293  unsigned ncudagpus;
294  unsigned nworkerpercuda;
295  int cuda_th_per_stream;
296  int cuda_th_per_dev;
297 
299  unsigned nopenclgpus;
300 
302  unsigned nmpidevices;
303  unsigned nhwmpidevices;
304 
305  unsigned nhwmpicores[STARPU_MAXMPIDEVS];
306  unsigned nmpicores[STARPU_MAXMPIDEVS];
307 
310  unsigned nhwmicdevices;
311  unsigned nmicdevices;
312 
313  unsigned nhwmiccores[STARPU_MAXMICDEVS];
314  unsigned nmiccores[STARPU_MAXMICDEVS];
315 
323  unsigned workers_bindid[STARPU_NMAXWORKERS];
324 
331  unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS];
332 
339  unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS];
340 
341  /*** Indicates the successive MIC devices that should be used
342  * by the MIC driver. It is either filled according to the
343  * user's explicit parameters (from starpu_conf) or according
344  * to the STARPU_WORKERS_MICID env. variable. Otherwise, they
345  * are taken in ID order. */
349  unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS];
350 };
351 
353 {
354  struct _starpu_machine_topology topology;
355 
356 #ifdef STARPU_HAVE_HWLOC
357  int cpu_depth;
358  int pu_depth;
359 #endif
360 
363  char currently_bound[STARPU_NMAXWORKERS];
364  char currently_shared[STARPU_NMAXWORKERS];
365 
368 
371 
374 
377 
388 
389  /* Separate out previous variables from per-worker data. */
390  char padding1[STARPU_CACHELINE_SIZE];
391 
394  struct _starpu_worker workers[STARPU_NMAXWORKERS];
395 
398  struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS];
399 
400  starpu_pthread_mutex_t submitted_mutex;
401 
402  /* Separate out previous mutex from the rest of the data. */
403  char padding2[STARPU_CACHELINE_SIZE];
404 
406  struct
407  {
408  int *workerids;
409  unsigned nworkers;
411  unsigned nbindid;
416  uint32_t worker_mask;
417 
419  struct starpu_conf conf;
420 
422  unsigned running;
423 
424  int disable_kernels;
425 
429 
431  struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
432 
434  unsigned submitting;
435 
436  int watchdog_ok;
437 };
438 
439 extern int _starpu_worker_parallel_blocks;
440 
441 extern struct _starpu_machine_config _starpu_config STARPU_ATTRIBUTE_INTERNAL;
442 extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
443 extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
444 extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
445 
447 void _starpu_set_argc_argv(int *argc, char ***argv);
448 int *_starpu_get_argc();
449 char ***_starpu_get_argv();
450 
452 void _starpu_conf_check_environment(struct starpu_conf *conf);
453 
455 void _starpu_may_pause(void);
456 
458 static inline unsigned _starpu_machine_is_running(void)
459 {
460  unsigned ret;
461  /* running is just protected by a memory barrier */
462  STARPU_RMB();
463 
464  ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
465  ret = _starpu_config.running;
466  ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
467  return ret;
468 }
469 
470 
472 void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig);
473 
475 uint32_t _starpu_worker_exists(struct starpu_task *);
476 
478 uint32_t _starpu_can_submit_cuda_task(void);
479 
481 uint32_t _starpu_can_submit_cpu_task(void);
482 
484 uint32_t _starpu_can_submit_opencl_task(void);
485 
488 unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker);
489 
493 void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
494 
496 void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
498 void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
499 
500 static inline unsigned _starpu_worker_get_count(void)
501 {
502  return _starpu_config.topology.nworkers;
503 }
504 #define starpu_worker_get_count _starpu_worker_get_count
505 
509 static inline void _starpu_set_local_worker_key(struct _starpu_worker *worker)
510 {
511  STARPU_ASSERT(_starpu_keys_initialized);
512  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
513 }
514 
517 static inline struct _starpu_worker *_starpu_get_local_worker_key(void)
518 {
519  if (!_starpu_keys_initialized)
520  return NULL;
521  return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
522 }
523 
527 static inline void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
528 {
529  STARPU_ASSERT(_starpu_keys_initialized);
530  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
531 }
532 
535 static inline struct _starpu_worker_set *_starpu_get_local_worker_set_key(void)
536 {
537  if (!_starpu_keys_initialized)
538  return NULL;
539  return (struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
540 }
541 
544 static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
545 {
546  STARPU_ASSERT(id < starpu_worker_get_count());
547  return &_starpu_config.workers[id];
548 }
549 
552 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
553 {
554  return (id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
555 }
556 
557 struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id);
558 
561 static inline struct _starpu_machine_config *_starpu_get_machine_config(void)
562 {
563  return &_starpu_config;
564 }
565 
567 static inline int _starpu_get_disable_kernels(void)
568 {
569  return _starpu_config.disable_kernels;
570 }
571 
573 static inline enum _starpu_worker_status _starpu_worker_get_status(int workerid)
574 {
575  return _starpu_config.workers[workerid].status;
576 }
577 
580 static inline void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
581 {
582  _starpu_config.workers[workerid].status = status;
583 }
584 
586 static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
587 {
588  return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
589 }
590 
/* NOTE(review): presumably stores into WORKERIDS (capacity MAXSIZE) the IDs of
 * workers of architecture TYPE and returns how many were stored — confirm. */
int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);

/* NOTE(review): like starpu_worker_get_nids_by_type(), presumably restricted
 * to workers not attached to a scheduling context — confirm. */
int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
596 
597 static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
598 {
599  struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
600  return &w->sched_mutex == mutex;
601 }
602 
603 static inline int _starpu_worker_get_nsched_ctxs(int workerid)
604 {
605  return _starpu_config.workers[workerid].nsched_ctxs;
606 }
607 
609 static inline unsigned _starpu_get_nsched_ctxs(void)
610 {
611  /* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
612  STARPU_RMB();
613  return _starpu_config.topology.nsched_ctxs;
614 }
615 
617 static inline int _starpu_worker_get_id(void)
618 {
619  struct _starpu_worker * worker;
620 
621  worker = _starpu_get_local_worker_key();
622  if (worker)
623  {
624  return worker->workerid;
625  }
626  else
627  {
628  /* there is no worker associated to that thread, perhaps it is
629  * a thread from the application or this is some SPU worker */
630  return -1;
631  }
632 }
633 #define starpu_worker_get_id _starpu_worker_get_id
634 
/* Like starpu_worker_get_id(), but asserts that the caller is a worker thread.
 * F and L are the caller's file and line, used only in the assertion message;
 * the (void) casts keep them "used" when assertions expand to nothing
 * (presumably STARPU_NO_ASSERT builds). */
static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
{
	(void) f;
	(void) l;
	const int workerid = starpu_worker_get_id();
	STARPU_ASSERT_MSG(workerid >= 0, "%s:%d Cannot be called from outside a worker\n", f, l);
	return (unsigned) workerid;
}
#define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
646 
/* Kind of memory node used by workers of architecture TYPE (presumably). */
enum starpu_node_kind _starpu_worker_get_node_kind(enum starpu_worker_archtype type);

/* NOTE(review): presumably stores SCHED_CTX in the worker's stream_ctx field. */
void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *sched_ctx);

/* NOTE(review): presumably returns the stream_ctx of worker STREAM_WORKERID. */
struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
652 
658 static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
659 {
660  _starpu_worker_parallel_blocks = 1;
661  /* flush pending requests to start on a fresh transaction epoch */
662  while (worker->state_unblock_in_parallel_req)
663  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
664 
665  /* announce blocking intent */
666  STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
667  worker->block_in_parallel_ref_count++;
668 
669  if (worker->block_in_parallel_ref_count == 1)
670  {
671  /* only the transition from 0 to 1 triggers the block_in_parallel_req */
672 
673  STARPU_ASSERT(!worker->state_blocked_in_parallel);
674  STARPU_ASSERT(!worker->state_block_in_parallel_req);
675  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
676  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
677  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
678 
679  /* trigger the block_in_parallel_req */
680  worker->state_block_in_parallel_req = 1;
681  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
682 #ifdef STARPU_SIMGRID
683  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
684 #endif
685 
686  /* wait for block_in_parallel_req to be processed */
687  while (!worker->state_block_in_parallel_ack)
688  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
689 
690  STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
691  STARPU_ASSERT(worker->state_block_in_parallel_req);
692  STARPU_ASSERT(worker->state_blocked_in_parallel);
693 
694  /* reset block_in_parallel_req state flags */
695  worker->state_block_in_parallel_req = 0;
696  worker->state_block_in_parallel_ack = 0;
697 
698  /* broadcast block_in_parallel_req state flags reset */
699  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
700  }
701 }
702 
707 static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
708 {
709  /* flush pending requests to start on a fresh transaction epoch */
710  while (worker->state_block_in_parallel_req)
711  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
712 
713  /* unblocking may be requested unconditionnally
714  * thus, check is unblocking is really needed */
715  if (worker->state_blocked_in_parallel)
716  {
717  if (worker->block_in_parallel_ref_count == 1)
718  {
719  /* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
720 
721  STARPU_ASSERT(!worker->state_block_in_parallel_req);
722  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
723  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
724  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
725 
726  /* trigger the unblock_in_parallel_req */
727  worker->state_unblock_in_parallel_req = 1;
728  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
729 
730  /* wait for the unblock_in_parallel_req to be processed */
731  while (!worker->state_unblock_in_parallel_ack)
732  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
733 
734  STARPU_ASSERT(worker->state_unblock_in_parallel_req);
735  STARPU_ASSERT(!worker->state_blocked_in_parallel);
736 
737  /* reset unblock_in_parallel_req state flags */
738  worker->state_unblock_in_parallel_req = 0;
739  worker->state_unblock_in_parallel_ack = 0;
740 
741  /* broadcast unblock_in_parallel_req state flags reset */
742  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
743  }
744 
745  /* announce unblocking complete */
746  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
747  worker->block_in_parallel_ref_count--;
748  }
749 }
750 
756 static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
757 {
758  while (worker->state_block_in_parallel_req)
759  {
760  STARPU_ASSERT(!worker->state_blocked_in_parallel);
761  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
762  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
763  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
764  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
765 
766  /* enter effective blocked state */
767  worker->state_blocked_in_parallel = 1;
768 
769  /* notify block_in_parallel_req processing */
770  worker->state_block_in_parallel_ack = 1;
771  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
772 
773  /* block */
774  while (!worker->state_unblock_in_parallel_req)
775  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
776 
777  STARPU_ASSERT(worker->state_blocked_in_parallel);
778  STARPU_ASSERT(!worker->state_block_in_parallel_req);
779  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
780  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
781  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
782 
783  /* leave effective blocked state */
784  worker->state_blocked_in_parallel = 0;
785 
786  /* notify unblock_in_parallel_req processing */
787  worker->state_unblock_in_parallel_ack = 1;
788  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
789  }
790 }
791 
808 #ifdef STARPU_SPINLOCK_CHECK
809 static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
810 #else
811 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
812 #endif
813 {
814  STARPU_ASSERT(!worker->state_sched_op_pending);
816  {
817  /* process pending block requests before entering a sched_op region */
818  _starpu_worker_process_block_in_parallel_requests(worker);
819  while (worker->state_changing_ctx_notice)
820  {
821  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
822 
823  /* new block requests may have been triggered during the wait,
824  * need to check again */
825  _starpu_worker_process_block_in_parallel_requests(worker);
826  }
827  }
828  else
829  {
830  /* if someone observed the worker state since the last call, postpone block request
831  * processing for one sched_op turn more, because the observer will not have seen
832  * new block requests between its observation and now.
833  *
834  * however, the worker still has to wait for context change operations to complete
835  * before entering sched_op again*/
836  while (worker->state_changing_ctx_notice)
837  {
838  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
839  }
840  }
841 
842  /* no block request and no ctx change ahead,
843  * enter sched_op */
844  worker->state_sched_op_pending = 1;
846  worker->state_relax_refcnt = 0;
847 #ifdef STARPU_SPINLOCK_CHECK
848  worker->relax_on_file = file;
849  worker->relax_on_line = line;
850  worker->relax_on_func = func;
851 #endif
852 }
853 #ifdef STARPU_SPINLOCK_CHECK
854 #define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
855 #endif
856 
862 #ifdef STARPU_SPINLOCK_CHECK
863 static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
864 #else
865 static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
866 #endif
867 {
868  STARPU_ASSERT(worker->state_sched_op_pending);
869  worker->state_relax_refcnt = 1;
870 #ifdef STARPU_SPINLOCK_CHECK
871  worker->relax_off_file = file;
872  worker->relax_off_line = line;
873  worker->relax_off_func = func;
874 #endif
875  worker->state_sched_op_pending = 0;
876  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
878 }
879 #ifdef STARPU_SPINLOCK_CHECK
880 #define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
881 #endif
882 
883 static inline int _starpu_worker_sched_op_pending(void)
884 {
885  int workerid = starpu_worker_get_id();
886  if (workerid == -1)
887  return 0;
888  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
889  STARPU_ASSERT(worker != NULL);
890  return worker->state_sched_op_pending;
891 }
892 
902 static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
903 {
904  STARPU_ASSERT(!starpu_pthread_equal(worker->thread_changing_ctx, starpu_pthread_self()));
905  /* flush pending requests to start on a fresh transaction epoch */
906  while (worker->state_changing_ctx_notice)
907  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
908 
909  /* announce changing_ctx intent
910  *
911  * - an already started sched_op is allowed to complete
912  * - no new sched_op may be started
913  */
914  worker->state_changing_ctx_notice = 1;
915 
916  worker->thread_changing_ctx = starpu_pthread_self();
917 
918  /* allow for an already started sched_op to complete */
919  if (worker->state_sched_op_pending)
920  {
921  /* request sched_op to broadcast when way is cleared */
922  worker->state_changing_ctx_waiting = 1;
923 
924  /* wait for sched_op completion */
925  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
926 #ifdef STARPU_SIMGRID
927  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
928 #endif
929  do
930  {
931  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
932  }
933  while (worker->state_sched_op_pending);
934 
935  /* reset flag so other sched_ops wont have to broadcast state */
936  worker->state_changing_ctx_waiting = 0;
937  }
938 }
939 
944 static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
945 {
946  worker->thread_changing_ctx = (starpu_pthread_t)0;
947  worker->state_changing_ctx_notice = 0;
948  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
949 }
950 
953 #ifdef STARPU_SPINLOCK_CHECK
954 static inline void __starpu_worker_relax_on(const char*file, int line, const char* func)
955 #else
956 static inline void _starpu_worker_relax_on(void)
957 #endif
958 {
959  struct _starpu_worker *worker = _starpu_get_local_worker_key();
960  if (worker == NULL)
961  return;
962  if (!worker->state_sched_op_pending)
963  return;
964  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
965 #ifdef STARPU_SPINLOCK_CHECK
966  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
967 #else
968  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
969 #endif
970  worker->state_relax_refcnt++;
971 #ifdef STARPU_SPINLOCK_CHECK
972  worker->relax_on_file = file;
973  worker->relax_on_line = line;
974  worker->relax_on_func = func;
975 #endif
976  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
977  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
978 }
979 #ifdef STARPU_SPINLOCK_CHECK
980 #define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
981 #endif
982 #define starpu_worker_relax_on _starpu_worker_relax_on
983 
985 #ifdef STARPU_SPINLOCK_CHECK
986 static inline void __starpu_worker_relax_on_locked(struct _starpu_worker *worker, const char*file, int line, const char* func)
987 #else
988 static inline void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
989 #endif
990 {
991  if (!worker->state_sched_op_pending)
992  return;
993 #ifdef STARPU_SPINLOCK_CHECK
994  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
995 #else
996  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
997 #endif
998  worker->state_relax_refcnt++;
999 #ifdef STARPU_SPINLOCK_CHECK
1000  worker->relax_on_file = file;
1001  worker->relax_on_line = line;
1002  worker->relax_on_func = func;
1003 #endif
1004  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
1005 }
1006 #ifdef STARPU_SPINLOCK_CHECK
1007 #define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
1008 #endif
1009 
1010 #ifdef STARPU_SPINLOCK_CHECK
1011 static inline void __starpu_worker_relax_off(const char*file, int line, const char* func)
1012 #else
1013 static inline void _starpu_worker_relax_off(void)
1014 #endif
1015 {
1016  int workerid = starpu_worker_get_id();
1017  if (workerid == -1)
1018  return;
1019  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1020  STARPU_ASSERT(worker != NULL);
1021  if (!worker->state_sched_op_pending)
1022  return;
1023  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1024 #ifdef STARPU_SPINLOCK_CHECK
1025  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1026 #else
1027  STARPU_ASSERT(worker->state_relax_refcnt>0);
1028 #endif
1029  worker->state_relax_refcnt--;
1030 #ifdef STARPU_SPINLOCK_CHECK
1031  worker->relax_off_file = file;
1032  worker->relax_off_line = line;
1033  worker->relax_off_func = func;
1034 #endif
1035  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1036 }
1037 #ifdef STARPU_SPINLOCK_CHECK
1038 #define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1039 #endif
1040 #define starpu_worker_relax_off _starpu_worker_relax_off
1041 
1042 #ifdef STARPU_SPINLOCK_CHECK
1043 static inline void __starpu_worker_relax_off_locked(const char*file, int line, const char* func)
1044 #else
1045 static inline void _starpu_worker_relax_off_locked(void)
1046 #endif
1047 {
1048  int workerid = starpu_worker_get_id();
1049  if (workerid == -1)
1050  return;
1051  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1052  STARPU_ASSERT(worker != NULL);
1053  if (!worker->state_sched_op_pending)
1054  return;
1055 #ifdef STARPU_SPINLOCK_CHECK
1056  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1057 #else
1058  STARPU_ASSERT(worker->state_relax_refcnt>0);
1059 #endif
1060  worker->state_relax_refcnt--;
1061 #ifdef STARPU_SPINLOCK_CHECK
1062  worker->relax_off_file = file;
1063  worker->relax_off_line = line;
1064  worker->relax_off_func = func;
1065 #endif
1066 }
1067 #ifdef STARPU_SPINLOCK_CHECK
1068 #define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1069 #endif
1070 
1071 static inline int _starpu_worker_get_relax_state(void)
1072 {
1073  int workerid = starpu_worker_get_id();
1074  if (workerid < 0)
1075  return 1;
1076  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1077  STARPU_ASSERT(worker != NULL);
1078  return worker->state_relax_refcnt != 0;
1079 }
1080 #define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1081 
1086 static inline void _starpu_worker_lock(int workerid)
1087 {
1088  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1089  STARPU_ASSERT(worker != NULL);
1090  int cur_workerid = starpu_worker_get_id();
1091  if (workerid != cur_workerid)
1092  {
1093  starpu_worker_relax_on();
1094 
1095  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1096  while (!worker->state_relax_refcnt)
1097  {
1098  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
1099  }
1100  }
1101  else
1102  {
1103  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1104  }
1105 }
1106 
1107 static inline int _starpu_worker_trylock(int workerid)
1108 {
1109  struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1110  int cur_workerid = cur_worker->workerid;
1111  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1112  STARPU_ASSERT(worker != NULL);
1113 
1114  /* Start with ourself */
1115  int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->sched_mutex);
1116  if (ret)
1117  return ret;
1118  if (workerid == cur_workerid)
1119  /* We only needed to lock ourself */
1120  return 0;
1121 
1122  /* Now try to lock the other worker */
1123  ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
1124  if (!ret)
1125  {
1126  /* Good, check that it is relaxed */
1127  ret = !worker->state_relax_refcnt;
1128  if (ret)
1129  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1130  }
1131  if (!ret)
1132  _starpu_worker_relax_on_locked(cur_worker);
1133  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
1134  return ret;
1135 }
1136 
1137 static inline void _starpu_worker_unlock(int workerid)
1138 {
1139  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1140  STARPU_ASSERT(worker != NULL);
1141  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1142  int cur_workerid = starpu_worker_get_id();
1143  if (workerid != cur_workerid)
1144  {
1145  starpu_worker_relax_off();
1146  }
1147 }
1148 
1149 static inline void _starpu_worker_lock_self(void)
1150 {
1151  int workerid = starpu_worker_get_id_check();
1152  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1153  STARPU_ASSERT(worker != NULL);
1154  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1155 }
1156 
1157 static inline void _starpu_worker_unlock_self(void)
1158 {
1159  int workerid = starpu_worker_get_id_check();
1160  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1161  STARPU_ASSERT(worker != NULL);
1162  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1163 }
1164 
/* Call starpu_wake_worker_locked(WORKERID) with the worker's sched_mutex held,
 * taken and released with _starpu_worker_lock()/_starpu_worker_unlock().
 * Returns starpu_wake_worker_locked()'s result. */
static inline int _starpu_wake_worker_relax(int workerid)
{
	int woken;

	_starpu_worker_lock(workerid);
	woken = starpu_wake_worker_locked(workerid);
	_starpu_worker_unlock(workerid);

	return woken;
}
1172 
/* NOTE(review): presumably a cheaper variant of _starpu_wake_worker_relax();
 * see the implementation for its exact locking behaviour. */
int starpu_wake_worker_relax_light(int workerid);

/* Handle TASK being refused by WORKER (semantics defined in the implementation). */
void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
1180 
1181 /* @}*/
1182 
1183 #endif // __WORKERS_H__
driver_mpi_source.h
timing.h
_starpu_machine_config::opencl_nodeid
int opencl_nodeid
Definition: workers.h:383
_starpu_worker::has_prev_init
unsigned has_prev_init
Definition: workers.h:161
_starpu_machine_topology::workers_bindid
unsigned workers_bindid[STARPU_NMAXWORKERS]
Definition: workers.h:323
driver_cuda.h
perfmodel.h
topology.h
_starpu_machine_topology::nhwmiccores
unsigned nhwmiccores[STARPU_MAXMICDEVS]
Definition: workers.h:313
jobs.h
_starpu_machine_topology::nhwmpicores
unsigned nhwmpicores[STARPU_MAXMPIDEVS]
Definition: workers.h:305
_starpu_machine_config::workers
struct _starpu_worker workers[STARPU_NMAXWORKERS]
Definition: workers.h:394
_starpu_machine_topology::workers_cuda_gpuid
unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:331
_starpu_barrier_counter
Definition: barrier_counter.h:27
_starpu_worker::state_blocked_in_parallel_observed
unsigned state_blocked_in_parallel_observed
Definition: workers.h:107
_starpu_worker::worker_mask
uint32_t worker_mask
Definition: workers.h:74
_starpu_worker_status
_starpu_worker_status
Definition: errorcheck.h:26
_starpu_worker::is_slave_somewhere
unsigned is_slave_somewhere
Definition: workers.h:186
_starpu_worker::current_rank
int current_rank
Definition: workers.h:82
_starpu_worker::state_block_in_parallel_req
unsigned state_block_in_parallel_req
Definition: workers.h:108
_starpu_worker::thread_changing_ctx
starpu_pthread_t thread_changing_ctx
Definition: workers.h:120
_starpu_machine_topology::workers_mpi_ms_deviceid
unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS]
Definition: workers.h:349
_starpu_combined_worker::memory_node
unsigned memory_node
Definition: workers.h:208
_starpu_machine_config::cpus_nodeid
int cpus_nodeid
Definition: workers.h:379
_starpu_machine_topology::ncombinedworkers
unsigned ncombinedworkers
Definition: workers.h:253
_starpu_worker::pipeline_length
unsigned char pipeline_length
Definition: workers.h:144
starpu_parameters.h
_starpu_worker::state_unblock_in_parallel_req
unsigned state_unblock_in_parallel_req
Definition: workers.h:110
_starpu_worker::state_relax_refcnt
unsigned state_relax_refcnt
Definition: workers.h:94
_starpu_worker::sched_cond
starpu_pthread_cond_t sched_cond
Definition: workers.h:92
_starpu_machine_topology::nhwopenclgpus
unsigned nhwopenclgpus
Definition: workers.h:282
_starpu_worker::memory_node
unsigned memory_node
Definition: workers.h:86
drivers.h
_starpu_machine_topology::nmpidevices
unsigned nmpidevices
Definition: workers.h:302
_starpu_machine_config::sched_ctxs
struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1]
Definition: workers.h:431
_starpu_worker::subworkerid
unsigned subworkerid
Definition: workers.h:78
_starpu_machine_topology::nhwmpi
unsigned nhwmpi
Definition: workers.h:287
_starpu_worker::state_unblock_in_parallel_ack
unsigned state_unblock_in_parallel_ack
Definition: workers.h:111
_starpu_worker::bindid
int bindid
Definition: workers.h:79
_starpu_combined_worker::perf_arch
struct starpu_perfmodel_arch perf_arch
Definition: workers.h:205
_starpu_worker::state_blocked_in_parallel
unsigned state_blocked_in_parallel
Definition: workers.h:106
_starpu_machine_config::conf
struct starpu_conf conf
Definition: workers.h:419
_starpu_worker::state_changing_ctx_waiting
unsigned state_changing_ctx_waiting
Definition: workers.h:104
_starpu_worker::local_ordered_tasks_size
unsigned local_ordered_tasks_size
Definition: workers.h:131
_starpu_sched_ctx
Definition: sched_ctx.h:46
_starpu_worker_apply_deferred_ctx_changes
void _starpu_worker_apply_deferred_ctx_changes(void)
_starpu_worker::spinning_backoff
unsigned spinning_backoff
Definition: workers.h:165
_starpu_worker::state_keep_awake
unsigned state_keep_awake
Definition: workers.h:150
_starpu_worker
Definition: workers.h:70
_starpu_machine_config::pause_depth
int pause_depth
Definition: workers.h:428
_starpu_machine_topology::nhwpus
unsigned nhwpus
Definition: workers.h:272
_starpu_machine_config::submitting
unsigned submitting
Definition: workers.h:434
sched_policy.h
_starpu_machine_topology::nworkers
unsigned nworkers
Definition: workers.h:250
_starpu_worker::pipeline_stuck
unsigned char pipeline_stuck
Definition: workers.h:145
_starpu_worker_set::started
unsigned started
Definition: workers.h:236
_starpu_worker::worker_thread
starpu_pthread_t worker_thread
Definition: workers.h:76
utils.h
driver_cpu.h
fxt.h
_starpu_worker::run_by_starpu
unsigned run_by_starpu
Definition: workers.h:153
_starpu_machine_topology::nhwmicdevices
unsigned nhwmicdevices
Definition: workers.h:310
_starpu_machine_config::current_bindid
int current_bindid
Definition: workers.h:362
_starpu_machine_config::worker_mask
uint32_t worker_mask
Definition: workers.h:416
_starpu_worker::state_changing_ctx_notice
unsigned state_changing_ctx_notice
Definition: workers.h:105
_starpu_machine_config::bindid_workers
struct _starpu_machine_config::@4 * bindid_workers
_starpu_machine_config::combined_workers
struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS]
Definition: workers.h:398
errorcheck.h
_starpu_worker::nb_buffers_transferred
unsigned nb_buffers_transferred
Definition: workers.h:167
_starpu_machine_config::nbindid
unsigned nbindid
Definition: workers.h:411
_starpu_machine_config::current_mpi_deviceid
int current_mpi_deviceid
Definition: workers.h:376
_starpu_worker::devid
unsigned devid
Definition: workers.h:77
_starpu_machine_config::running
unsigned running
Definition: workers.h:422
_starpu_worker::ntasks
unsigned char ntasks
Definition: workers.h:143
_starpu_worker::task_transferring
struct starpu_task * task_transferring
Definition: workers.h:169
_starpu_machine_config::current_cuda_gpuid
int current_cuda_gpuid
Definition: workers.h:367
_starpu_machine_topology::hwtopology
hwloc_topology_t hwtopology
Definition: workers.h:259
_starpu_combined_worker
Definition: workers.h:204
_starpu_machine_topology::nhwcudagpus
unsigned nhwcudagpus
Definition: workers.h:277
_starpu_worker::current_task
struct starpu_task * current_task
Definition: workers.h:134
_starpu_machine_config::mpi_nodeid
int mpi_nodeid
Definition: workers.h:387
driver_opencl.h
_starpu_machine_topology::ncpus
unsigned ncpus
Definition: workers.h:290
_starpu_machine_config::current_opencl_gpuid
int current_opencl_gpuid
Definition: workers.h:370
sched_ctx_list.h
_starpu_machine_topology::ncudagpus
unsigned ncudagpus
Definition: workers.h:293
_starpu_worker::worker_size
int worker_size
Definition: workers.h:83
_starpu_worker_set::ready_cond
starpu_pthread_cond_t ready_cond
Definition: workers.h:239
driver_mic_source.h
_starpu_machine_topology::nopenclgpus
unsigned nopenclgpus
Definition: workers.h:299
_starpu_machine_config
Definition: workers.h:353
_starpu_sched_ctx::id
unsigned id
Definition: sched_ctx.h:48
_starpu_machine_config::cuda_nodeid
int cuda_nodeid
Definition: workers.h:381
_starpu_machine_topology::nhwcpus
unsigned nhwcpus
Definition: workers.h:267
_starpu_machine_config::mic_nodeid
int mic_nodeid
Definition: workers.h:385
_starpu_machine_topology::workers_opencl_gpuid
unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:339
_starpu_worker::local_ordered_tasks
struct starpu_task ** local_ordered_tasks
Definition: workers.h:130
_starpu_worker::set
struct _starpu_worker_set * set
Definition: workers.h:146
_starpu_worker_set::worker_thread
starpu_pthread_t worker_thread
Definition: workers.h:234
_starpu_worker::combined_workerid
int combined_workerid
Definition: workers.h:81
_starpu_worker::first_task
unsigned char first_task
Definition: workers.h:142
sched_ctx.h
_starpu_worker::current_ordered_task
unsigned current_ordered_task
Definition: workers.h:132
_starpu_driver_ops
Definition: drivers.h:24
_starpu_machine_topology::tree
struct starpu_tree * tree
Definition: workers.h:262
datawizard.h
_starpu_worker::state_sched_op_pending
unsigned state_sched_op_pending
Definition: workers.h:103
_starpu_worker::nb_buffers_totransfer
unsigned nb_buffers_totransfer
Definition: workers.h:168
_starpu_worker::workerid
int workerid
Definition: workers.h:80
thread.h
_starpu_worker::current_ordered_task_order
unsigned current_ordered_task_order
Definition: workers.h:133
_starpu_worker::numa_memory_node
unsigned numa_memory_node
Definition: workers.h:87
_starpu_sched_ctx_list
Definition: sched_ctx_list.h:25
_starpu_worker::block_in_parallel_ref_count
unsigned block_in_parallel_ref_count
Definition: workers.h:119
_starpu_worker::state_block_in_parallel_ack
unsigned state_block_in_parallel_ack
Definition: workers.h:109
simgrid.h
_starpu_worker::sched_mutex
starpu_pthread_mutex_t sched_mutex
Definition: workers.h:93
_starpu_machine_topology
Definition: workers.h:248
_starpu_machine_config::current_mic_deviceid
int current_mic_deviceid
Definition: workers.h:373
_starpu_worker::ready_cond
starpu_pthread_cond_t ready_cond
Definition: workers.h:85
_starpu_worker::started_cond
starpu_pthread_cond_t started_cond
Definition: workers.h:84
_starpu_worker_set
Definition: workers.h:232
_starpu_combined_worker::worker_mask
uint32_t worker_mask
Definition: workers.h:206
_starpu_worker::pop_ctx_priority
unsigned pop_ctx_priority
Definition: workers.h:185