OpenDNSSEC-signer
1.3.4
|
00001 /* 00002 * $Id: worker.c 5637 2011-09-15 09:09:12Z matthijs $ 00003 * 00004 * Copyright (c) 2009 NLNet Labs. All rights reserved. 00005 * 00006 * Redistribution and use in source and binary forms, with or without 00007 * modification, are permitted provided that the following conditions 00008 * are met: 00009 * 1. Redistributions of source code must retain the above copyright 00010 * notice, this list of conditions and the following disclaimer. 00011 * 2. Redistributions in binary form must reproduce the above copyright 00012 * notice, this list of conditions and the following disclaimer in the 00013 * documentation and/or other materials provided with the distribution. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 00016 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00017 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 00019 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00020 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 00021 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00023 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 00024 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 00025 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 * 00027 */ 00028 00034 #include "adapter/adapi.h" 00035 #include "daemon/engine.h" 00036 #include "daemon/worker.h" 00037 #include "shared/allocator.h" 00038 #include "scheduler/schedule.h" 00039 #include "scheduler/task.h" 00040 #include "shared/locks.h" 00041 #include "shared/log.h" 00042 #include "shared/status.h" 00043 #include "shared/util.h" 00044 #include "signer/tools.h" 00045 #include "signer/zone.h" 00046 #include "signer/zonedata.h" 00047 00048 #include <time.h> /* time() */ 00049 00050 ods_lookup_table worker_str[] = { 00051 { WORKER_WORKER, "worker" }, 00052 { WORKER_DRUDGER, "drudger" }, 00053 { 0, NULL } 00054 }; 00055 00056 00061 worker_type* 00062 worker_create(allocator_type* allocator, int num, worker_id type) 00063 { 00064 worker_type* worker; 00065 00066 if (!allocator) { 00067 return NULL; 00068 } 00069 ods_log_assert(allocator); 00070 00071 worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type)); 00072 if (!worker) { 00073 return NULL; 00074 } 00075 00076 ods_log_debug("create worker[%i]", num +1); 00077 lock_basic_init(&worker->worker_lock); 00078 lock_basic_set(&worker->worker_alarm); 00079 lock_basic_lock(&worker->worker_lock); 00080 worker->allocator = allocator; 00081 worker->thread_num = num +1; 00082 worker->engine = NULL; 00083 worker->task = NULL; 00084 worker->working_with = TASK_NONE; 00085 worker->need_to_exit = 0; 00086 worker->type = type; 00087 worker->clock_in = 0; 00088 worker->jobs_appointed = 0; 00089 worker->jobs_completed = 0; 00090 worker->jobs_failed = 0; 00091 worker->sleeping = 0; 00092 worker->waiting = 0; 00093 lock_basic_unlock(&worker->worker_lock); 00094 return worker; 00095 } 00096 00097 00102 static const char* 00103 worker2str(worker_id type) 00104 { 00105 ods_lookup_table *lt = ods_lookup_by_id(worker_str, type); 00106 if (lt) { 00107 return lt->name; 00108 } 00109 return NULL; 00110 } 00111 00112 00117 static int 00118 worker_fulfilled(worker_type* worker) 00119 { 00120 return (worker->jobs_completed + worker->jobs_failed) == 00121 worker->jobs_appointed; 00122 } 00123 00124 00129 static void 00130 worker_perform_task(worker_type* worker) 00131 { 00132 engine_type* engine = NULL; 00133 zone_type* zone = NULL; 00134 task_type* task = NULL; 00135 task_id what = TASK_NONE; 00136 time_t when = 0; 00137 time_t never = (3600*24*365); 00138 ods_status status = ODS_STATUS_OK; 00139 int fallthrough = 0; 00140 int backup = 0; 00141 char* working_dir = NULL; 00142 char* cfg_filename = NULL; 00143 uint32_t tmpserial = 0; 00144 time_t start = 0; 00145 time_t end = 0; 00146 00147 /* sanity checking */ 00148 if (!worker || !worker->task || !worker->task->zone || !worker->engine) { 00149 return; 00150 } 00151 ods_log_assert(worker); 00152 ods_log_assert(worker->task); 00153 ods_log_assert(worker->task->zone); 00154 00155 engine = (engine_type*) worker->engine; 00156 task = (task_type*) worker->task; 00157 zone = (zone_type*) worker->task->zone; 00158 ods_log_debug("[%s[%i]] perform task %s for zone %s at %u", 00159 worker2str(worker->type), worker->thread_num, task_what2str(task->what), 00160 task_who2str(task->who), (uint32_t) worker->clock_in); 00161 00162 /* do what you have been told to do */ 00163 switch (task->what) { 00164 case TASK_SIGNCONF: 00165 worker->working_with = TASK_SIGNCONF; 00166 /* perform 'load signconf' task */ 00167 ods_log_verbose("[%s[%i]] load signconf for zone %s", 00168 worker2str(worker->type), worker->thread_num, 00169 task_who2str(task->who)); 00170 status = zone_load_signconf(zone, &what); 00171 if (status == ODS_STATUS_UNCHANGED) { 00172 if (!zone->signconf->last_modified) { 00173 ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet", 00174 worker2str(worker->type), worker->thread_num, 00175 task_who2str(task->who)); 00176 } 00177 status = ODS_STATUS_ERR; 00178 } 00179 00180 /* what to do next */ 00181 when = time_now(); 00182 if (status == ODS_STATUS_UNCHANGED) { 00183 if (task->halted != TASK_NONE) { 00184 goto task_perform_continue; 00185 } else { 00186 status = ODS_STATUS_OK; 00187 } 00188 } 00189 00190 if (status == ODS_STATUS_OK) { 00191 status = zone_publish_dnskeys(zone, 0); 00192 } 00193 if (status == ODS_STATUS_OK) { 00194 status = zone_prepare_nsec3(zone, 0); 00195 } 00196 if (status == ODS_STATUS_OK) { 00197 status = zonedata_commit(zone->zonedata); 00198 } 00199 00200 if (status == ODS_STATUS_OK) { 00201 zone->prepared = 1; 00202 task->interrupt = TASK_NONE; 00203 task->halted = TASK_NONE; 00204 } else { 00205 if (task->halted == TASK_NONE) { 00206 goto task_perform_fail; 00207 } 00208 goto task_perform_continue; 00209 } 00210 fallthrough = 0; 00211 break; 00212 case TASK_READ: 00213 worker->working_with = TASK_READ; 00214 /* perform 'read input adapter' task */ 00215 ods_log_verbose("[%s[%i]] read zone %s", 00216 worker2str(worker->type), worker->thread_num, 00217 task_who2str(task->who)); 00218 if (!zone->prepared) { 00219 ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet", 00220 worker2str(worker->type), worker->thread_num, 00221 task_who2str(task->who)); 00222 status = ODS_STATUS_ERR; 00223 } else { 00224 status = tools_input(zone); 00225 } 00226 00227 /* what to do next */ 00228 what = TASK_NSECIFY; 00229 when = time_now(); 00230 if (status != ODS_STATUS_OK) { 00231 if (task->halted == TASK_NONE) { 00232 goto task_perform_fail; 00233 } 00234 goto task_perform_continue; 00235 } 00236 fallthrough = 1; 00237 case TASK_NSECIFY: 00238 worker->working_with = TASK_NSECIFY; 00239 ods_log_verbose("[%s[%i]] nsecify zone %s", 00240 worker2str(worker->type), worker->thread_num, 00241 task_who2str(task->who)); 00242 status = tools_nsecify(zone); 00243 00244 /* what to do next */ 00245 what = TASK_SIGN; 00246 when = time_now(); 00247 if (status == ODS_STATUS_OK) { 00248 if (task->interrupt > TASK_SIGNCONF) { 00249 task->interrupt = TASK_NONE; 00250 task->halted = TASK_NONE; 00251 } 00252 } else { 00253 if (task->halted == TASK_NONE) { 00254 goto task_perform_fail; 00255 } 00256 goto task_perform_continue; 00257 } 00258 fallthrough = 1; 00259 case TASK_SIGN: 00260 worker->working_with = TASK_SIGN; 00261 ods_log_verbose("[%s[%i]] sign zone %s", 00262 worker2str(worker->type), worker->thread_num, 00263 task_who2str(task->who)); 00264 tmpserial = zone->zonedata->internal_serial; 00265 status = zone_update_serial(zone); 00266 if (status != ODS_STATUS_OK) { 00267 ods_log_error("[%s[%i]] unable to sign zone %s: " 00268 "failed to increment serial", 00269 worker2str(worker->type), worker->thread_num, 00270 task_who2str(task->who)); 00271 } else { 00272 /* start timer */ 00273 start = time(NULL); 00274 if (zone->stats) { 00275 lock_basic_lock(&zone->stats->stats_lock); 00276 if (!zone->stats->start_time) { 00277 zone->stats->start_time = start; 00278 } 00279 zone->stats->sig_count = 0; 00280 zone->stats->sig_soa_count = 0; 00281 zone->stats->sig_reuse = 0; 00282 zone->stats->sig_time = 0; 00283 lock_basic_unlock(&zone->stats->stats_lock); 00284 } 00285 00286 /* queue menial, hard signing work */ 00287 status = zonedata_queue(zone->zonedata, engine->signq, worker); 00288 ods_log_debug("[%s[%i]] wait until drudgers are finished " 00289 " signing zone %s, %u signatures queued", 00290 worker2str(worker->type), worker->thread_num, 00291 task_who2str(task->who), worker->jobs_appointed); 00292 00293 /* sleep until work is done */ 00294 if (!worker->need_to_exit) { 00295 worker_sleep_unless(worker, 0); 00296 } 00297 if (worker->jobs_failed) { 00298 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00299 "signatures failed", worker2str(worker->type), 00300 worker->thread_num, task_who2str(task->who), 00301 worker->jobs_failed, worker->jobs_appointed); 00302 status = ODS_STATUS_ERR; 00303 } else if (!worker_fulfilled(worker)) { 00304 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00305 "signatures completed", worker2str(worker->type), 00306 worker->thread_num, task_who2str(task->who), 00307 worker->jobs_completed, worker->jobs_appointed); 00308 status = ODS_STATUS_ERR; 00309 } else if (worker->need_to_exit) { 00310 ods_log_debug("[%s[%i]] sign zone %s failed: worker " 00311 "needs to exit", worker2str(worker->type), 00312 worker->thread_num, task_who2str(task->who)); 00313 status = ODS_STATUS_ERR; 00314 } else { 00315 ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u " 00316 "signatures succeeded", worker2str(worker->type), 00317 worker->thread_num, task_who2str(task->who), 00318 worker->jobs_completed, worker->jobs_appointed); 00319 ods_log_assert(worker->jobs_appointed == 00320 worker->jobs_completed); 00321 } 00322 worker->jobs_appointed = 0; 00323 worker->jobs_completed = 0; 00324 worker->jobs_failed = 0; 00325 00326 /* stop timer */ 00327 end = time(NULL); 00328 if (status == ODS_STATUS_OK && zone->stats) { 00329 lock_basic_lock(&zone->stats->stats_lock); 00330 zone->stats->sig_time = (end-start); 00331 lock_basic_unlock(&zone->stats->stats_lock); 00332 } 00333 } 00334 00335 /* what to do next */ 00336 if (status != ODS_STATUS_OK) { 00337 /* rollback serial */ 00338 zone->zonedata->internal_serial = tmpserial; 00339 if (task->halted == TASK_NONE) { 00340 goto task_perform_fail; 00341 } 00342 goto task_perform_continue; 00343 } else { 00344 if (task->interrupt > TASK_SIGNCONF) { 00345 task->interrupt = TASK_NONE; 00346 task->halted = TASK_NONE; 00347 } 00348 } 00349 what = TASK_AUDIT; 00350 when = time_now(); 00351 fallthrough = 1; 00352 case TASK_AUDIT: 00353 worker->working_with = TASK_AUDIT; 00354 if (zone->signconf->audit) { 00355 ods_log_verbose("[%s[%i]] audit zone %s", 00356 worker2str(worker->type), worker->thread_num, 00357 task_who2str(task->who)); 00358 working_dir = strdup(engine->config->working_dir); 00359 cfg_filename = strdup(engine->config->cfg_filename); 00360 status = tools_audit(zone, working_dir, cfg_filename); 00361 if (working_dir) { free((void*)working_dir); } 00362 if (cfg_filename) { free((void*)cfg_filename); } 00363 working_dir = NULL; 00364 cfg_filename = NULL; 00365 } else { 00366 status = ODS_STATUS_OK; 00367 } 00368 00369 /* what to do next */ 00370 if (status != ODS_STATUS_OK) { 00371 if (task->halted == TASK_NONE) { 00372 goto task_perform_fail; 00373 } 00374 goto task_perform_continue; 00375 } 00376 what = TASK_WRITE; 00377 when = time_now(); 00378 fallthrough = 1; 00379 case TASK_WRITE: 00380 worker->working_with = TASK_WRITE; 00381 ods_log_verbose("[%s[%i]] write zone %s", 00382 worker2str(worker->type), worker->thread_num, 00383 task_who2str(task->who)); 00384 00385 status = tools_output(zone); 00386 zone->processed = 1; 00387 00388 /* what to do next */ 00389 if (status != ODS_STATUS_OK) { 00390 if (task->halted == TASK_NONE) { 00391 goto task_perform_fail; 00392 } 00393 goto task_perform_continue; 00394 } else { 00395 if (task->interrupt > TASK_SIGNCONF) { 00396 task->interrupt = TASK_NONE; 00397 task->halted = TASK_NONE; 00398 } 00399 } 00400 if (duration2time(zone->signconf->sig_resign_interval)) { 00401 what = TASK_SIGN; 00402 when = time_now() + 00403 duration2time(zone->signconf->sig_resign_interval); 00404 } else { 00405 what = TASK_NONE; 00406 when = time_now() + never; 00407 } 00408 backup = 1; 00409 fallthrough = 0; 00410 break; 00411 case TASK_NONE: 00412 worker->working_with = TASK_NONE; 00413 ods_log_warning("[%s[%i]] none task for zone %s", 00414 worker2str(worker->type), worker->thread_num, 00415 task_who2str(task->who)); 00416 when = time_now() + never; 00417 fallthrough = 0; 00418 break; 00419 default: 00420 ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s", 00421 worker2str(worker->type), worker->thread_num, 00422 task_who2str(task->who)); 00423 what = TASK_SIGNCONF; 00424 when = time_now(); 00425 fallthrough = 0; 00426 break; 00427 } 00428 00429 /* no error, reset backoff */ 00430 task->backoff = 0; 00431 00432 /* set next task */ 00433 if (fallthrough == 0 && task->interrupt != TASK_NONE && 00434 task->interrupt != what) { 00435 ods_log_debug("[%s[%i]] interrupt task %s for zone %s", 00436 worker2str(worker->type), worker->thread_num, 00437 task_what2str(what), task_who2str(task->who)); 00438 00439 task->what = task->interrupt; 00440 task->when = time_now(); 00441 task->halted = what; 00442 } else { 00443 ods_log_debug("[%s[%i]] next task %s for zone %s", 00444 worker2str(worker->type), worker->thread_num, 00445 task_what2str(what), task_who2str(task->who)); 00446 00447 task->what = what; 00448 task->when = when; 00449 if (!fallthrough) { 00450 task->interrupt = TASK_NONE; 00451 task->halted = TASK_NONE; 00452 } 00453 } 00454 00455 /* backup the last successful run */ 00456 if (backup) { 00457 status = zone_backup(zone); 00458 if (status != ODS_STATUS_OK) { 00459 ods_log_warning("[%s[%i]] unable to backup zone %s: %s", 00460 worker2str(worker->type), worker->thread_num, 00461 task_who2str(task->who), ods_status2str(status)); 00462 /* just a warning */ 00463 status = ODS_STATUS_OK; 00464 } 00465 backup = 0; 00466 } 00467 return; 00468 00469 task_perform_fail: 00470 /* in case of failure, also mark zone processed (for single run usage) */ 00471 zone->processed = 1; 00472 00473 if (task->backoff) { 00474 task->backoff *= 2; 00475 if (task->backoff > ODS_SE_MAX_BACKOFF) { 00476 task->backoff = ODS_SE_MAX_BACKOFF; 00477 } 00478 } else { 00479 task->backoff = 60; 00480 } 00481 ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds", 00482 worker2str(worker->type), worker->thread_num, 00483 task_what2str(task->what), task_who2str(task->who), task->backoff); 00484 00485 task->when = time_now() + task->backoff; 00486 return; 00487 00488 task_perform_continue: 00489 ods_log_info("[%s[%i]] continue task %s for zone %s", 00490 worker2str(worker->type), worker->thread_num, 00491 task_what2str(task->halted), task_who2str(task->who)); 00492 00493 what = task->halted; 00494 task->what = what; 00495 task->when = time_now(); 00496 task->interrupt = TASK_NONE; 00497 task->halted = TASK_NONE; 00498 if (zone->processed) { 00499 task->when += duration2time(zone->signconf->sig_resign_interval); 00500 } 00501 return; 00502 } 00503 00504 00509 static void 00510 worker_work(worker_type* worker) 00511 { 00512 time_t now, timeout = 1; 00513 zone_type* zone = NULL; 00514 ods_status status = ODS_STATUS_OK; 00515 00516 ods_log_assert(worker); 00517 ods_log_assert(worker->type == WORKER_WORKER); 00518 00519 while (worker->need_to_exit == 0) { 00520 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00521 worker->thread_num); 00522 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00523 /* [LOCK] schedule */ 00524 worker->task = schedule_pop_task(worker->engine->taskq); 00525 /* [UNLOCK] schedule */ 00526 if (worker->task) { 00527 worker->working_with = worker->task->what; 00528 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00529 00530 zone = worker->task->zone; 00531 lock_basic_lock(&zone->zone_lock); 00532 /* [LOCK] zone */ 00533 ods_log_debug("[%s[%i]] start working on zone %s", 00534 worker2str(worker->type), worker->thread_num, zone->name); 00535 00536 worker->clock_in = time(NULL); 00537 worker_perform_task(worker); 00538 00539 zone->task = worker->task; 00540 00541 ods_log_debug("[%s[%i]] finished working on zone %s", 00542 worker2str(worker->type), worker->thread_num, zone->name); 00543 /* [UNLOCK] zone */ 00544 00545 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00546 /* [LOCK] zone, schedule */ 00547 worker->task = NULL; 00548 worker->working_with = TASK_NONE; 00549 status = schedule_task(worker->engine->taskq, zone->task, 1); 00550 /* [UNLOCK] zone, schedule */ 00551 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00552 lock_basic_unlock(&zone->zone_lock); 00553 00554 timeout = 1; 00555 } else { 00556 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00557 worker->thread_num); 00558 00559 /* [LOCK] schedule */ 00560 worker->task = schedule_get_first_task(worker->engine->taskq); 00561 /* [UNLOCK] schedule */ 00562 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00563 00564 now = time_now(); 00565 if (worker->task && !worker->engine->taskq->loading) { 00566 timeout = (worker->task->when - now); 00567 } else { 00568 timeout *= 2; 00569 if (timeout > ODS_SE_MAX_BACKOFF) { 00570 timeout = ODS_SE_MAX_BACKOFF; 00571 } 00572 } 00573 worker->task = NULL; 00574 worker_sleep(worker, timeout); 00575 } 00576 } 00577 return; 00578 } 00579 00580 00585 static void 00586 worker_drudge(worker_type* worker) 00587 { 00588 zone_type* zone = NULL; 00589 task_type* task = NULL; 00590 rrset_type* rrset = NULL; 00591 ods_status status = ODS_STATUS_OK; 00592 worker_type* chief = NULL; 00593 hsm_ctx_t* ctx = NULL; 00594 00595 ods_log_assert(worker); 00596 ods_log_assert(worker->type == WORKER_DRUDGER); 00597 00598 ctx = hsm_create_context(); 00599 if (ctx == NULL) { 00600 ods_log_error("[%s[%i]] unable to drudge: error " 00601 "creating libhsm context", worker2str(worker->type), 00602 worker->thread_num); 00603 } 00604 00605 while (worker->need_to_exit == 0) { 00606 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00607 worker->thread_num); 00608 chief = NULL; 00609 zone = NULL; 00610 task = NULL; 00611 00612 lock_basic_lock(&worker->engine->signq->q_lock); 00613 /* [LOCK] schedule */ 00614 rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief); 00615 /* [UNLOCK] schedule */ 00616 lock_basic_unlock(&worker->engine->signq->q_lock); 00617 if (rrset) { 00618 /* set up the work */ 00619 if (chief) { 00620 task = chief->task; 00621 } 00622 if (task) { 00623 zone = task->zone; 00624 } 00625 if (!zone) { 00626 ods_log_error("[%s[%i]] unable to drudge: no zone reference", 00627 worker2str(worker->type), worker->thread_num); 00628 } 00629 if (zone && ctx) { 00630 ods_log_assert(rrset); 00631 ods_log_assert(zone); 00632 ods_log_assert(zone->dname); 00633 ods_log_assert(zone->signconf); 00634 ods_log_assert(ctx); 00635 00636 worker->clock_in = time(NULL); 00637 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf, 00638 chief->clock_in, zone->stats); 00639 } else { 00640 status = ODS_STATUS_ASSERT_ERR; 00641 } 00642 00643 if (chief) { 00644 lock_basic_lock(&chief->worker_lock); 00645 if (status == ODS_STATUS_OK) { 00646 chief->jobs_completed += 1; 00647 } else { 00648 chief->jobs_failed += 1; 00649 /* destroy context? */ 00650 } 00651 lock_basic_unlock(&chief->worker_lock); 00652 00653 if (worker_fulfilled(chief) && chief->sleeping) { 00654 ods_log_debug("[%s[%i]] wake up chief[%u], work is done", 00655 worker2str(worker->type), worker->thread_num, 00656 chief->thread_num); 00657 worker_wakeup(chief); 00658 chief = NULL; 00659 } 00660 } 00661 rrset = NULL; 00662 } else { 00663 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00664 worker->thread_num); 00665 worker_wait(&worker->engine->signq->q_lock, 00666 &worker->engine->signq->q_threshold); 00667 } 00668 } 00669 /* wake up chief */ 00670 if (chief && chief->sleeping) { 00671 ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting", 00672 worker2str(worker->type), worker->thread_num, chief->thread_num); 00673 worker_wakeup(chief); 00674 } 00675 00676 /* cleanup open HSM sessions */ 00677 hsm_destroy_context(ctx); 00678 ctx = NULL; 00679 return; 00680 } 00681 00682 00687 void 00688 worker_start(worker_type* worker) 00689 { 00690 ods_log_assert(worker); 00691 switch (worker->type) { 00692 case WORKER_DRUDGER: 00693 worker_drudge(worker); 00694 break; 00695 case WORKER_WORKER: 00696 worker_work(worker); 00697 break; 00698 default: 00699 ods_log_error("[worker] illegal worker (id=%i)", worker->type); 00700 return; 00701 } 00702 return; 00703 } 00704 00705 00710 void 00711 worker_sleep(worker_type* worker, time_t timeout) 00712 { 00713 ods_log_assert(worker); 00714 lock_basic_lock(&worker->worker_lock); 00715 /* [LOCK] worker */ 00716 worker->sleeping = 1; 00717 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00718 timeout); 00719 /* [UNLOCK] worker */ 00720 lock_basic_unlock(&worker->worker_lock); 00721 return; 00722 } 00723 00724 00729 void 00730 worker_sleep_unless(worker_type* worker, time_t timeout) 00731 { 00732 ods_log_assert(worker); 00733 lock_basic_lock(&worker->worker_lock); 00734 /* [LOCK] worker */ 00735 while (!worker->need_to_exit && !worker_fulfilled(worker)) { 00736 worker->sleeping = 1; 00737 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00738 timeout); 00739 00740 ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u " 00741 "appointed, %u completed, %u failed", worker2str(worker->type), 00742 worker->thread_num, worker->jobs_appointed, worker->jobs_completed, 00743 worker->jobs_failed); 00744 } 00745 /* [UNLOCK] worker */ 00746 lock_basic_unlock(&worker->worker_lock); 00747 return; 00748 } 00749 00750 00755 void 00756 worker_wakeup(worker_type* worker) 00757 { 00758 ods_log_assert(worker); 00759 if (worker && worker->sleeping && !worker->waiting) { 00760 ods_log_debug("[%s[%i]] wake up", worker2str(worker->type), 00761 worker->thread_num); 00762 lock_basic_lock(&worker->worker_lock); 00763 /* [LOCK] worker */ 00764 lock_basic_alarm(&worker->worker_alarm); 00765 worker->sleeping = 0; 00766 /* [UNLOCK] worker */ 00767 lock_basic_unlock(&worker->worker_lock); 00768 } 00769 return; 00770 } 00771 00772 00777 void 00778 worker_wait(lock_basic_type* lock, cond_basic_type* condition) 00779 { 00780 lock_basic_lock(lock); 00781 /* [LOCK] worker */ 00782 lock_basic_sleep(condition, lock, 0); 00783 /* [UNLOCK] worker */ 00784 lock_basic_unlock(lock); 00785 return; 00786 } 00787 00788 00793 void 00794 worker_notify(lock_basic_type* lock, cond_basic_type* condition) 00795 { 00796 lock_basic_lock(lock); 00797 /* [LOCK] lock */ 00798 lock_basic_alarm(condition); 00799 /* [UNLOCK] lock */ 00800 lock_basic_unlock(lock); 00801 return; 00802 } 00803 00804 00809 void 00810 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition) 00811 { 00812 lock_basic_lock(lock); 00813 /* [LOCK] lock */ 00814 lock_basic_broadcast(condition); 00815 /* [UNLOCK] lock */ 00816 lock_basic_unlock(lock); 00817 return; 00818 } 00819 00820 00825 void 00826 worker_cleanup(worker_type* worker) 00827 { 00828 allocator_type* allocator; 00829 cond_basic_type worker_cond; 00830 lock_basic_type worker_lock; 00831 00832 if (!worker) { 00833 return; 00834 } 00835 allocator = worker->allocator; 00836 worker_cond = worker->worker_alarm; 00837 worker_lock = worker->worker_lock; 00838 00839 allocator_deallocate(allocator, (void*) worker); 00840 lock_basic_destroy(&worker_lock); 00841 lock_basic_off(&worker_cond); 00842 return; 00843 }