diff options
author | Rob Austein <sra@hactrn.net> | 2011-06-29 02:22:53 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2011-06-29 02:22:53 +0000 |
commit | 89f548c97011f60db7a1d5b72890edee4013d4e6 (patch) | |
tree | 2be7a82ccddef9e24343edc4ce1c04f2da9d24f2 | |
parent | 5dfd98d3b07fb5f84de514ea19a23a3a25efb470 (diff) |
Reorder operations in rsync_mgr()
svn path=/rcynic-ng/rcynic.c; revision=3909
-rw-r--r-- | rcynic-ng/rcynic.c | 236 |
1 files changed, 121 insertions, 115 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index 996ac42b..7b8b2339 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -1592,6 +1592,7 @@ static int rsync_runable(const rcynic_ctx_t *rc, case rsync_state_initial: case rsync_state_running: + case rsync_state_terminating: return 1; case rsync_state_retry_wait: @@ -1599,8 +1600,6 @@ static int rsync_runable(const rcynic_ctx_t *rc, case rsync_state_conflict_wait: #warning rsync_state_conflict_wait not implemented yet - - case rsync_state_terminating: return 0; } @@ -1737,8 +1736,8 @@ static void rsync_run(const rcynic_ctx_t *rc, ctx->problem = rsync_problem_none; if (rc->rsync_timeout) ctx->deadline = time(0) + rc->rsync_timeout; - logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d", - (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue)); + logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d, URI %s", + (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), ctx->uri.s); if (ctx->handler) ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk); return; @@ -1760,8 +1759,7 @@ static void rsync_run(const rcynic_ctx_t *rc, /** * Process one line of rsync's output. This is a separate function - * primarily to centralize things like searching for magic error - * messages we have to handle. + * primarily to centralize scraping for magic error strings. */ static void do_one_rsync_log_line(const rcynic_ctx_t *rc, rsync_ctx_t *ctx) @@ -1780,6 +1778,50 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc, } /** + * Construct select() arguments. + */ +static int rsync_construct_select(const rcynic_ctx_t *rc, + const time_t now, + fd_set *rfds, + struct timeval *tv) +{ + rsync_ctx_t *ctx; + time_t when = 0; + int i, n = 0; + + assert(rc && rc->rsync_queue && rfds && tv); + + FD_ZERO(rfds); + + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + switch (ctx->state) { + + case rsync_state_running: + if (ctx->fd >= 0) { + FD_SET(ctx->fd, rfds); + if (ctx->fd > n) + n = ctx->fd; + } + if (!rc->rsync_timeout) + continue; + /* Fall through */ + + case rsync_state_retry_wait: + if (when == 0 || ctx->deadline < when) + when = ctx->deadline; + /* Fall through */ + + default: + continue; + } + } + + tv->tv_sec = when ? when - now : 0; + tv->tv_usec = 0; + return n; +} + +/** * Manager for queue of rsync tasks in progress. * * General plan here is to process one completed child, or output @@ -1795,7 +1837,7 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc, */ static void rsync_mgr(const rcynic_ctx_t *rc) { - time_t now = time(0), when; + time_t now = time(0); int i, n, pid_status = -1; rsync_ctx_t *ctx = NULL; struct timeval tv; @@ -1807,13 +1849,55 @@ static void rsync_mgr(const rcynic_ctx_t *rc) assert(rc && rc->rsync_queue); /* - * Look for rsync contexts that have become runable. + * Check for log text from subprocesses. */ - if (rsync_count_running(rc) < rc->max_parallel_fetches) { - for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) { - rsync_run(rc, ctx); - return; + + n = rsync_construct_select(rc, now, &rfds, &tv); + + if (n > 0 || tv.tv_sec) + n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL); + + if (n > 0) { + + int again = 1; + + while (again) { + again = 0; + + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds)) + continue; + + assert(ctx->buflen < sizeof(ctx->buffer) - 1); + + while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) { + again = 1; + ctx->buflen += n; + assert(ctx->buflen < sizeof(ctx->buffer)); + ctx->buffer[ctx->buflen] = '\0'; + + while ((s = strchr(ctx->buffer, '\n')) != NULL) { + *s++ = '\0'; + do_one_rsync_log_line(rc, ctx); + assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer)); + ctx->buflen -= s - ctx->buffer; + assert(ctx->buflen < sizeof(ctx->buffer)); + if (ctx->buflen > 0) + memmove(ctx->buffer, s, ctx->buflen); + ctx->buffer[ctx->buflen] = '\0'; + } + + if (ctx->buflen == sizeof(ctx->buffer) - 1) { + ctx->buffer[sizeof(ctx->buffer) - 1] = '\0'; + do_one_rsync_log_line(rc, ctx); + ctx->buflen = 0; + } + } + + if (n == 0) { + (void) close(ctx->fd); + ctx->fd = -1; + } } } } @@ -1822,32 +1906,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc) * Check for exited subprocesses. */ - while ((pid = waitpid(-1, &pid_status, WNOHANG)) == -1 && errno == EINTR) - ; - - switch (pid) { - - case -1: - /* - * If we have no children, we're done here. - */ - if (errno == ECHILD) - return; - - /* - * Not a lot we can do for errors other than EINTR or ECHILD, - * other than whining. - */ - logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno)); - return; - - case 0: - /* - * We have children, but none ready to exit, continue to select(). - */ - break; + while ((pid = waitpid(-1, &pid_status, WNOHANG)) > 0) { - default: /* * Child exited, handle that and return. */ @@ -1857,7 +1917,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc) break; if (ctx == NULL) { logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid); - return; + continue; } close(ctx->fd); @@ -1870,8 +1930,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc) ctx->buflen = 0; } - ctx->pid = 0; - switch (WEXITSTATUS(pid_status)) { case 0: @@ -1891,23 +1949,22 @@ static void rsync_mgr(const rcynic_ctx_t *rc) if (ctx->problem == rsync_problem_refused) { unsigned char r; - if (RAND_bytes(&r, sizeof(r))) - r %= 60; - else + if (!RAND_bytes(&r, sizeof(r))) r = 60; ctx->deadline = time(0) + 30 + r; ctx->state = rsync_state_retry_wait; ctx->problem = rsync_problem_none; + ctx->pid = 0; ctx->tries++; logmsg(rc, log_telemetry, "Scheduling retry for %s", ctx->uri.s); - return; + continue; } /* Otherwise, fall through */ default: - logmsg(rc, log_data_err, "rsync exited with status %d fetching %s", - WEXITSTATUS(pid_status), ctx->uri.s); + logmsg(rc, log_data_err, "rsync %u exited with status %d fetching %s", + (unsigned) pid, WEXITSTATUS(pid_status), ctx->uri.s); mib_increment(rc, &ctx->uri, (rc->rsync_timeout && now >= ctx->deadline ? rsync_timed_out @@ -1926,17 +1983,30 @@ static void rsync_mgr(const rcynic_ctx_t *rc) ctx->handler(rc, ctx, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, &ctx->uri, ctx->wsk); (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx); free(ctx); - return; + ctx = NULL; + } + + if (pid == -1 && errno != EINTR && errno != ECHILD) + logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno)); + + /* + * Look for rsync contexts that have become runable. + */ + if (rsync_count_running(rc) < rc->max_parallel_fetches) { + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) { + rsync_run(rc, ctx); + break; + } + } } /* * Deal with children that have been running too long. */ if (rc->rsync_timeout) { - int signaled = 0; for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->pid <= 0 || now < ctx->deadline || - (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status))) + if (ctx->pid <= 0 || now < ctx->deadline) continue; if (ctx->state != rsync_state_terminating) { ctx->problem = rsync_problem_timed_out; @@ -1946,70 +2016,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } (void) kill(ctx->pid, ctx->tries++ < KILL_MAX ? SIGTERM : SIGKILL); ctx->deadline = now + 2; - signaled++; - } - if (signaled) - return; - } - - /* - * Check for log text from children if we get this far. This is - * where we finally block (select()) if we've nothing else to do. - */ - - FD_ZERO(&rfds); - when = 0; - n = 0; - - for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->fd <= 0) - continue; - assert(ctx->state == rsync_state_running); - FD_SET(ctx->fd, &rfds); - if (ctx->fd > n) - n = ctx->fd; - if (when == 0 || ctx->deadline < when) - when = ctx->deadline; - } - - if (rc->rsync_timeout && when != 0) { - tv.tv_sec = when - now; - tv.tv_usec = 0; - n = select(n + 1, &rfds, NULL, NULL, &tv); - } else { - n = select(n + 1, &rfds, NULL, NULL, NULL); - } - - if (n <= 0) - return; - - for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { - if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds)) - continue; - - assert(ctx->buflen < sizeof(ctx->buffer) - 1); - - while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) { - ctx->buflen += n; - assert(ctx->buflen < sizeof(ctx->buffer)); - ctx->buffer[ctx->buflen] = '\0'; - - while ((s = strchr(ctx->buffer, '\n')) != NULL) { - *s++ = '\0'; - do_one_rsync_log_line(rc, ctx); - assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer)); - ctx->buflen -= s - ctx->buffer; - assert(ctx->buflen < sizeof(ctx->buffer)); - if (ctx->buflen > 0) - memmove(ctx->buffer, s, ctx->buflen); - ctx->buffer[ctx->buflen] = '\0'; - } - - if (ctx->buflen == sizeof(ctx->buffer) - 1) { - ctx->buffer[sizeof(ctx->buffer) - 1] = '\0'; - do_one_rsync_log_line(rc, ctx); - ctx->buflen = 0; - } } } } |