aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rcynic-ng/rcynic.c205
1 files changed, 112 insertions, 93 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index 23db139f..d8b0d54f 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -397,8 +397,11 @@ typedef struct rsync_ctx {
STACK_OF(walk_ctx_t) *stack;
int blocked;
pid_t pid;
- int pipe_fds[2];
+ int fd;
time_t started, deadline;
+ int kill_count;
+ char buffer[URI_MAX * 4];
+ size_t buflen;
} rsync_ctx_t;
DECLARE_STACK_OF(rsync_ctx_t)
@@ -1494,13 +1497,14 @@ typedef enum {
*/
static void rsync_mgr(const rcynic_ctx_t *rc)
{
- char *s, *b, buffer[URI_MAX * 4];
- rsync_ctx_t *ctx = NULL;
+ time_t now = time(0), when;
int i, n, pid_status = -1;
- time_t now = time(0);
+ rsync_ctx_t *ctx = NULL;
struct timeval tv;
+ uri_t uribuf;
fd_set rfds;
pid_t pid;
+ char *s;
assert(rc && rc->rsync_queue);
@@ -1526,106 +1530,111 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
/*
* Child exited, handle that and return
*/
+
for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i)
if (ctx->pid == pid)
break;
+
if (ctx == NULL) {
logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid);
return;
}
-
- close(ctx->pipe_fds[0]);
-
- assert(i >= 0 && i < sizeof(buffer));
- if (i) {
- buffer[i] = '\0';
- logmsg(rc, log_telemetry, "%s", buffer);
- }
-
- if (n < 0 && errno != EAGAIN)
- logmsg(rc, log_sys_err, "Problem reading rsync's output: %s",
- strerror(errno));
-
- if (rc->rsync_timeout && now >= ctx->deadline)
- logmsg(rc, log_data_err,
- "Fetch of %s took longer than %d seconds, terminating fetch",
- ctx->uri.s, rc->rsync_timeout);
-
- assert(ctx->pid > 0);
- for (i = 0; i < KILL_MAX && pid == 0; i++) {
- if ((pid = waitpid(ctx->pid, &pid_status, WNOHANG)) != 0 && WIFEXITED(pid_status))
- break;
- kill(ctx->pid, SIGTERM);
- sleep(1);
- }
+ close(ctx->fd);
if (WEXITSTATUS(pid_status)) {
logmsg(rc, log_data_err, "rsync exited with status %d fetching %s",
WEXITSTATUS(pid_status), ctx->uri.s);
- mib_increment(rc, &ctx->uri, (rc->rsync_timeout && now >= ctx->deadline
- ? rsync_timed_out
- : rsync_failed));
+ mib_increment(rc, &ctx->uri,
+ (rc->rsync_timeout && now >= ctx->deadline
+ ? rsync_timed_out
+ : rsync_failed));
} else {
mib_increment(rc, &ctx->uri, rsync_succeeded);
}
- assert(strlen(ctx->uri.s) > SIZEOF_RSYNC);
- strcpy(buffer, ctx->uri.s + SIZEOF_RSYNC);
- if ((s = strrchr(buffer, '/')) != NULL && s[1] == '\0')
+ uribuf = ctx->uri;
+ while ((s = strrchr(uribuf.s, '/')) != NULL && s[1] == '\0')
*s = '\0';
- if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, buffer))
+ assert(strlen(ctx->uri.s) > SIZEOF_RSYNC);
+ if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, uribuf.s + SIZEOF_RSYNC))
logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", ctx->uri.s);
+ return;
+ }
+
+ /*
+ * Deal with children that have been running too long.
+ */
+ if (rc->rsync_timeout) {
+ int signaled = 0;
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->pid <= 0 || now < ctx->deadline ||
+ (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status)))
+ continue;
+ (void) kill(ctx->pid, SIGTERM);
+ ctx->deadline = now + 1;
+ signaled++;
+ }
+ if (signaled)
+ return;
}
/*
- * Somewhere we have to deal with timing out children that have been
- * running too long.
+ * Check for input from children if we get this far.
*/
- /* Unconverted code below here */
+ FD_ZERO(&rfds);
+ when = 0;
+ n = 0;
- n = -1;
- i = 0;
- while ((pid = waitpid(-1, &pid_status, WNOHANG)) == 0 &&
- (!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) {
- FD_ZERO(&rfds);
- FD_SET(ctx->pipe_fds[0], &rfds);
- if (rc->rsync_timeout) {
- tv.tv_sec = ctx->deadline - now;
- tv.tv_usec = 0;
- n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, &tv);
- } else {
- n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, NULL);
- }
- if (n == 0 || (n < 0 && errno == EINTR))
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->fd <= 0)
continue;
- if (n < 0)
- break;
- while ((n = read(ctx->pipe_fds[0], buffer + i, sizeof(buffer) - i - 1)) > 0) {
- n += i;
- assert(n < sizeof(buffer));
- buffer[n] = '\0';
- for (b = buffer; (s = strchr(b, '\n')) != NULL; b = s) {
+ FD_SET(ctx->fd, &rfds);
+ if (ctx->fd > n)
+ n = ctx->fd;
+ if (when == 0 || ctx->deadline < when)
+ when = ctx->deadline;
+ }
+
+ if (rc->rsync_timeout && when != 0) {
+ tv.tv_sec = when - now;
+ tv.tv_usec = 0;
+ n = select(n + 1, &rfds, NULL, NULL, &tv);
+ } else {
+ n = select(n + 1, &rfds, NULL, NULL, NULL);
+ }
+
+ if (n <= 0)
+ return;
+
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds))
+ continue;
+
+ assert(ctx->buflen < sizeof(ctx->buffer) - 1);
+
+ while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) {
+ ctx->buflen += n;
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ ctx->buffer[ctx->buflen] = '\0';
+
+ while ((s = strchr(ctx->buffer, '\n')) != NULL) {
*s++ = '\0';
- logmsg(rc, log_telemetry, "%s", b);
+ logmsg(rc, log_telemetry, "%s", ctx->buffer);
+ assert(s >= ctx->buffer);
+ ctx->buflen = s - ctx->buffer;
+ if (ctx->buflen > 0)
+ memmove(ctx->buffer, s, ctx->buflen);
}
- i = strlen(b);
- assert(i < sizeof(buffer) && b + i < buffer + sizeof(buffer));
- if (b == buffer && i == sizeof(buffer) - 1) {
- logmsg(rc, log_telemetry, "%s\\", b);
- i = 0;
- }
- if (i > 0) {
- memmove(buffer, b, i);
+
+ if (ctx->buflen == sizeof(ctx->buffer) - 1) {
+ logmsg(rc, log_telemetry, "%s", ctx->buffer);
+ ctx->buflen = 0;
}
}
- if (n == 0 || (n < 0 && errno != EAGAIN))
- break;
}
-
- return;
}
/**
@@ -1660,7 +1669,7 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
path_t path;
rsync_ctx_t *ctx = NULL;
rsync_status_t ret = rsync_status_failed;
- int i, n, argc = 0, pid_status = -1;
+ int i, n, argc = 0, pid_status = -1, pipe_fds[2];
time_t now;
struct timeval tv;
fd_set rfds;
@@ -1717,37 +1726,38 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
}
memset(ctx, 0, sizeof(*ctx));
- ctx->pipe_fds[0] = ctx->pipe_fds[1] = -1;
+ ctx->fd = -1;
#warning This leaks memory and will until we clean up rsync_ctx_t handling here
- if (pipe(ctx->pipe_fds) < 0) {
+ if (pipe(pipe_fds) < 0) {
logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno));
return rsync_status_failed;
}
+ ctx->fd = pipe_fds[0];
- if ((i = fcntl(ctx->pipe_fds[0], F_GETFL, 0)) == -1 ||
- fcntl(ctx->pipe_fds[0], F_SETFL, i | O_NONBLOCK) == -1) {
+ if ((i = fcntl(ctx->fd, F_GETFL, 0)) == -1 ||
+ fcntl(ctx->fd, F_SETFL, i | O_NONBLOCK) == -1) {
logmsg(rc, log_sys_err,
"Couldn't set rsync's output stream non-blocking: %s",
strerror(errno));
- close(ctx->pipe_fds[0]);
- close(ctx->pipe_fds[1]);
+ close(pipe_fds[0]);
+ close(pipe_fds[1]);
return rsync_status_failed;
}
switch ((ctx->pid = vfork())) {
case -1:
logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno));
- close(ctx->pipe_fds[0]);
- close(ctx->pipe_fds[1]);
+ close(pipe_fds[0]);
+ close(pipe_fds[1]);
return rsync_status_failed;
case 0:
#define whine(msg) write(2, msg, sizeof(msg) - 1)
- close(ctx->pipe_fds[0]);
- if (dup2(ctx->pipe_fds[1], 1) < 0)
+ close(pipe_fds[0]);
+ if (dup2(pipe_fds[1], 1) < 0)
whine("dup2(1) failed\n");
- else if (dup2(ctx->pipe_fds[1], 2) < 0)
+ else if (dup2(pipe_fds[1], 2) < 0)
whine("dup2(2) failed\n");
else if (execvp(argv[0], (char * const *) argv) < 0)
whine("execvp() failed\n");
@@ -1758,9 +1768,10 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
#undef whine
}
- close(ctx->pipe_fds[1]);
+ (void) close(pipe_fds[1]);
- ctx->started = now = time(0);
+ now = time(0);
+ ctx->started = now;
ctx->deadline = now + rc->rsync_timeout;
n = -1;
@@ -1768,19 +1779,19 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
while ((wpid = waitpid(-1, &pid_status, WNOHANG)) == 0 &&
(!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) {
FD_ZERO(&rfds);
- FD_SET(ctx->pipe_fds[0], &rfds);
+ FD_SET(ctx->fd, &rfds);
if (rc->rsync_timeout) {
tv.tv_sec = ctx->deadline - now;
tv.tv_usec = 0;
- n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, &tv);
+ n = select(ctx->fd + 1, &rfds, NULL, NULL, &tv);
} else {
- n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, NULL);
+ n = select(ctx->fd + 1, &rfds, NULL, NULL, NULL);
}
if (n == 0 || (n < 0 && errno == EINTR))
continue;
if (n < 0)
break;
- while ((n = read(ctx->pipe_fds[0], buffer + i, sizeof(buffer) - i - 1)) > 0) {
+ while ((n = read(ctx->fd, buffer + i, sizeof(buffer) - i - 1)) > 0) {
n += i;
assert(n < sizeof(buffer));
buffer[n] = '\0';
@@ -1802,7 +1813,7 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc,
break;
}
- close(ctx->pipe_fds[0]);
+ close(ctx->fd);
assert(i >= 0 && i < sizeof(buffer));
if (i) {
@@ -3914,8 +3925,16 @@ int main(int argc, char *argv[])
goto done;
}
logmsg(&rc, log_telemetry, "Processing trust anchor from URI %s", uri.s);
- if (rsync_file(&rc, &uri) != rsync_status_done)
+ switch (rsync_file(&rc, &uri)) {
+ case rsync_status_pending:
+ logmsg(&rc, log_sys_err, "I don't know how to handle rsync_status_pending yet, help!");
+ goto done;
+ case rsync_status_failed:
logmsg(&rc, log_data_err, "Could not fetch trust anchor from %s", uri.s);
+ break;
+ case rsync_status_done:
+ break;
+ }
if (bio)
pkey = d2i_PUBKEY_bio(bio, NULL);
BIO_free_all(bio);