28 #include <common/config.h>
41 #ifdef STARPU_HAVE_HWLOC
53 #ifdef STARPU_USE_MPI_MASTER_SLAVE
/* Dimension of the current_tasks[] array of struct starpu_task pointers declared later in this header. */
63 #define STARPU_MAX_PIPELINE 4
/* Three-state initialization status. UNINITIALIZED is explicitly 0, so CHANGING is 1 and INITIALIZED is 2. */
65 enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };
67 struct _starpu_ctx_change_list;
72 starpu_pthread_mutex_t mutex;
73 enum starpu_worker_archtype arch;
75 struct starpu_perfmodel_arch perf_arch;
95 #ifdef STARPU_SPINLOCK_CHECK
96 const char *relax_on_file;
98 const char *relax_on_func;
99 const char *relax_off_file;
101 const char *relax_off_func;
128 struct _starpu_ctx_change_list ctx_change_list;
129 struct starpu_task_list local_tasks;
135 struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
136 #ifdef STARPU_SIMGRID
137 starpu_pthread_wait_t wait;
140 struct timespec cl_start;
141 struct timespec cl_end;
147 unsigned worker_is_running;
148 unsigned worker_is_initialized;
163 unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
175 unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
177 unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
183 unsigned reverse_phase[2];
193 #ifdef STARPU_HAVE_HWLOC
194 hwloc_bitmap_t hwloc_cpu_set;
195 hwloc_obj_t hwloc_obj;
200 char padding[STARPU_CACHELINE_SIZE];
209 int combined_workerid[STARPU_NMAXWORKERS];
212 starpu_pthread_mutex_t count_mutex;
218 #ifdef STARPU_HAVE_HWLOC
219 hwloc_bitmap_t hwloc_cpu_set;
224 char padding[STARPU_CACHELINE_SIZE];
233 starpu_pthread_mutex_t mutex;
240 unsigned set_is_initialized;
243 #ifdef STARPU_USE_MPI_MASTER_SLAVE
255 unsigned nsched_ctxs;
257 #ifdef STARPU_HAVE_HWLOC
294 unsigned nworkerpercuda;
295 int cuda_th_per_stream;
303 unsigned nhwmpidevices;
306 unsigned nmpicores[STARPU_MAXMPIDEVS];
311 unsigned nmicdevices;
314 unsigned nmiccores[STARPU_MAXMICDEVS];
356 #ifdef STARPU_HAVE_HWLOC
363 char currently_bound[STARPU_NMAXWORKERS];
364 char currently_shared[STARPU_NMAXWORKERS];
390 char padding1[STARPU_CACHELINE_SIZE];
400 starpu_pthread_mutex_t submitted_mutex;
403 char padding2[STARPU_CACHELINE_SIZE];
/* Global int defined outside this header; _starpu_worker_request_blocking_in_parallel() below sets it to 1. */
439 extern int _starpu_worker_parallel_blocks;
/*
 * Thread-specific-data keys and their guard flag, defined outside this header.
 * _starpu_keys_initialized is asserted by the key setters and tested by the
 * key getters below before either key is used.
 * _starpu_worker_key is used with STARPU_PTHREAD_SETSPECIFIC/GETSPECIFIC to
 * hold a struct _starpu_worker pointer; _starpu_worker_set_key holds a
 * struct _starpu_worker_set pointer in the same way.
 */
442 extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
443 extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
444 extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
/*
 * Takes pointers to an argument count and an argument vector.
 * NOTE(review): presumably records them for later retrieval through
 * _starpu_get_argc() / _starpu_get_argv(); the definition is not in this
 * header, confirm there.
 */
447 void _starpu_set_argc_argv(
int *argc,
char ***argv);
/*
 * Returns a pointer to int.
 * NOTE(review): presumably the argc pointer given to _starpu_set_argc_argv();
 * confirm against the definition.
 * Declared with (void) so this is a real prototype: empty parentheses leave
 * the parameters unspecified in C and let calls with arguments compile.
 */
