aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- rcynic-ng/rcynic.c 427
1 files changed, 268 insertions, 159 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index 7ed8d084..996ac42b 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -407,12 +407,16 @@ typedef struct rsync_ctx {
enum {
rsync_state_initial, /* Must be first */
rsync_state_running,
- rsync_state_conflict_block,
- rsync_state_refused_try_later,
+ rsync_state_conflict_wait,
rsync_state_retry_wait,
rsync_state_terminating
} state;
- unsigned tries, kill_count;
+ enum {
+ rsync_problem_none, /* Must be first */
+ rsync_problem_timed_out,
+ rsync_problem_refused
+ } problem;
+ unsigned tries;
pid_t pid;
int fd;
time_t started, deadline;
@@ -1560,18 +1564,219 @@ static int rsync_cached_uri(const rcynic_ctx_t *rc,
}
/**
- * Process one log line from rsync. This is a separate function
- * primarily to centralize things like screen scraping rsync's output
- * looking for magic error messages we have to handle.
+ * Return count of how many rsync contexts are in the running state.
+ */
+static int rsync_count_running(const rcynic_ctx_t *rc)
+{
+  int running = 0;
+  int idx = 0;
+  const rsync_ctx_t *entry;
+
+  assert(rc && rc->rsync_queue);
+
+  /* Walk the queue until the first NULL entry. */
+  while ((entry = sk_rsync_ctx_t_value(rc->rsync_queue, idx)) != NULL) {
+    if (entry->state == rsync_state_running)
+      running++;
+    idx++;
+  }
+
+  return running;
+}
+
+/**
+ * Decide whether an rsync context may run right now.
+ *
+ * Returns non-zero for contexts in the initial or running state, and
+ * for contexts in retry_wait whose deadline has been reached.
+ * Returns zero for conflict_wait (not implemented yet) and for
+ * terminating contexts.
+ */
+static int rsync_runable(const rcynic_ctx_t *rc,
+                         const rsync_ctx_t *ctx)
+{
+  int runable = 0;
+
+  assert(rc && ctx);
+
+  switch (ctx->state) {
+
+  case rsync_state_retry_wait:
+    /* Back-off period is over once the deadline is no longer in the future. */
+    runable = ctx->deadline <= time(0);
+    break;
+
+  case rsync_state_initial:
+  case rsync_state_running:
+    runable = 1;
+    break;
+
+  case rsync_state_conflict_wait:
+#warning rsync_state_conflict_wait not implemented yet
+    /* Until implemented, treated like terminating: fall through */
+
+  case rsync_state_terminating:
+    break;
+  }
+
+  return runable;
+}
+
+/**
+ * Count the queued rsync contexts that rsync_runable() currently
+ * reports as runnable.
+ */
+static int rsync_count_runable(const rcynic_ctx_t *rc)
+{
+  int runable = 0;
+  int idx = 0;
+  const rsync_ctx_t *entry;
+
+  assert(rc && rc->rsync_queue);
+
+  /* Walk the queue until the first NULL entry. */
+  while ((entry = sk_rsync_ctx_t_value(rc->rsync_queue, idx)) != NULL) {
+    if (rsync_runable(rc, entry))
+      runable++;
+    idx++;
+  }
+
+  return runable;
+}
+
+/**
+ * Start an rsync subprocess for one queued context.
+ *
+ * Builds the rsync command line for ctx->uri, creates a pipe whose
+ * read end (made non-blocking) is stored in ctx->fd, and vfork()s a
+ * child which redirects stdout and stderr into the pipe before
+ * exec'ing rsync.  On success the context is marked
+ * rsync_state_running, its problem flag is cleared, and its handler
+ * (if any) is called with rsync_status_pending.
+ *
+ * On any failure the context is removed from rc->rsync_queue, its
+ * handler (if any) is called with rsync_status_failed, and the
+ * context is freed.  Because this function returns nothing, callers
+ * must not assume ctx is still valid after it returns.
+ */
+static void rsync_run(const rcynic_ctx_t *rc,
+                      rsync_ctx_t *ctx)
+{
+  static const char * const rsync_cmd[] = {
+    "rsync", "--update", "--times", "--copy-links", "--itemize-changes"
+  };
+  static const char * const rsync_tree_args[] = {
+    "--recursive", "--delete"
+  };
+
+  const char *argv[10];
+  path_t path;
+  int i, argc = 0, flags, pipe_fds[2];
+
+  pipe_fds[0] = pipe_fds[1] = -1;
+
+  assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx));
+
+  logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s);
+
+  memset(argv, 0, sizeof(argv));
+
+  for (i = 0; i < sizeof(rsync_cmd)/sizeof(*rsync_cmd); i++) {
+    assert(argc < sizeof(argv)/sizeof(*argv));
+    argv[argc++] = rsync_cmd[i];
+  }
+  /* A trailing slash means we are fetching a whole tree, not one file. */
+  if (endswith(ctx->uri.s, "/")) {
+    for (i = 0; i < sizeof(rsync_tree_args)/sizeof(*rsync_tree_args); i++) {
+      assert(argc < sizeof(argv)/sizeof(*argv));
+      argv[argc++] = rsync_tree_args[i];
+    }
+  }
+
+  /* Configured program path, if any, overrides the default "rsync". */
+  if (rc->rsync_program)
+    argv[0] = rc->rsync_program;
+
+  if (!uri_to_filename(rc, &ctx->uri, &path, &rc->unauthenticated)) {
+    logmsg(rc, log_data_err, "Couldn't extract filename from URI: %s", ctx->uri.s);
+    goto lose;
+  }
+
+  assert(argc < sizeof(argv)/sizeof(*argv));
+  argv[argc++] = ctx->uri.s;
+
+  assert(argc < sizeof(argv)/sizeof(*argv));
+  argv[argc++] = path.s;
+
+  /*
+   * execvp() needs a NULL-terminated vector.  The asserts above would
+   * let the last slot be filled, so check explicitly that one slot
+   * remains and terminate the vector there.
+   */
+  assert(argc < sizeof(argv)/sizeof(*argv));
+  argv[argc] = NULL;
+
+  if (!mkdir_maybe(rc, &path)) {
+    logmsg(rc, log_sys_err, "Couldn't make target directory: %s", path.s);
+    goto lose;
+  }
+
+  for (i = 0; i < argc; i++)
+    logmsg(rc, log_verbose, "rsync argv[%d]: %s", i, argv[i]);
+
+  if (pipe(pipe_fds) < 0) {
+    logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno));
+    goto lose;
+  }
+  ctx->fd = pipe_fds[0];
+
+  /* Parent reads rsync's output through a non-blocking descriptor. */
+  if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) {
+    logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s",
+           strerror(errno));
+    goto lose;
+  }
+  flags |= O_NONBLOCK;
+  if (fcntl(ctx->fd, F_SETFL, flags) == -1) {
+    logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s",
+           strerror(errno));
+    goto lose;
+  }
+
+  switch ((ctx->pid = vfork())) {
+
+  case -1:
+    logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno));
+    goto lose;
+
+  case 0:
+    /*
+     * Child: point stdout and stderr at the pipe, then exec rsync.
+     * Anything that goes wrong is reported on fd 2 before exiting.
+     */
+#define whine(msg) write(2, msg, sizeof(msg) - 1)
+    if (close(pipe_fds[0]) < 0)
+      whine("close(pipe_fds[0]) failed\n");
+    else if (dup2(pipe_fds[1], 1) < 0)
+      whine("dup2(pipe_fds[1], 1) failed\n");
+    else if (dup2(pipe_fds[1], 2) < 0)
+      whine("dup2(pipe_fds[1], 2) failed\n");
+    else if (close(pipe_fds[1]) < 0)
+      whine("close(pipe_fds[1]) failed\n");
+    else if (execvp(argv[0], (char * const *) argv) < 0)
+      whine("execvp(argv[0], (char * const *) argv) failed\n");
+    whine("last system error: ");
+    write(2, strerror(errno), strlen(strerror(errno)));
+    whine("\n");
+    _exit(1);
+#undef whine
+
+  default:
+    /*
+     * Parent: drop our copy of the write end and mark the context
+     * as running.
+     */
+    (void) close(pipe_fds[1]);
+    pipe_fds[1] = -1;
+    ctx->state = rsync_state_running;
+    ctx->problem = rsync_problem_none;
+    if (rc->rsync_timeout)
+      ctx->deadline = time(0) + rc->rsync_timeout;
+    /*
+     * The queue can also hold contexts that are not running (e.g.
+     * waiting to retry), so report the number actually running
+     * rather than the queue length.
+     */
+    logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d",
+           (unsigned) ctx->pid,
+           rc->rsync_queue ? rsync_count_running(rc) : 0);
+    if (ctx->handler)
+      ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk);
+    return;
+
+  }
+
+ lose:
+  /* Failure: release the pipe, dequeue the context, notify, and free it. */
+  if (pipe_fds[0] != -1)
+    (void) close(pipe_fds[0]);
+  if (pipe_fds[1] != -1)
+    (void) close(pipe_fds[1]);
+  if (rc->rsync_queue && ctx)
+    (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx);
+  if (ctx && ctx->handler)
+    ctx->handler(rc, ctx, rsync_status_failed, &ctx->uri, ctx->wsk);
+  if (ctx)
+    free(ctx);
+}
+
+/**
+ * Process one line of rsync's output. This is a separate function
+ * primarily to centralize things like searching for magic error
+ * messages we have to handle.
*/
static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
rsync_ctx_t *ctx)
{
+ /*
+ * Send line to our log unless it's empty.
+ */
if (ctx->buffer[strspn(ctx->buffer, " \t\n\r")] != '\0')
logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer);
+ /*
+ * Check for magic error strings
+ */
if (strstr(ctx->buffer, "@ERROR: max connections"))
- ctx->state = rsync_state_refused_try_later;
+ ctx->problem = rsync_problem_refused;
}
/**
@@ -1601,6 +1806,22 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
assert(rc && rc->rsync_queue);
+ /*
+ * Look for rsync contexts that have become runable.
+ */
+ if (rsync_count_running(rc) < rc->max_parallel_fetches) {
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) {
+ rsync_run(rc, ctx);
+ return;
+ }
+ }
+ }
+
+ /*
+ * Check for exited subprocesses.
+ */
+
while ((pid = waitpid(-1, &pid_status, WNOHANG)) == -1 && errno == EINTR)
;
@@ -1640,14 +1861,17 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
close(ctx->fd);
+ ctx->fd = -1;
if (ctx->buflen > 0) {
assert(ctx->buflen < sizeof(ctx->buffer));
ctx->buffer[ctx->buflen] = '\0';
- logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer);
+ do_one_rsync_log_line(rc, ctx);
ctx->buflen = 0;
}
+ ctx->pid = 0;
+
switch (WEXITSTATUS(pid_status)) {
case 0:
@@ -1655,15 +1879,31 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
mib_increment(rc, &ctx->uri, rsync_succeeded);
break;
-#if 0
case 5:
/*
- * Handle rsync protocol error that really means "retry later".
- * In theory, this is confirmation of error we already saw in
- * rsync's stderr output, in which case ctx->state should be
- * rsync_state_refused_try_later.
+ * Handle remote rsyncd refusing to talk to us because we've
+ * exceeded its connection limit. Back off for a short
+ * interval, then retry.
*/
- #endif
+
+#warning Parameters here should come from config file
+#warning Need some kind of maximum retry count here too
+
+ if (ctx->problem == rsync_problem_refused) {
+ unsigned char r;
+ if (RAND_bytes(&r, sizeof(r)))
+ r %= 60;
+ else
+ r = 60;
+ ctx->deadline = time(0) + 30 + r;
+ ctx->state = rsync_state_retry_wait;
+ ctx->problem = rsync_problem_none;
+ ctx->tries++;
+ logmsg(rc, log_telemetry, "Scheduling retry for %s", ctx->uri.s);
+ return;
+ }
+
+ /* Otherwise, fall through */
default:
logmsg(rc, log_data_err, "rsync exited with status %d fetching %s",
@@ -1699,12 +1939,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
(waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status)))
continue;
if (ctx->state != rsync_state_terminating) {
+ ctx->problem = rsync_problem_timed_out;
ctx->state = rsync_state_terminating;
ctx->tries = 0;
logmsg(rc, log_debug, "Subprocess %u is taking too long fetching %s", (unsigned) ctx->pid, ctx->uri.s);
}
(void) kill(ctx->pid, ctx->tries++ < KILL_MAX ? SIGTERM : SIGKILL);
- ctx->deadline = now + 1;
+ ctx->deadline = now + 2;
signaled++;
}
if (signaled)
@@ -1723,6 +1964,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
if (ctx->fd <= 0)
continue;
+ assert(ctx->state == rsync_state_running);
FD_SET(ctx->fd, &rfds);
if (ctx->fd > n)
n = ctx->fd;
@@ -1772,147 +2014,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
}
-
-/**
- * Run rsync.
- */
-static void run_rsync(const rcynic_ctx_t *rc,
- rsync_ctx_t *ctx)
-{
- static const char * const rsync_cmd[] = {
- "rsync", "--update", "--times", "--copy-links", "--itemize-changes", NULL
- };
-
- static const char * const rsync_tree_args[] = {
- "--recursive", "--delete", NULL
- };
-
- const char *argv[10];
- path_t path;
- int i, argc = 0, flags, pipe_fds[2];
-
- pipe_fds[0] = pipe_fds[1] = -1;
-
- assert(rc && ctx);
-
- logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s);
-
- memset(argv, 0, sizeof(argv));
-
- for (i = 0; rsync_cmd[i]; i++) {
- assert(argc < sizeof(argv)/sizeof(*argv));
- argv[argc++] = rsync_cmd[i];
- }
- if (endswith(ctx->uri.s, "/")) {
- for (i = 0; rsync_tree_args[i]; i++) {
- assert(argc < sizeof(argv)/sizeof(*argv));
- argv[argc++] = rsync_tree_args[i];
- }
- }
-
- if (rc->rsync_program)
- argv[0] = rc->rsync_program;
-
- if (!uri_to_filename(rc, &ctx->uri, &path, &rc->unauthenticated)) {
- logmsg(rc, log_data_err, "Couldn't extract filename from URI: %s", ctx->uri.s);
- goto lose;
- }
-
- assert(argc < sizeof(argv)/sizeof(*argv));
- argv[argc++] = ctx->uri.s;
-
- assert(argc < sizeof(argv)/sizeof(*argv));
- argv[argc++] = path.s;
-
- if (!mkdir_maybe(rc, &path)) {
- logmsg(rc, log_sys_err, "Couldn't make target directory: %s", path.s);
- goto lose;
- }
-
- for (i = 0; i < argc; i++)
- logmsg(rc, log_verbose, "rsync argv[%d]: %s", i, argv[i]);
-
- if (pipe(pipe_fds) < 0) {
- logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno));
- goto lose;
- }
- ctx->fd = pipe_fds[0];
-
- if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) {
- logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s",
- strerror(errno));
- goto lose;
- }
- flags |= O_NONBLOCK;
- if (fcntl(ctx->fd, F_SETFL, flags) == -1) {
- logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s",
- strerror(errno));
- goto lose;
- }
-
- switch ((ctx->pid = vfork())) {
-
- case -1:
- logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno));
- goto lose;
-
- case 0:
- /*
- * Child
- */
-#define whine(msg) write(2, msg, sizeof(msg) - 1)
- if (close(pipe_fds[0]) < 0)
- whine("close(pipe_fds[0]) failed\n");
- else if (dup2(pipe_fds[1], 1) < 0)
- whine("dup2(pipe_fds[1], 1) failed\n");
- else if (dup2(pipe_fds[1], 2) < 0)
- whine("dup2(pipe_fds[1], 2) failed\n");
- else if (close(pipe_fds[1]) < 0)
- whine("close(pipe_fds[1]) failed\n");
- else if (execvp(argv[0], (char * const *) argv) < 0)
- whine("execvp(argv[0], (char * const *) argv) failed\n");
- whine("last system error: ");
- write(2, strerror(errno), strlen(strerror(errno)));
- whine("\n");
- _exit(1);
-#undef whine
-
- default:
- /*
- * Parent
- */
- (void) close(pipe_fds[1]);
- pipe_fds[1] = -1;
- if (rc->rsync_timeout)
- ctx->deadline = time(0) + rc->rsync_timeout;
- logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d",
- (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue));
- if (ctx->handler)
- ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk);
- return;
-
- }
-
- lose:
- if (pipe_fds[0] != -1)
- (void) close(pipe_fds[0]);
- if (pipe_fds[1] != -1)
- (void) close(pipe_fds[1]);
- if (rc->rsync_queue && ctx)
- (void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx);
- if (ctx && ctx->handler)
- ctx->handler(rc, ctx, rsync_status_failed, &ctx->uri, ctx->wsk);
- if (ctx)
- free(ctx);
-}
-
/**
* Set up rsync context and attempt to start it.
*/
-static void rsync(const rcynic_ctx_t *rc,
- const uri_t *uri,
- STACK_OF(walk_ctx_t) *wsk,
- void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *))
+static void rsync_init(const rcynic_ctx_t *rc,
+ const uri_t *uri,
+ STACK_OF(walk_ctx_t) *wsk,
+ void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *))
{
rsync_ctx_t *ctx = NULL;
@@ -1948,7 +2056,8 @@ static void rsync(const rcynic_ctx_t *rc,
return;
}
- run_rsync(rc, ctx);
+ if (rsync_runable(rc, ctx))
+ rsync_run(rc, ctx);
}
/**
@@ -1958,7 +2067,7 @@ static void rsync_file(const rcynic_ctx_t *rc,
const uri_t *uri)
{
assert(!endswith(uri->s, "/"));
- rsync(rc, uri, NULL, NULL);
+ rsync_init(rc, uri, NULL, NULL);
}
/**
@@ -1970,7 +2079,7 @@ static void rsync_tree(const rcynic_ctx_t *rc,
void (*handler)(const rcynic_ctx_t *, const rsync_ctx_t *, const rsync_status_t, const uri_t *, STACK_OF(walk_ctx_t) *))
{
assert(endswith(uri->s, "/"));
- rsync(rc, uri, wsk, handler);
+ rsync_init(rc, uri, wsk, handler);
}
@@ -3527,14 +3636,14 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc,
{
walk_ctx_t *w = walk_ctx_stack_head(wsk);
- assert(wsk);
+ assert(rc && wsk);
switch (status) {
case rsync_status_pending:
- if (sk_rsync_ctx_t_num(rc->rsync_queue) >= rc->max_parallel_fetches)
+ if (rsync_count_runable(rc) >= rc->max_parallel_fetches)
return;
-
+
if ((wsk = walk_ctx_stack_clone(wsk)) == NULL) {
logmsg(rc, log_sys_err, "walk_ctx_stack_clone() failed, probably memory exhaustion, blundering onwards without forking stack");
return;