diff options
Diffstat (limited to 'rcynic-ng/rcynic.c')
-rw-r--r-- | rcynic-ng/rcynic.c | 168 |
1 files changed, 96 insertions, 72 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index 3acbad75..f0c2a697 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -398,6 +398,7 @@ DECLARE_STACK_OF(walk_ctx_t) typedef enum { rsync_status_done, /* Request completed */ rsync_status_failed, /* Request failed */ + rsync_status_timed_out, /* Request timed out */ rsync_status_pending, /* Request in progress */ rsync_status_skipped /* Request not attempted */ } rsync_status_t; @@ -1139,31 +1140,35 @@ static int rm_rf(const path_t *name) /** - * Add an entry to the dead host cache. + * Check to see whether a hostname is in the dead host cache. */ -static void dead_host_add(const rcynic_ctx_t *rc, const uri_t *uri) +static int dead_host_check(const rcynic_ctx_t *rc, const uri_t *uri) { hostname_t hostname; assert(rc && uri && rc->dead_host_cache); - if (!uri_to_hostname(uri, &hostname)) - return; - - (void) sk_OPENSSL_STRING_push_strdup(rc->dead_host_cache, hostname.s); + return (uri_to_hostname(uri, &hostname) && + sk_OPENSSL_STRING_find(rc->dead_host_cache, hostname.s) >= 0); } + /** - * Check to see whether a hostname is in the dead host cache. + * Add an entry to the dead host cache. */ -static int dead_host_check(const rcynic_ctx_t *rc, const uri_t *uri) +static void dead_host_add(const rcynic_ctx_t *rc, const uri_t *uri) { hostname_t hostname; assert(rc && uri && rc->dead_host_cache); - return (uri_to_hostname(uri, &hostname) && - sk_OPENSSL_STRING_find(rc->dead_host_cache, hostname.s) >= 0); + if (dead_host_check(rc, uri)) + return; + + if (!uri_to_hostname(uri, &hostname)) + return; + + (void) sk_OPENSSL_STRING_push_strdup(rc->dead_host_cache, hostname.s); } @@ -1622,12 +1627,12 @@ static int rsync_runable(const rcynic_ctx_t *rc, case rsync_state_initial: case rsync_state_running: - case rsync_state_terminating: return 1; case rsync_state_retry_wait: return ctx->deadline <= time(0); + case rsync_state_terminating: case rsync_state_conflict_wait: #warning rsync_state_conflict_wait not implemented yet return 0; @@ -1672,7 +1677,7 @@ static void rsync_run(const rcynic_ctx_t *rc, pipe_fds[0] = pipe_fds[1] = -1; - assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx)); + assert(rc && ctx && ctx->pid == 0 && ctx->state != rsync_state_running && rsync_runable(rc, ctx)); assert(rsync_count_running(rc) < rc->max_parallel_fetches); @@ -1796,6 +1801,9 @@ static void rsync_run(const rcynic_ctx_t *rc, static void do_one_rsync_log_line(const rcynic_ctx_t *rc, rsync_ctx_t *ctx) { + unsigned u; + char *s; + /* * Send line to our log unless it's empty. */ @@ -1805,8 +1813,11 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc, /* * Check for magic error strings */ - if (strstr(ctx->buffer, "@ERROR: max connections")) + if ((s = strstr(ctx->buffer, "@ERROR: max connections")) != NULL) { ctx->problem = rsync_problem_refused; + if (sscanf(s, "@ERROR: max connections (%u) reached -- try again later", &u) == 1) + logmsg(rc, log_debug, "Subprocess %u reported limit of %u for %s", ctx->pid, u, ctx->uri.s); + } } /** @@ -1879,55 +1890,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc) assert(rc && rc->rsync_queue); - assert(rsync_count_running(rc) <= rc->max_parallel_fetches); - - /* - * Check for log text from subprocesses. - */ - - n = rsync_construct_select(rc, now, &rfds, &tv); - - if (n > 0 || tv.tv_sec) - n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL); - - if (n > 0) { - - for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds)) - continue; - - assert(ctx->buflen < sizeof(ctx->buffer) - 1); - - while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) { - ctx->buflen += n; - assert(ctx->buflen < sizeof(ctx->buffer)); - ctx->buffer[ctx->buflen] = '\0'; - - while ((s = strchr(ctx->buffer, '\n')) != NULL) { - *s++ = '\0'; - do_one_rsync_log_line(rc, ctx); - assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer)); - ctx->buflen -= s - ctx->buffer; - assert(ctx->buflen < sizeof(ctx->buffer)); - if (ctx->buflen > 0) - memmove(ctx->buffer, s, ctx->buflen); - ctx->buffer[ctx->buflen] = '\0'; - } - - if (ctx->buflen == sizeof(ctx->buffer) - 1) { - ctx->buffer[sizeof(ctx->buffer) - 1] = '\0'; - do_one_rsync_log_line(rc, ctx); - ctx->buflen = 0; - } - } - - if (n == 0) { - (void) close(ctx->fd); - ctx->fd = -1; - } - } - } - /* * Check for exited subprocesses. */ @@ -1935,13 +1897,16 @@ static void rsync_mgr(const rcynic_ctx_t *rc) while ((pid = waitpid(-1, &pid_status, WNOHANG)) > 0) { /* - * Child exited, handle that and return. + * Child exited, handle it. */ + logmsg(rc, log_debug, "Subprocess %d exited with status %d", pid, WEXITSTATUS(pid_status)); + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) if (ctx->pid == pid) break; if (ctx == NULL) { + assert(i == sk_rsync_ctx_t_num(rc->rsync_queue)); logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid); continue; } @@ -1959,7 +1924,11 @@ static void rsync_mgr(const rcynic_ctx_t *rc) switch (WEXITSTATUS(pid_status)) { case 0: - log_validation_status(rc, &ctx->uri, rsync_succeeded, object_generation_null); + log_validation_status(rc, &ctx->uri, + (ctx->problem == rsync_problem_timed_out + ? rsync_timed_out + : rsync_succeeded), + object_generation_null); break; case 5: /* "Error starting client-server protocol" */ @@ -2008,9 +1977,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } rsync_cache_add(rc, &ctx->uri); - if (ctx->handler) - ctx->handler(rc, ctx, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, &ctx->uri, ctx->wsk); + ctx->handler(rc, ctx, (ctx->problem == rsync_problem_timed_out + ? rsync_status_timed_out + : WEXITSTATUS(pid_status) != 0 + ? rsync_status_failed + : rsync_status_done), + &ctx->uri, ctx->wsk); (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx); free(ctx); ctx = NULL; @@ -2024,11 +1997,57 @@ static void rsync_mgr(const rcynic_ctx_t *rc) /* * Look for rsync contexts that have become runable. */ - if (rsync_count_running(rc) < rc->max_parallel_fetches) { + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) + if (ctx->state != rsync_state_running && + rsync_runable(rc, ctx) && + rsync_count_running(rc) < rc->max_parallel_fetches) + rsync_run(rc, ctx); + + assert(rsync_count_running(rc) <= rc->max_parallel_fetches); + + /* + * Check for log text from subprocesses. + */ + + n = rsync_construct_select(rc, now, &rfds, &tv); + + if (n > 0 || tv.tv_sec) + n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL); + + if (n > 0) { + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) { - rsync_run(rc, ctx); - break; + if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds)) + continue; + + assert(ctx->buflen < sizeof(ctx->buffer) - 1); + + while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) { + ctx->buflen += n; + assert(ctx->buflen < sizeof(ctx->buffer)); + ctx->buffer[ctx->buflen] = '\0'; + + while ((s = strchr(ctx->buffer, '\n')) != NULL) { + *s++ = '\0'; + do_one_rsync_log_line(rc, ctx); + assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer)); + ctx->buflen -= s - ctx->buffer; + assert(ctx->buflen < sizeof(ctx->buffer)); + if (ctx->buflen > 0) + memmove(ctx->buffer, s, ctx->buflen); + ctx->buffer[ctx->buflen] = '\0'; + } + + if (ctx->buflen == sizeof(ctx->buffer) - 1) { + ctx->buffer[sizeof(ctx->buffer) - 1] = '\0'; + do_one_rsync_log_line(rc, ctx); + ctx->buflen = 0; + } + } + + if (n == 0) { + (void) close(ctx->fd); + ctx->fd = -1; } } } @@ -2048,14 +2067,15 @@ static void rsync_mgr(const rcynic_ctx_t *rc) ctx->problem = rsync_problem_timed_out; ctx->state = rsync_state_terminating; ctx->tries = 0; - logmsg(rc, log_telemetry, "Subprocess %u is taking too long fetching %s", (unsigned) ctx->pid, ctx->uri.s); + logmsg(rc, log_telemetry, "Subprocess %u is taking too long fetching %s, whacking it", (unsigned) ctx->pid, ctx->uri.s); + dead_host_add(rc, &ctx->uri); } else if (sig == SIGTERM) { logmsg(rc, log_telemetry, "Whacking subprocess %u again", (unsigned) ctx->pid); } else { logmsg(rc, log_telemetry, "Whacking subprocess %u with big hammer", (unsigned) ctx->pid); } (void) kill(ctx->pid, sig); - ctx->deadline = now + 2; + ctx->deadline = now + 1; } } } @@ -3677,6 +3697,10 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc, log_validation_status(rc, uri, rsync_failed, object_generation_null); break; + case rsync_status_timed_out: + log_validation_status(rc, uri, rsync_timed_out, object_generation_null); + break; + case rsync_status_skipped: log_validation_status(rc, uri, rsync_skipped, object_generation_null); break; |