448 int *_starpu_get_argc(void);
/*
 * Returns a char *** value.
 * NOTE(review): presumably the argv pointer given to _starpu_set_argc_argv();
 * confirm against the definition.
 * Declared with (void) so this is a real prototype: empty parentheses leave
 * the parameters unspecified in C and let calls with arguments compile.
 */
449 char ***_starpu_get_argv(void);
/*
 * Takes the configuration through a non-const pointer, so the callee is
 * allowed to modify it.
 * NOTE(review): the name suggests it applies environment-variable settings
 * to conf; the definition is not visible here, confirm.
 */
452 void _starpu_conf_check_environment(
struct starpu_conf *
conf);
/*
 * Takes no argument and returns nothing.
 * NOTE(review): presumably makes the caller wait while the runtime is
 * paused; semantics are not visible in this header, confirm.
 */
455 void _starpu_may_pause(
void);
458 static inline unsigned _starpu_machine_is_running(
void)
464 ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
465 ret = _starpu_config.running;
466 ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
/*
 * Takes a task and returns a uint32_t.
 * NOTE(review): presumably non-zero when some worker is able to execute
 * the task; confirm against the definition.
 */
475 uint32_t _starpu_worker_exists(
struct starpu_task *);
/* NOTE(review): presumably returns non-zero when a CUDA task can be submitted (a CUDA worker is available); confirm against the definition. */
478 uint32_t _starpu_can_submit_cuda_task(
void);
/* NOTE(review): presumably returns non-zero when a CPU task can be submitted (a CPU worker is available); confirm against the definition. */
481 uint32_t _starpu_can_submit_cpu_task(
void);
/* NOTE(review): presumably returns non-zero when an OpenCL task can be submitted (an OpenCL worker is available); confirm against the definition. */
484 uint32_t _starpu_can_submit_opencl_task(
void);
/*
 * Takes a memory node index and a worker, returns an unsigned.
 * NOTE(review): presumably non-zero when the worker is allowed to block;
 * the conditions are not visible in this header, confirm.
 */
488 unsigned _starpu_worker_can_block(
unsigned memnode,
struct _starpu_worker *worker);
/*
 * Takes a worker id together with a condition variable and a mutex.
 * NOTE(review): presumably waits on cond with mutex held by the caller;
 * confirm against the definition.
 */
