diff options
-rw-r--r-- | rcynic-ng/rcynic.c | 427 |
1 files changed, 268 insertions, 159 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index 7ed8d084..996ac42b 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -407,12 +407,16 @@ typedef struct rsync_ctx { enum { rsync_state_initial, /* Must be first */ rsync_state_running, - rsync_state_conflict_block, - rsync_state_refused_try_later, + rsync_state_conflict_wait, rsync_state_retry_wait, rsync_state_terminating } state; - unsigned tries, kill_count; + enum { + rsync_problem_none, /* Must be first */ + rsync_problem_timed_out, + rsync_problem_refused + } problem; + unsigned tries; pid_t pid; int fd; time_t started, deadline; @@ -1560,18 +1564,219 @@ static int rsync_cached_uri(const rcynic_ctx_t *rc, } /** - * Process one log line from rsync. This is a separate function - * primarily to centralize things like screen scraping rsync's output - * looking for magic error messages we have to handle. + * Return count of how many rsync contexts are in running. + */ +static int rsync_count_running(const rcynic_ctx_t *rc) +{ + const rsync_ctx_t *ctx; + int i, n = 0; + + assert(rc && rc->rsync_queue); + + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) + if (ctx->state == rsync_state_running) + n++; + + return n; +} + +/** + * Test whether a rsync context is runable at this time. + */ +static int rsync_runable(const rcynic_ctx_t *rc, + const rsync_ctx_t *ctx) +{ + assert(rc && ctx); + + switch (ctx->state) { + + case rsync_state_initial: + case rsync_state_running: + return 1; + + case rsync_state_retry_wait: + return ctx->deadline <= time(0); + + case rsync_state_conflict_wait: +#warning rsync_state_conflict_wait not implemented yet + + case rsync_state_terminating: + return 0; + } + + return 0; +} + +/** + * Return count of runable rsync contexts. + */ +static int rsync_count_runable(const rcynic_ctx_t *rc) +{ + const rsync_ctx_t *ctx; + int i, n = 0; + + assert(rc && rc->rsync_queue); + + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) + if (rsync_runable(rc, ctx)) + n++; + + return n; +} + +/** + * Run an rsync process. + */ +static void rsync_run(const rcynic_ctx_t *rc, + rsync_ctx_t *ctx) +{ + static const char * const rsync_cmd[] = { + "rsync", "--update", "--times", "--copy-links", "--itemize-changes" + }; + static const char * const rsync_tree_args[] = { + "--recursive", "--delete" + }; + + const char *argv[10]; + path_t path; + int i, argc = 0, flags, pipe_fds[2]; + + pipe_fds[0] = pipe_fds[1] = -1; + + assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx)); + + logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s); + + memset(argv, 0, sizeof(argv)); + + for (i = 0; i < sizeof(rsync_cmd)/sizeof(*rsync_cmd); i++) { + assert(argc < sizeof(argv)/sizeof(*argv)); + argv[argc++] = rsync_cmd[i]; + } + if (endswith(ctx->uri.s, "/")) { + for (i = 0; i < sizeof(rsync_tree_args)/sizeof(*rsync_tree_args); i++) { + assert(argc < sizeof(argv)/sizeof(*argv)); + argv[argc++] = rsync_tree_args[i]; + } + } + + if (rc->rsync_program) + argv[0] = rc->rsync_program; + + if (!uri_to_filename(rc, &ctx->uri, &path, &rc->unauthenticated)) { + logmsg(rc, log_data_err, "Couldn't extract filename from URI: %s", ctx->uri.s); + goto lose; + } + + assert(argc < sizeof(argv)/sizeof(*argv)); + argv[argc++] = ctx->uri.s; + + assert(argc < sizeof(argv)/sizeof(*argv)); + argv[argc++] = path.s; + + if (!mkdir_maybe(rc, &path)) { + logmsg(rc, log_sys_err, "Couldn't make target directory: %s", path.s); + goto lose; + } + + for (i = 0; i < argc; i++) + logmsg(rc, log_verbose, "rsync argv[%d]: %s", i, argv[i]); + + if (pipe(pipe_fds) < 0) { + logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno)); + goto lose; + } + ctx->fd = pipe_fds[0]; + + if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) { + logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s", + strerror(errno)); + goto lose; + } + flags |= O_NONBLOCK; + if (fcntl(ctx->fd, F_SETFL, flags) == -1) { + logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s", + strerror(errno)); + goto lose; + } + + switch ((ctx->pid = vfork())) { + + case -1: + logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno)); + goto lose; + + case 0: + /* + * Child + */ +#define whine(msg) write(2, msg, sizeof(msg) - 1) + if (close(pipe_fds[0]) < 0) + whine("close(pipe_fds[0]) failed\n"); + else if (dup2(pipe_fds[1], 1) < 0) + whine("dup2(pipe_fds[1], 1) failed\n"); + else if (dup2(pipe_fds[1], 2) < 0) + whine("dup2(pipe_fds[1], 2) failed\n"); + else if (close(pipe_fds[1]) < 0) + whine("close(pipe_fds[1]) failed\n"); + else if (execvp(argv[0], (char * const *) argv) < 0) + whine("execvp(argv[0], (char * const *) argv) failed\n"); + whine("last system error: "); + write(2, strerror(errno), strlen(strerror(errno))); + whine("\n"); + _exit(1); +#undef whine + + default: + /* + * Parent + */ + (void) close(pipe_fds[1]); + pipe_fds[1] = -1; + ctx->state = rsync_state_running; + ctx->problem = rsync_problem_none; + if (rc->rsync_timeout) + ctx->deadline = time(0) + rc->rsync_timeout; + logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d", + (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue)); + if (ctx->handler) + ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk); + return; + + } + + lose: + if (pipe_fds[0] != -1) + (void) close(pipe_fds[0]); + if (pipe_fds[1] != -1) + (void) close(pipe_fds[1]); + if (rc->rsync_queue && ctx) + (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx); + if (ctx && ctx->handler) + ctx->handler(rc, ctx, rsync_status_failed, &ctx->uri, ctx->wsk); + if (ctx) + free(ctx); +} + +/** + * Process one line of rsync's output. This is a separate function + * primarily to centralize things like searching for magic error + * messages we have to handle. */ static void do_one_rsync_log_line(const rcynic_ctx_t *rc, rsync_ctx_t *ctx) { + /* + * Send line to our log unless it's empty. + */ if (ctx->buffer[strspn(ctx->buffer, " \t\n\r")] != '\0') logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer); + /* + * Check for magic error strings + */ if (strstr(ctx->buffer, "@ERROR: max connections")) - ctx->state = rsync_state_refused_try_later; + ctx->problem = rsync_problem_refused; } /** @@ -1601,6 +1806,22 @@ static void rsync_mgr(const rcynic_ctx_t *rc) assert(rc && rc->rsync_queue); + /* + * Look for rsync contexts that have become runable. + */ + if (rsync_count_running(rc) < rc->max_parallel_fetches) { + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) { + rsync_run(rc, ctx); + return; + } + } + } + + /* + * Check for exited subprocesses. + */ + while ((pid = waitpid(-1, &pid_status, WNOHANG)) == -1 && errno == EINTR) ; @@ -1640,14 +1861,17 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } close(ctx->fd); + ctx->fd = -1; if (ctx->buflen > 0) { assert(ctx->buflen < sizeof(ctx->buffer)); ctx->buffer[ctx->buflen] = '\0'; - logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer); + do_one_rsync_log_line(rc, ctx); ctx->buflen = 0; } + ctx->pid = 0; + switch (WEXITSTATUS(pid_status)) { case 0: @@ -1655,15 +1879,31 @@ static void rsync_mgr(const rcynic_ctx_t *rc) mib_increment(rc, &ctx->uri, rsync_succeeded); break; -#if 0 case 5: /* - * Handle rsync protocol error that really means "retry later". - * In theory, this is confirmation of error we already saw in - * rsync's stderr output, in which case ctx->state should be - * rsync_state_refused_try_later. + * Handle remote rsyncd refusing to talk to us because we've + * exceeded its connection limit. Back off for a short + * interval, then retry. */ - #endif + +#warning Parameters here should come from config file +#warning Need some kind of maximum retry count here too + + if (ctx->problem == rsync_problem_refused) { + unsigned char r; + if (RAND_bytes(&r, sizeof(r))) + r %= 60; + else + r = 60; + ctx->deadline = time(0) + 30 + r; + ctx->state = rsync_state_retry_wait; + ctx->problem = rsync_problem_none; + ctx->tries++; + logmsg(rc, log_telemetry, "Scheduling retry for %s", ctx->uri.s); + return; + } + + /* Otherwise, fall through */ default: logmsg(rc, log_data_err, "rsync exited with status %d fetching %s", @@ -1699,12 +1939,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc) (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status))) continue; if (ctx->state != rsync_state_terminating) { + ctx->problem = rsync_problem_timed_out; ctx->state = rsync_state_terminating; ctx->tries = 0; logmsg(rc, log_debug, "Subprocess %u is taking too long fetching %s", (unsigned) ctx->pid, ctx->uri.s); } (void) kill(ctx->pid, ctx->tries++ < KILL_MAX ? SIGTERM : SIGKILL); - ctx->deadline = now + 1; + ctx->deadline = now + 2; signaled++; } if (signaled) @@ -1723,6 +1964,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc) for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { if (ctx->fd <= 0) continue; + assert(ctx->state == rsync_state_running); FD_SET(ctx->fd, &rfds); if (ctx->fd > n) n = ctx->fd; @@ -1772,147 +2014,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } } - -/** - * Run rsync. - */ -static void run_rsync(const rcynic_ctx_t *rc, - rsync_ctx_t *ctx) -{ - static const char * const rsync_cmd[] = { - "rsync", "--update", "--times", "--copy-links", "--itemize-changes", NULL - }; - - static const char * const rsync_tree_args[] = { - "--recursive", "--delete", NULL - }; - - const char *argv[10]; - path_t path; - int i, argc = 0, flags, pipe_fds[2]; - - pipe_fds[0] = pipe_fds[1] = -1; - - assert(rc && ctx); - - logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s); - - memset(argv, 0, sizeof(argv)); - - for (i = 0; rsync_cmd[i]; i++) { - assert(argc < sizeof(argv)/sizeof(*argv)); - argv[argc++] = rsync_cmd[i]; - } - if (endswith(ctx->uri.s, "/")) { - for (i = 0; rsync_tree_args[i]; i++) { - assert(argc < sizeof(argv)/sizeof(*argv)); - argv[argc++] = rsync_tree_args[i]; - } - } - - if (rc->rsync_program) - argv[0] = rc->rsync_program; - - if (!uri_to_filename(rc, &ctx->uri, &path, &rc->unauthenticated)) { - logmsg(rc, log_data_err, "Couldn't extract filename from URI: %s", ctx->uri.s); - goto lose; - } - - assert(argc < sizeof(argv)/sizeof(*argv)); - argv[argc++] = ctx->uri.s; - - assert(argc < sizeof(argv)/sizeof(*argv)); - argv[argc++] = path.s; - - if (!mkdir_maybe(rc, &path)) { - logmsg(rc, log_sys_err, "Couldn't make target directory: %s", path.s); - goto lose; - } - - for (i = 0; i < argc; i++) - logmsg(rc, log_verbose, "rsync argv[%d]: %s", i, argv[i]); - - if (pipe(pipe_fds) < 0) { - logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno)); - goto lose; - } - ctx->fd = pipe_fds[0]; - - if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) { - logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s", - strerror(errno)); - goto lose; - } - flags |= O_NONBLOCK; - if (fcntl(ctx->fd, F_SETFL, flags) == -1) { - logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s", - strerror(errno)); - goto lose; - } - - switch ((ctx->pid = vfork())) { - - case -1: - logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno)); - goto lose; - - case 0: - /* - * Child - */ -#define whine(msg) write(2, msg, sizeof(msg) - 1) - if (close(pipe_fds[0]) < 0) - whine("close(pipe_fds[0]) failed\n"); - else if (dup2(pipe_fds[1], 1) < 0) - whine("dup2(pipe_fds[1], 1) failed\n"); - else if (dup2(pipe_fds[1], 2) < 0) - whine("dup2(pipe_fds[1], 2) failed\n"); - else if (close(pipe_fds[1]) < 0) - whine("close(pipe_fds[1]) failed\n"); - else if (execvp(argv[0], (char * const *) argv) < 0) - whine("execvp(argv[0], (char * const *) argv) failed\n"); - whine("last system error: "); - write(2, strerror(errno), strlen(strerror(errno))); - whine("\n"); - _exit(1); -#undef whine - - default: - /* - * Parent - */ - (void) close(pipe_fds[1]); - pipe_fds[1] = -1; - if (rc->rsync_timeout) - ctx->deadline = time(0) + rc->rsync_timeout; - logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d", - (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue)); - if (ctx->handler) - ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk); - return; - - } - - lose: - if (pipe_fds[0] != -1) - (void) close(pipe_fds[0]); - if (pipe_fds[1] != -1) - (void) close(pipe_fds[1]); - if (rc->rsync_queue && ctx) - (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx); - if (ctx && ctx->handler) - ctx->handler(rc, ctx, rsync_status_failed, &ctx->uri, ctx->wsk); - if (ctx) - free(ctx); -} - /** * Set up rsync context and attempt to start it. */ -static void rsync(const rcynic_ctx_t *rc, - const uri_t *uri, - STACK_OF(walk_ctx_t) *wsk, - void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *)) +static void rsync_init(const rcynic_ctx_t *rc, + const uri_t *uri, + STACK_OF(walk_ctx_t) *wsk, + void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *)) { rsync_ctx_t *ctx = NULL; @@ -1948,7 +2056,8 @@ static void rsync(const rcynic_ctx_t *rc, return; } - run_rsync(rc, ctx); + if (rsync_runable(rc, ctx)) + rsync_run(rc, ctx); } /** @@ -1958,7 +2067,7 @@ static void rsync_file(const rcynic_ctx_t *rc, const uri_t *uri) { assert(!endswith(uri->s, "/")); - rsync(rc, uri, NULL, NULL); + rsync_init(rc, uri, NULL, NULL); } /** @@ -1970,7 +2079,7 @@ static void rsync_tree(const rcynic_ctx_t *rc, void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *)) { assert(endswith(uri->s, "/")); - rsync(rc, uri, wsk, handler); + rsync_init(rc, uri, wsk, handler); } @@ -3527,14 +3636,14 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc, { walk_ctx_t *w = walk_ctx_stack_head(wsk); - assert(wsk); + assert(rc && wsk); switch (status) { case rsync_status_pending: - if (sk_rsync_ctx_t_num(rc->rsync_queue) >= rc->max_parallel_fetches) + if (rsync_count_runable(rc) >= rc->max_parallel_fetches) return; - + if ((wsk = walk_ctx_stack_clone(wsk)) == NULL) { logmsg(rc, log_sys_err, "walk_ctx_stack_clone() failed, probably memory exhaustion, blundering onwards without forking stack"); return; |