aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rcynic-ng/rcynic.c197
1 files changed, 72 insertions, 125 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index d8b0d54f..2291896c 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -389,11 +389,20 @@ typedef struct walk_ctx {
DECLARE_STACK_OF(walk_ctx_t)
/**
+ * Return codes from rsync functions.
+ */
+typedef enum {
+ rsync_status_done, /* Request completed */
+ rsync_status_failed, /* Request failed */
+ rsync_status_pending /* Request in progress */
+} rsync_status_t;
+
+/**
* Context for asyncronous rsync.
*/
typedef struct rsync_ctx {
uri_t uri;
- void (*handler)(const rcynic_ctx_t *, STACK_OF(walk_ctx_t) *, const uri_t *);
+ void (*handler)(const rcynic_ctx_t *, const rsync_status_t, STACK_OF(walk_ctx_t) *, const uri_t *);
STACK_OF(walk_ctx_t) *stack;
int blocked;
pid_t pid;
@@ -1473,15 +1482,6 @@ static int rsync_cached_uri(const rcynic_ctx_t *rc,
/**
- * Return codes from rsync functions.
- */
-typedef enum {
- rsync_status_done, /* Request completed */
- rsync_status_failed, /* Request failed */
- rsync_status_pending /* Request in progress */
-} rsync_status_t;
-
-/**
* Manager for queue of rsync tasks in progress.
*
* General plan here is to process one completed child, or output has
@@ -1542,7 +1542,14 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
close(ctx->fd);
- if (WEXITSTATUS(pid_status)) {
+ if (ctx->buflen > 0) {
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ ctx->buffer[ctx->buflen] = '\0';
+ logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer);
+ ctx->buflen = 0;
+ }
+
+ if (WEXITSTATUS(pid_status) != 0) {
logmsg(rc, log_data_err, "rsync exited with status %d fetching %s",
WEXITSTATUS(pid_status), ctx->uri.s);
mib_increment(rc, &ctx->uri,
@@ -1560,6 +1567,10 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, uribuf.s + SIZEOF_RSYNC))
logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", ctx->uri.s);
+ if (ctx->handler)
+ ctx->handler(rc, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, ctx->stack, &ctx->uri);
+ sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx);
+ free(ctx);
return;
}
@@ -1572,7 +1583,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
if (ctx->pid <= 0 || now < ctx->deadline ||
(waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status)))
continue;
- (void) kill(ctx->pid, SIGTERM);
+ (void) kill(ctx->pid, ctx->kill_count++ < KILL_MAX ? SIGTERM : SIGKILL);
ctx->deadline = now + 1;
signaled++;
}
@@ -1622,7 +1633,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
while ((s = strchr(ctx->buffer, '\n')) != NULL) {
*s++ = '\0';
- logmsg(rc, log_telemetry, "%s", ctx->buffer);
+ logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer);
assert(s >= ctx->buffer);
ctx->buflen = s - ctx->buffer;
if (ctx->buflen > 0)
@@ -1630,13 +1641,14 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
if (ctx->buflen == sizeof(ctx->buffer) - 1) {
- logmsg(rc, log_telemetry, "%s", ctx->buffer);
+ logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer);
ctx->buflen = 0;
}
}
}
}
+
/**
* Run rsync. This is fairly nasty, because we need to:
*
@@ -1665,15 +1677,11 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
};
const char *argv[100];
- char *s, *b, buffer[URI_MAX * 4];
path_t path;
rsync_ctx_t *ctx = NULL;
- rsync_status_t ret = rsync_status_failed;
- int i, n, argc = 0, pid_status = -1, pipe_fds[2];
- time_t now;
- struct timeval tv;
- fd_set rfds;
- pid_t wpid;
+ int i, argc = 0, flags, pipe_fds[2];
+
+ pipe_fds[0] = pipe_fds[1] = -1;
assert(rc && uri);
@@ -1724,43 +1732,44 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
logmsg(rc, log_sys_err, "malloc(rsync_ctxt_t) failed");
return rsync_status_failed;
}
-
memset(ctx, 0, sizeof(*ctx));
+ memcpy(&ctx->uri, uri, sizeof(ctx->uri));
ctx->fd = -1;
-#warning This leaks memory and will until we clean up rsync_ctx_t handling here
-
if (pipe(pipe_fds) < 0) {
logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno));
- return rsync_status_failed;
+ goto lose;
}
ctx->fd = pipe_fds[0];
- if ((i = fcntl(ctx->fd, F_GETFL, 0)) == -1 ||
- fcntl(ctx->fd, F_SETFL, i | O_NONBLOCK) == -1) {
- logmsg(rc, log_sys_err,
- "Couldn't set rsync's output stream non-blocking: %s",
+ if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) {
+ logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s",
strerror(errno));
- close(pipe_fds[0]);
- close(pipe_fds[1]);
- return rsync_status_failed;
+ goto lose;
+ }
+ flags |= O_NONBLOCK;
+ if (fcntl(ctx->fd, F_SETFL, flags) == -1) {
+ logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s",
+ strerror(errno));
+ goto lose;
}
switch ((ctx->pid = vfork())) {
case -1:
logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno));
- close(pipe_fds[0]);
- close(pipe_fds[1]);
- return rsync_status_failed;
+ goto lose;
case 0:
#define whine(msg) write(2, msg, sizeof(msg) - 1)
- close(pipe_fds[0]);
- if (dup2(pipe_fds[1], 1) < 0)
- whine("dup2(1) failed\n");
+ if (close(pipe_fds[0]) < 0)
+ whine("close(pipe_fds[0]) failed\n");
+ else if (dup2(pipe_fds[1], 1) < 0)
+ whine("dup2(pipe_fds[1], 1) failed\n");
else if (dup2(pipe_fds[1], 2) < 0)
- whine("dup2(2) failed\n");
+ whine("dup2(pipe_fds[1], 2) failed\n");
+ else if (close(pipe_fds[1]) < 0)
+ whine("close(pipe_fds[1]) failed\n");
else if (execvp(argv[0], (char * const *) argv) < 0)
- whine("execvp() failed\n");
+ whine("execvp(argv[0], (char * const *) argv) failed\n");
whine("last system error: ");
write(2, strerror(errno), strlen(strerror(errno)));
whine("\n");
@@ -1769,95 +1778,28 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
}
(void) close(pipe_fds[1]);
+ pipe_fds[1] = -1;
- now = time(0);
- ctx->started = now;
- ctx->deadline = now + rc->rsync_timeout;
-
- n = -1;
- i = 0;
- while ((wpid = waitpid(-1, &pid_status, WNOHANG)) == 0 &&
- (!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) {
- FD_ZERO(&rfds);
- FD_SET(ctx->fd, &rfds);
- if (rc->rsync_timeout) {
- tv.tv_sec = ctx->deadline - now;
- tv.tv_usec = 0;
- n = select(ctx->fd + 1, &rfds, NULL, NULL, &tv);
- } else {
- n = select(ctx->fd + 1, &rfds, NULL, NULL, NULL);
- }
- if (n == 0 || (n < 0 && errno == EINTR))
- continue;
- if (n < 0)
- break;
- while ((n = read(ctx->fd, buffer + i, sizeof(buffer) - i - 1)) > 0) {
- n += i;
- assert(n < sizeof(buffer));
- buffer[n] = '\0';
- for (b = buffer; (s = strchr(b, '\n')) != NULL; b = s) {
- *s++ = '\0';
- logmsg(rc, log_telemetry, "%s", b);
- }
- i = strlen(b);
- assert(i < sizeof(buffer) && b + i < buffer + sizeof(buffer));
- if (b == buffer && i == sizeof(buffer) - 1) {
- logmsg(rc, log_telemetry, "%s\\", b);
- i = 0;
- }
- if (i > 0) {
- memmove(buffer, b, i);
- }
- }
- if (n == 0 || (n < 0 && errno != EAGAIN))
- break;
- }
-
- close(ctx->fd);
-
- assert(i >= 0 && i < sizeof(buffer));
- if (i) {
- buffer[i] = '\0';
- logmsg(rc, log_telemetry, "%s", buffer);
- }
-
- if (n < 0 && errno != EAGAIN)
- logmsg(rc, log_sys_err, "Problem reading rsync's output: %s",
- strerror(errno));
-
- if (rc->rsync_timeout && now >= ctx->deadline)
- logmsg(rc, log_data_err,
- "Fetch of %s took longer than %d seconds, terminating fetch",
- uri->s, rc->rsync_timeout);
-
- assert(ctx->pid > 0);
- for (i = 0; i < KILL_MAX && wpid == 0; i++) {
- if ((wpid = waitpid(ctx->pid, &pid_status, WNOHANG)) != 0 && WIFEXITED(pid_status))
- break;
- kill(ctx->pid, SIGTERM);
- sleep(1);
+ if (!sk_rsync_ctx_t_push(rc->rsync_queue, ctx)) {
+ logmsg(rc, log_sys_err, "Couldn't push rsync state object onto queue, punting %s", uri->s);
+ goto lose;
}
- if (WEXITSTATUS(pid_status)) {
- logmsg(rc, log_data_err, "rsync exited with status %d fetching %s",
- WEXITSTATUS(pid_status), uri->s);
- ret = rsync_status_failed;
- mib_increment(rc, uri, (rc->rsync_timeout && now >= ctx->deadline
- ? rsync_timed_out
- : rsync_failed));
- } else {
- ret = rsync_status_done;
- mib_increment(rc, uri, rsync_succeeded);
- }
-
- assert(strlen(uri->s) > SIZEOF_RSYNC);
- strcpy(buffer, uri->s + SIZEOF_RSYNC);
- if ((s = strrchr(buffer, '/')) != NULL && s[1] == '\0')
- *s = '\0';
- if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, buffer))
- logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", uri->s);
+ /*
+ * XXX Temporary control loop, needs to move to caller
+ */
+ while (sk_rsync_ctx_t_num(rc->rsync_queue) > 0)
+ rsync_mgr(rc);
+ return rsync_status_done;
- return ret;
+ lose:
+ if (pipe_fds[0] != -1)
+ (void) close(pipe_fds[0]);
+ if (pipe_fds[1] != -1)
+ (void) close(pipe_fds[1]);
+ if (ctx)
+ free(ctx);
+ return rsync_status_failed;
}
/**
@@ -3798,6 +3740,11 @@ int main(int argc, char *argv[])
goto done;
}
+ if ((rc.rsync_queue = sk_rsync_ctx_t_new_null()) == NULL) {
+ logmsg(&rc, log_sys_err, "Couldn't allocate rsync_queue");
+ goto done;
+ }
+
rc.use_syslog = use_syslog;
if (use_syslog)