493 void _starpu_block_worker(
int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
/*
 * Same parameter list as _starpu_worker_start(): a worker, a fut_key value
 * and a sync flag.
 * NOTE(review): fut_key is presumably a trace-event key and sync presumably
 * selects synchronous start-up; neither is established by this header.
 */
496 void _starpu_driver_start(
struct _starpu_worker *worker,
unsigned fut_key,
unsigned sync);
/*
 * Same parameter list as _starpu_driver_start(): a worker, a fut_key value
 * and a sync flag.
 * NOTE(review): how the two entry points differ is not visible in this
 * header; confirm against the definitions.
 */
498 void _starpu_worker_start(
struct _starpu_worker *worker,
unsigned fut_key,
unsigned sync);
500 static inline unsigned _starpu_worker_get_count(
void)
502 return _starpu_config.topology.nworkers;
/* After this point, uses of starpu_worker_get_count expand to the static inline _starpu_worker_get_count defined just above. */
504 #define starpu_worker_get_count _starpu_worker_get_count
509 static inline void _starpu_set_local_worker_key(
struct _starpu_worker *worker)
511 STARPU_ASSERT(_starpu_keys_initialized);
512 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
517 static inline struct _starpu_worker *_starpu_get_local_worker_key(
void)
519 if (!_starpu_keys_initialized)
521 return (
struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
527 static inline void _starpu_set_local_worker_set_key(
struct _starpu_worker_set *worker)
529 STARPU_ASSERT(_starpu_keys_initialized);
530 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
537 if (!_starpu_keys_initialized)
539 return (
struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
544 static inline struct _starpu_worker *_starpu_get_worker_struct(
unsigned id)
546 STARPU_ASSERT(
id < starpu_worker_get_count());
547 return &_starpu_config.workers[id];
552 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(
unsigned id)
554 return (
id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[
id];
563 return &_starpu_config;
567 static inline int _starpu_get_disable_kernels(
void)
569 return _starpu_config.disable_kernels;
575 return _starpu_config.workers[workerid].status;
582 _starpu_config.workers[workerid].status = status;
588 return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
/*
 * Takes a worker architecture type, an int output array and its capacity.
 * NOTE(review): presumably stores up to maxsize ids of workers of that type
 * into workerids and returns how many were stored; confirm.
 */
591 int starpu_worker_get_nids_by_type(
enum starpu_worker_archtype type,
int *workerids,
int maxsize);
/*
 * Same parameter list as starpu_worker_get_nids_by_type().
 * NOTE(review): presumably restricted to workers of the given type that do
 * not belong to a scheduling context; confirm against the definition.
 */
595 int starpu_worker_get_nids_ctx_free_by_type(
enum starpu_worker_archtype type,
int *workerids,
int maxsize);
597 static inline unsigned _starpu_worker_mutex_is_sched_mutex(
int workerid, starpu_pthread_mutex_t *mutex)
603 static inline int _starpu_worker_get_nsched_ctxs(
int workerid)
605 return _starpu_config.workers[
workerid].nsched_ctxs;
609 static inline unsigned _starpu_get_nsched_ctxs(
void)
613 return _starpu_config.topology.nsched_ctxs;
617 static inline int _starpu_worker_get_id(
void)
621 worker = _starpu_get_local_worker_key();
/* After this point, uses of starpu_worker_get_id expand to the static inline _starpu_worker_get_id defined just above. */
633 #define starpu_worker_get_id _starpu_worker_get_id
637 static inline unsigned __starpu_worker_get_id_check(
const char *f,
int l)
641 int id = starpu_worker_get_id();
642 STARPU_ASSERT_MSG(
id>=0,
"%s:%d Cannot be called from outside a worker\n", f, l);
/* Forwards both arguments (file name and line) unchanged to the static inline __starpu_worker_get_id_check() above. */
645 #define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
/* Returns the starpu_node_kind associated with the given worker architecture type; the mapping itself is defined outside this header. */
647 enum starpu_node_kind _starpu_worker_get_node_kind(
enum starpu_worker_archtype type);
/*
 * Takes a worker id and returns a scheduling-context pointer.
 * NOTE(review): presumably the context attached to that stream worker;
 * whether NULL can be returned is not visible here, confirm.
 */
651 struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(
unsigned stream_workerid);
658 static inline void _starpu_worker_request_blocking_in_parallel(
struct _starpu_worker *
const worker)
660 _starpu_worker_parallel_blocks = 1;
681 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
682 #ifdef STARPU_SIMGRID
683 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->
workerid]);
699 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
707 static inline void _starpu_worker_request_unblocking_in_parallel(
struct _starpu_worker *
const worker)
728 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
742 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
756 static inline void _starpu_worker_process_block_in_parallel_requests(
struct _starpu_worker *
const worker)
771 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
788 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
808 #ifdef STARPU_SPINLOCK_CHECK
809 static inline void __starpu_worker_enter_sched_op(
struct _starpu_worker *
const worker,
const char*file,
int line,
const char* func)
811 static inline void _starpu_worker_enter_sched_op(
struct _starpu_worker *
const worker)
818 _starpu_worker_process_block_in_parallel_requests(worker);
825 _starpu_worker_process_block_in_parallel_requests(worker);
847 #ifdef STARPU_SPINLOCK_CHECK
848 worker->relax_on_file = file;
849 worker->relax_on_line = line;
850 worker->relax_on_func = func;
853 #ifdef STARPU_SPINLOCK_CHECK
854 #define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
862 #ifdef STARPU_SPINLOCK_CHECK
863 static inline void __starpu_worker_leave_sched_op(
struct _starpu_worker *
const worker,
const char*file,
int line,
const char* func)
865 static inline void _starpu_worker_leave_sched_op(
struct _starpu_worker *
const worker)
870 #ifdef STARPU_SPINLOCK_CHECK
871 worker->relax_off_file = file;
872 worker->relax_off_line = line;
873 worker->relax_off_func = func;
876 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
879 #ifdef STARPU_SPINLOCK_CHECK
880 #define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
883 static inline int _starpu_worker_sched_op_pending(
void)
885 int workerid = starpu_worker_get_id();
889 STARPU_ASSERT(worker != NULL);
902 static inline void _starpu_worker_enter_changing_ctx_op(
struct _starpu_worker *
const worker)
925 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
926 #ifdef STARPU_SIMGRID
927 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->
workerid]);
944 static inline void _starpu_worker_leave_changing_ctx_op(
struct _starpu_worker *
const worker)
948 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
953 #ifdef STARPU_SPINLOCK_CHECK
954 static inline void __starpu_worker_relax_on(
const char*file,
int line,
const char* func)
956 static inline void _starpu_worker_relax_on(
void)
964 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
965 #ifdef STARPU_SPINLOCK_CHECK
966 STARPU_ASSERT_MSG(worker->
state_relax_refcnt<UINT_MAX,
"relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
971 #ifdef STARPU_SPINLOCK_CHECK
972 worker->relax_on_file = file;
973 worker->relax_on_line = line;
974 worker->relax_on_func = func;
976 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
977 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
979 #ifdef STARPU_SPINLOCK_CHECK
980 #define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
982 #define starpu_worker_relax_on _starpu_worker_relax_on
985 #ifdef STARPU_SPINLOCK_CHECK
986 static inline void __starpu_worker_relax_on_locked(
struct _starpu_worker *worker,
const char*file,
int line,
const char* func)
988 static inline void _starpu_worker_relax_on_locked(
struct _starpu_worker *worker)
993 #ifdef STARPU_SPINLOCK_CHECK
994 STARPU_ASSERT_MSG(worker->
state_relax_refcnt<UINT_MAX,
"relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
999 #ifdef STARPU_SPINLOCK_CHECK
1000 worker->relax_on_file = file;
1001 worker->relax_on_line = line;
1002 worker->relax_on_func = func;
1004 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
1006 #ifdef STARPU_SPINLOCK_CHECK
1007 #define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
/*
 * Turn the relaxed state off for the calling worker.
 * The worker is looked up from starpu_worker_get_id(); its sched_mutex is
 * locked around the update and unlocked before returning.
 * With STARPU_SPINLOCK_CHECK the function receives the caller's location
 * (file, line, func), asserts with a message that state_relax_refcnt is
 * positive, and records the location in the worker's relax_off_* fields.
 */
1010 #ifdef STARPU_SPINLOCK_CHECK
1011 static inline void __starpu_worker_relax_off(
const char*file,
int line,
const char* func)
1013 static inline void _starpu_worker_relax_off(
void)
1016 int workerid = starpu_worker_get_id();
1020 STARPU_ASSERT(worker != NULL);
1023 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1024 #ifdef STARPU_SPINLOCK_CHECK
/* The message reports the last turn-off, so print the recorded relax_off_* call site (not relax_on_*). */
1025 STARPU_ASSERT_MSG(worker->
state_relax_refcnt>0,
"relax last turn off in %s (%s:%d)\n", worker->relax_off_func, worker->relax_off_file, worker->relax_off_line);
1030 #ifdef STARPU_SPINLOCK_CHECK
1031 worker->relax_off_file = file;
1032 worker->relax_off_line = line;
1033 worker->relax_off_func = func;
1035 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1037 #ifdef STARPU_SPINLOCK_CHECK
1038 #define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1040 #define starpu_worker_relax_off _starpu_worker_relax_off
/*
 * Variant of the relax-off operation that performs no locking itself.
 * NOTE(review): the _locked suffix suggests the caller already holds the
 * worker's sched_mutex; confirm at the call sites.
 * The worker is looked up from starpu_worker_get_id().
 * With STARPU_SPINLOCK_CHECK the function receives the caller's location
 * (file, line, func), asserts with a message that state_relax_refcnt is
 * positive, and records the location in the worker's relax_off_* fields.
 */
1042 #ifdef STARPU_SPINLOCK_CHECK
1043 static inline void __starpu_worker_relax_off_locked(
const char*file,
int line,
const char* func)
1045 static inline void _starpu_worker_relax_off_locked(
void)
1048 int workerid = starpu_worker_get_id();
1052 STARPU_ASSERT(worker != NULL);
1055 #ifdef STARPU_SPINLOCK_CHECK
/* The message reports the last turn-off, so print the recorded relax_off_* call site (not relax_on_*). */
1056 STARPU_ASSERT_MSG(worker->
state_relax_refcnt>0,
"relax last turn off in %s (%s:%d)\n", worker->relax_off_func, worker->relax_off_file, worker->relax_off_line);
1061 #ifdef STARPU_SPINLOCK_CHECK
1062 worker->relax_off_file = file;
1063 worker->relax_off_line = line;
1064 worker->relax_off_func = func;
1067 #ifdef STARPU_SPINLOCK_CHECK
1068 #define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1071 static inline int _starpu_worker_get_relax_state(
void)
1073 int workerid = starpu_worker_get_id();
1077 STARPU_ASSERT(worker != NULL);
1080 #define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1086 static inline void _starpu_worker_lock(
int workerid)
1089 STARPU_ASSERT(worker != NULL);
1090 int cur_workerid = starpu_worker_get_id();
1093 starpu_worker_relax_on();
1095 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1103 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1107 static inline int _starpu_worker_trylock(
int workerid)
1109 struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1110 int cur_workerid = cur_worker->
workerid;
1112 STARPU_ASSERT(worker != NULL);
1115 int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->
sched_mutex);
1123 ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->
sched_mutex);
1129 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1132 _starpu_worker_relax_on_locked(cur_worker);
1133 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->
sched_mutex);
1137 static inline void _starpu_worker_unlock(
int workerid)
1140 STARPU_ASSERT(worker != NULL);
1141 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1142 int cur_workerid = starpu_worker_get_id();
1145 starpu_worker_relax_off();
1149 static inline void _starpu_worker_lock_self(
void)
1151 int workerid = starpu_worker_get_id_check();
1153 STARPU_ASSERT(worker != NULL);
1154 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1157 static inline void _starpu_worker_unlock_self(
void)
1159 int workerid = starpu_worker_get_id_check();
1161 STARPU_ASSERT(worker != NULL);
1162 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1165 static inline int _starpu_wake_worker_relax(
int workerid)
1168 int ret = starpu_wake_worker_locked(
workerid);
/*
 * Takes a worker id and returns an int.
 * NOTE(review): presumably a lighter-weight variant of the wake-up done by
 * _starpu_wake_worker_relax(); the meaning of the return value is not
 * visible in this header, confirm.
 */
1173 int starpu_wake_worker_relax_light(
int workerid);
/*
 * Takes a worker and a task.
 * NOTE(review): presumably called when the worker cannot execute the task
 * it was given, so that the task is handed back; confirm against the
 * definition.
 */
1179 void _starpu_worker_refuse_task(
struct _starpu_worker *worker,
struct starpu_task *task);
1183 #endif // __WORKERS_H__