diff options
author | Rob Austein <sra@hactrn.net> | 2011-06-20 23:58:40 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2011-06-20 23:58:40 +0000 |
commit | 427c404c8a0adef795ac8f2c5e3a8c518b37978e (patch) | |
tree | 36e544bf095d2252efffbd483a802ed38d9a3fba | |
parent | a4e25d8e8421afc49aae027bd4fbed4127161da9 (diff) |
Checkpoint
svn path=/rcynic-ng/rcynic.c; revision=3890
-rw-r--r-- | rcynic-ng/rcynic.c | 205 |
1 files changed, 112 insertions, 93 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index 23db139f..d8b0d54f 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -397,8 +397,11 @@ typedef struct rsync_ctx { STACK_OF(walk_ctx_t) *stack; int blocked; pid_t pid; - int pipe_fds[2]; + int fd; time_t started, deadline; + int kill_count; + char buffer[URI_MAX * 4]; + size_t buflen; } rsync_ctx_t; DECLARE_STACK_OF(rsync_ctx_t) @@ -1494,13 +1497,14 @@ typedef enum { */ static void rsync_mgr(const rcynic_ctx_t *rc) { - char *s, *b, buffer[URI_MAX * 4]; - rsync_ctx_t *ctx = NULL; + time_t now = time(0), when; int i, n, pid_status = -1; - time_t now = time(0); + rsync_ctx_t *ctx = NULL; struct timeval tv; + uri_t uribuf; fd_set rfds; pid_t pid; + char *s; assert(rc && rc->rsync_queue); @@ -1526,106 +1530,111 @@ static void rsync_mgr(const rcynic_ctx_t *rc) /* * Child exited, handle that and return */ + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) if (ctx->pid == pid) break; + if (ctx == NULL) { logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid); return; } - - close(ctx->pipe_fds[0]); - - assert(i >= 0 && i < sizeof(buffer)); - if (i) { - buffer[i] = '\0'; - logmsg(rc, log_telemetry, "%s", buffer); - } - - if (n < 0 && errno != EAGAIN) - logmsg(rc, log_sys_err, "Problem reading rsync's output: %s", - strerror(errno)); - - if (rc->rsync_timeout && now >= ctx->deadline) - logmsg(rc, log_data_err, - "Fetch of %s took longer than %d seconds, terminating fetch", - ctx->uri.s, rc->rsync_timeout); - - assert(ctx->pid > 0); - for (i = 0; i < KILL_MAX && pid == 0; i++) { - if ((pid = waitpid(ctx->pid, &pid_status, WNOHANG)) != 0 && WIFEXITED(pid_status)) - break; - kill(ctx->pid, SIGTERM); - sleep(1); - } + close(ctx->fd); if (WEXITSTATUS(pid_status)) { logmsg(rc, log_data_err, "rsync exited with status %d fetching %s", WEXITSTATUS(pid_status), ctx->uri.s); - mib_increment(rc, &ctx->uri, (rc->rsync_timeout && now >= ctx->deadline - ? rsync_timed_out - : rsync_failed)); + mib_increment(rc, &ctx->uri, + (rc->rsync_timeout && now >= ctx->deadline + ? rsync_timed_out + : rsync_failed)); } else { mib_increment(rc, &ctx->uri, rsync_succeeded); } - assert(strlen(ctx->uri.s) > SIZEOF_RSYNC); - strcpy(buffer, ctx->uri.s + SIZEOF_RSYNC); - if ((s = strrchr(buffer, '/')) != NULL && s[1] == '\0') + uribuf = ctx->uri; + while ((s = strrchr(uribuf.s, '/')) != NULL && s[1] == '\0') *s = '\0'; - if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, buffer)) + assert(strlen(ctx->uri.s) > SIZEOF_RSYNC); + if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, uribuf.s + SIZEOF_RSYNC)) logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", ctx->uri.s); + return; + } + + /* + * Deal with children that have been running too long. + */ + if (rc->rsync_timeout) { + int signaled = 0; + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->pid <= 0 || now < ctx->deadline || + (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status))) + continue; + (void) kill(ctx->pid, SIGTERM); + ctx->deadline = now + 1; + signaled++; + } + if (signaled) + return; } /* - * Somewhere we have to deal with timing out children that have been - * running too long. + * Check for input from children if we get this far. */ - /* Unconverted code below here */ + FD_ZERO(&rfds); + when = 0; + n = 0; - n = -1; - i = 0; - while ((pid = waitpid(-1, &pid_status, WNOHANG)) == 0 && - (!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) { - FD_ZERO(&rfds); - FD_SET(ctx->pipe_fds[0], &rfds); - if (rc->rsync_timeout) { - tv.tv_sec = ctx->deadline - now; - tv.tv_usec = 0; - n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, &tv); - } else { - n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, NULL); - } - if (n == 0 || (n < 0 && errno == EINTR)) + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->fd <= 0) continue; - if (n < 0) - break; - while ((n = read(ctx->pipe_fds[0], buffer + i, sizeof(buffer) - i - 1)) > 0) { - n += i; - assert(n < sizeof(buffer)); - buffer[n] = '\0'; - for (b = buffer; (s = strchr(b, '\n')) != NULL; b = s) { + FD_SET(ctx->fd, &rfds); + if (ctx->fd > n) + n = ctx->fd; + if (when == 0 || ctx->deadline < when) + when = ctx->deadline; + } + + if (rc->rsync_timeout && when != 0) { + tv.tv_sec = when - now; + tv.tv_usec = 0; + n = select(n + 1, &rfds, NULL, NULL, &tv); + } else { + n = select(n + 1, &rfds, NULL, NULL, NULL); + } + + if (n <= 0) + return; + + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds)) + continue; + + assert(ctx->buflen < sizeof(ctx->buffer) - 1); + + while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) { + ctx->buflen += n; + assert(ctx->buflen < sizeof(ctx->buffer)); + ctx->buffer[ctx->buflen] = '\0'; + + while ((s = strchr(ctx->buffer, '\n')) != NULL) { *s++ = '\0'; - logmsg(rc, log_telemetry, "%s", b); + logmsg(rc, log_telemetry, "%s", ctx->buffer); + assert(s >= ctx->buffer); + ctx->buflen = s - ctx->buffer; + if (ctx->buflen > 0) + memmove(ctx->buffer, s, ctx->buflen); } - i = strlen(b); - assert(i < sizeof(buffer) && b + i < buffer + sizeof(buffer)); - if (b == buffer && i == sizeof(buffer) - 1) { - logmsg(rc, log_telemetry, "%s\\", b); - i = 0; - } - if (i > 0) { - memmove(buffer, b, i); + + if (ctx->buflen == sizeof(ctx->buffer) - 1) { + logmsg(rc, log_telemetry, "%s", ctx->buffer); + ctx->buflen = 0; } } - if (n == 0 || (n < 0 && errno != EAGAIN)) - break; } - - return; } /** @@ -1660,7 +1669,7 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, path_t path; rsync_ctx_t *ctx = NULL; rsync_status_t ret = rsync_status_failed; - int i, n, argc = 0, pid_status = -1; + int i, n, argc = 0, pid_status = -1, pipe_fds[2]; time_t now; struct timeval tv; fd_set rfds; @@ -1717,37 +1726,38 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, } memset(ctx, 0, sizeof(*ctx)); - ctx->pipe_fds[0] = ctx->pipe_fds[1] = -1; + ctx->fd = -1; #warning This leaks memory and will until we clean up rsync_ctx_t handling here - if (pipe(ctx->pipe_fds) < 0) { + if (pipe(pipe_fds) < 0) { logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno)); return rsync_status_failed; } + ctx->fd = pipe_fds[0]; - if ((i = fcntl(ctx->pipe_fds[0], F_GETFL, 0)) == -1 || - fcntl(ctx->pipe_fds[0], F_SETFL, i | O_NONBLOCK) == -1) { + if ((i = fcntl(ctx->fd, F_GETFL, 0)) == -1 || + fcntl(ctx->fd, F_SETFL, i | O_NONBLOCK) == -1) { logmsg(rc, log_sys_err, "Couldn't set rsync's output stream non-blocking: %s", strerror(errno)); - close(ctx->pipe_fds[0]); - close(ctx->pipe_fds[1]); + close(pipe_fds[0]); + close(pipe_fds[1]); return rsync_status_failed; } switch ((ctx->pid = vfork())) { case -1: logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno)); - close(ctx->pipe_fds[0]); - close(ctx->pipe_fds[1]); + close(pipe_fds[0]); + close(pipe_fds[1]); return rsync_status_failed; case 0: #define whine(msg) write(2, msg, sizeof(msg) - 1) - close(ctx->pipe_fds[0]); - if (dup2(ctx->pipe_fds[1], 1) < 0) + close(pipe_fds[0]); + if (dup2(pipe_fds[1], 1) < 0) whine("dup2(1) failed\n"); - else if (dup2(ctx->pipe_fds[1], 2) < 0) + else if (dup2(pipe_fds[1], 2) < 0) whine("dup2(2) failed\n"); else if (execvp(argv[0], (char * const *) argv) < 0) whine("execvp() failed\n"); @@ -1758,9 +1768,10 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, #undef whine } - close(ctx->pipe_fds[1]); + (void) close(pipe_fds[1]); - ctx->started = now = time(0); + now = time(0); + ctx->started = now; ctx->deadline = now + rc->rsync_timeout; n = -1; @@ -1768,19 +1779,19 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, while ((wpid = waitpid(-1, &pid_status, WNOHANG)) == 0 && (!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) { FD_ZERO(&rfds); - FD_SET(ctx->pipe_fds[0], &rfds); + FD_SET(ctx->fd, &rfds); if (rc->rsync_timeout) { tv.tv_sec = ctx->deadline - now; tv.tv_usec = 0; - n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, &tv); + n = select(ctx->fd + 1, &rfds, NULL, NULL, &tv); } else { - n = select(ctx->pipe_fds[0] + 1, &rfds, NULL, NULL, NULL); + n = select(ctx->fd + 1, &rfds, NULL, NULL, NULL); } if (n == 0 || (n < 0 && errno == EINTR)) continue; if (n < 0) break; - while ((n = read(ctx->pipe_fds[0], buffer + i, sizeof(buffer) - i - 1)) > 0) { + while ((n = read(ctx->fd, buffer + i, sizeof(buffer) - i - 1)) > 0) { n += i; assert(n < sizeof(buffer)); buffer[n] = '\0'; @@ -1802,7 +1813,7 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, break; } - close(ctx->pipe_fds[0]); + close(ctx->fd); assert(i >= 0 && i < sizeof(buffer)); if (i) { @@ -3914,8 +3925,16 @@ int main(int argc, char *argv[]) goto done; } logmsg(&rc, log_telemetry, "Processing trust anchor from URI %s", uri.s); - if (rsync_file(&rc, &uri) != rsync_status_done) + switch (rsync_file(&rc, &uri)) { + case rsync_status_pending: + logmsg(&rc, log_sys_err, "I don't know how to handle rsync_status_pending yet, help!"); + goto done; + case rsync_status_failed: logmsg(&rc, log_data_err, "Could not fetch trust anchor from %s", uri.s); + break; + case rsync_status_done: + break; + } if (bio) pkey = d2i_PUBKEY_bio(bio, NULL); BIO_free_all(bio); |