diff options
author | Rob Austein <sra@hactrn.net> | 2011-06-21 16:45:28 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2011-06-21 16:45:28 +0000 |
commit | 6f2a4b3768940617c0943afbe12edbe18b079c07 (patch) | |
tree | 83d438d91d5f494ac6c5ee6bfd34fcdec79220ce | |
parent | 427c404c8a0adef795ac8f2c5e3a8c518b37978e (diff) |
Checkpoint
svn path=/rcynic-ng/rcynic.c; revision=3891
-rw-r--r-- | rcynic-ng/rcynic.c | 197 |
1 files changed, 72 insertions, 125 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index d8b0d54f..2291896c 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -389,11 +389,20 @@ typedef struct walk_ctx { DECLARE_STACK_OF(walk_ctx_t) /** + * Return codes from rsync functions. + */ +typedef enum { + rsync_status_done, /* Request completed */ + rsync_status_failed, /* Request failed */ + rsync_status_pending /* Request in progress */ +} rsync_status_t; + +/** * Context for asyncronous rsync. */ typedef struct rsync_ctx { uri_t uri; - void (*handler)(const rcynic_ctx_t *, STACK_OF(walk_ctx_t) *, const uri_t *); + void (*handler)(const rcynic_ctx_t *, const rsync_status_t, STACK_OF(walk_ctx_t) *, const uri_t *); STACK_OF(walk_ctx_t) *stack; int blocked; pid_t pid; @@ -1473,15 +1482,6 @@ static int rsync_cached_uri(const rcynic_ctx_t *rc, /** - * Return codes from rsync functions. - */ -typedef enum { - rsync_status_done, /* Request completed */ - rsync_status_failed, /* Request failed */ - rsync_status_pending /* Request in progress */ -} rsync_status_t; - -/** * Manager for queue of rsync tasks in progress. * * General plan here is to process one completed child, or output has @@ -1542,7 +1542,14 @@ static void rsync_mgr(const rcynic_ctx_t *rc) close(ctx->fd); - if (WEXITSTATUS(pid_status)) { + if (ctx->buflen > 0) { + assert(ctx->buflen < sizeof(ctx->buffer)); + ctx->buffer[ctx->buflen] = '\0'; + logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer); + ctx->buflen = 0; + } + + if (WEXITSTATUS(pid_status) != 0) { logmsg(rc, log_data_err, "rsync exited with status %d fetching %s", WEXITSTATUS(pid_status), ctx->uri.s); mib_increment(rc, &ctx->uri, @@ -1560,6 +1567,10 @@ static void rsync_mgr(const rcynic_ctx_t *rc) if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, uribuf.s + SIZEOF_RSYNC)) logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", ctx->uri.s); + if (ctx->handler) + ctx->handler(rc, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, ctx->stack, &ctx->uri); + sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx); + free(ctx); return; } @@ -1572,7 +1583,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc) if (ctx->pid <= 0 || now < ctx->deadline || (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status))) continue; - (void) kill(ctx->pid, SIGTERM); + (void) kill(ctx->pid, ctx->kill_count++ < KILL_MAX ? SIGTERM : SIGKILL); ctx->deadline = now + 1; signaled++; } @@ -1622,7 +1633,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc) while ((s = strchr(ctx->buffer, '\n')) != NULL) { *s++ = '\0'; - logmsg(rc, log_telemetry, "%s", ctx->buffer); + logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer); assert(s >= ctx->buffer); ctx->buflen = s - ctx->buffer; if (ctx->buflen > 0) @@ -1630,13 +1641,14 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } if (ctx->buflen == sizeof(ctx->buffer) - 1) { - logmsg(rc, log_telemetry, "%s", ctx->buffer); + logmsg(rc, log_telemetry, "rsync[%u]: %s", ctx->pid, ctx->buffer); ctx->buflen = 0; } } } } + /** * Run rsync. This is fairly nasty, because we need to: * @@ -1665,15 +1677,11 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, }; const char *argv[100]; - char *s, *b, buffer[URI_MAX * 4]; path_t path; rsync_ctx_t *ctx = NULL; - rsync_status_t ret = rsync_status_failed; - int i, n, argc = 0, pid_status = -1, pipe_fds[2]; - time_t now; - struct timeval tv; - fd_set rfds; - pid_t wpid; + int i, argc = 0, flags, pipe_fds[2]; + + pipe_fds[0] = pipe_fds[1] = -1; assert(rc && uri); @@ -1724,43 +1732,44 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, logmsg(rc, log_sys_err, "malloc(rsync_ctxt_t) failed"); return rsync_status_failed; } - memset(ctx, 0, sizeof(*ctx)); + memcpy(&ctx->uri, uri, sizeof(ctx->uri)); ctx->fd = -1; -#warning This leaks memory and will until we clean up rsync_ctx_t handling here - if (pipe(pipe_fds) < 0) { logmsg(rc, log_sys_err, "pipe() failed: %s", strerror(errno)); - return rsync_status_failed; + goto lose; } ctx->fd = pipe_fds[0]; - if ((i = fcntl(ctx->fd, F_GETFL, 0)) == -1 || - fcntl(ctx->fd, F_SETFL, i | O_NONBLOCK) == -1) { - logmsg(rc, log_sys_err, - "Couldn't set rsync's output stream non-blocking: %s", + if ((flags = fcntl(ctx->fd, F_GETFL, 0)) == -1) { + logmsg(rc, log_sys_err, "fcntl(F_GETFL) failed: %s", strerror(errno)); - close(pipe_fds[0]); - close(pipe_fds[1]); - return rsync_status_failed; + goto lose; + } + flags |= O_NONBLOCK; + if (fcntl(ctx->fd, F_SETFL, flags) == -1) { + logmsg(rc, log_sys_err, "fcntl(F_SETFL) failed: %s", + strerror(errno)); + goto lose; } switch ((ctx->pid = vfork())) { case -1: logmsg(rc, log_sys_err, "vfork() failed: %s", strerror(errno)); - close(pipe_fds[0]); - close(pipe_fds[1]); - return rsync_status_failed; + goto lose; case 0: #define whine(msg) write(2, msg, sizeof(msg) - 1) - close(pipe_fds[0]); - if (dup2(pipe_fds[1], 1) < 0) - whine("dup2(1) failed\n"); + if (close(pipe_fds[0]) < 0) + whine("close(pipe_fds[0]) failed\n"); + else if (dup2(pipe_fds[1], 1) < 0) + whine("dup2(pipe_fds[1], 1) failed\n"); else if (dup2(pipe_fds[1], 2) < 0) - whine("dup2(2) failed\n"); + whine("dup2(pipe_fds[1], 2) failed\n"); + else if (close(pipe_fds[1]) < 0) + whine("close(pipe_fds[1]) failed\n"); else if (execvp(argv[0], (char * const *) argv) < 0) - whine("execvp() failed\n"); + whine("execvp(argv[0], (char * const *) argv) failed\n"); whine("last system error: "); write(2, strerror(errno), strlen(strerror(errno))); whine("\n"); @@ -1769,95 +1778,28 @@ static rsync_status_t rsync(const rcynic_ctx_t *rc, } (void) close(pipe_fds[1]); + pipe_fds[1] = -1; - now = time(0); - ctx->started = now; - ctx->deadline = now + rc->rsync_timeout; - - n = -1; - i = 0; - while ((wpid = waitpid(-1, &pid_status, WNOHANG)) == 0 && - (!rc->rsync_timeout || (now = time(0)) < ctx->deadline)) { - FD_ZERO(&rfds); - FD_SET(ctx->fd, &rfds); - if (rc->rsync_timeout) { - tv.tv_sec = ctx->deadline - now; - tv.tv_usec = 0; - n = select(ctx->fd + 1, &rfds, NULL, NULL, &tv); - } else { - n = select(ctx->fd + 1, &rfds, NULL, NULL, NULL); - } - if (n == 0 || (n < 0 && errno == EINTR)) - continue; - if (n < 0) - break; - while ((n = read(ctx->fd, buffer + i, sizeof(buffer) - i - 1)) > 0) { - n += i; - assert(n < sizeof(buffer)); - buffer[n] = '\0'; - for (b = buffer; (s = strchr(b, '\n')) != NULL; b = s) { - *s++ = '\0'; - logmsg(rc, log_telemetry, "%s", b); - } - i = strlen(b); - assert(i < sizeof(buffer) && b + i < buffer + sizeof(buffer)); - if (b == buffer && i == sizeof(buffer) - 1) { - logmsg(rc, log_telemetry, "%s\\", b); - i = 0; - } - if (i > 0) { - memmove(buffer, b, i); - } - } - if (n == 0 || (n < 0 && errno != EAGAIN)) - break; - } - - close(ctx->fd); - - assert(i >= 0 && i < sizeof(buffer)); - if (i) { - buffer[i] = '\0'; - logmsg(rc, log_telemetry, "%s", buffer); - } - - if (n < 0 && errno != EAGAIN) - logmsg(rc, log_sys_err, "Problem reading rsync's output: %s", - strerror(errno)); - - if (rc->rsync_timeout && now >= ctx->deadline) - logmsg(rc, log_data_err, - "Fetch of %s took longer than %d seconds, terminating fetch", - uri->s, rc->rsync_timeout); - - assert(ctx->pid > 0); - for (i = 0; i < KILL_MAX && wpid == 0; i++) { - if ((wpid = waitpid(ctx->pid, &pid_status, WNOHANG)) != 0 && WIFEXITED(pid_status)) - break; - kill(ctx->pid, SIGTERM); - sleep(1); + if (!sk_rsync_ctx_t_push(rc->rsync_queue, ctx)) { + logmsg(rc, log_sys_err, "Couldn't push rsync state object onto queue, punting %s", uri->s); + goto lose; } - if (WEXITSTATUS(pid_status)) { - logmsg(rc, log_data_err, "rsync exited with status %d fetching %s", - WEXITSTATUS(pid_status), uri->s); - ret = rsync_status_failed; - mib_increment(rc, uri, (rc->rsync_timeout && now >= ctx->deadline - ? rsync_timed_out - : rsync_failed)); - } else { - ret = rsync_status_done; - mib_increment(rc, uri, rsync_succeeded); - } - - assert(strlen(uri->s) > SIZEOF_RSYNC); - strcpy(buffer, uri->s + SIZEOF_RSYNC); - if ((s = strrchr(buffer, '/')) != NULL && s[1] == '\0') - *s = '\0'; - if (!sk_OPENSSL_STRING_push_strdup(rc->rsync_cache, buffer)) - logmsg(rc, log_sys_err, "Couldn't cache URI %s, blundering onward", uri->s); + /* + * XXX Temporary control loop, needs to move to caller + */ + while (sk_rsync_ctx_t_num(rc->rsync_queue) > 0) + rsync_mgr(rc); + return rsync_status_done; - return ret; + lose: + if (pipe_fds[0] != -1) + (void) close(pipe_fds[0]); + if (pipe_fds[1] != -1) + (void) close(pipe_fds[1]); + if (ctx) + free(ctx); + return rsync_status_failed; } /** @@ -3798,6 +3740,11 @@ int main(int argc, char *argv[]) goto done; } + if ((rc.rsync_queue = sk_rsync_ctx_t_new_null()) == NULL) { + logmsg(&rc, log_sys_err, "Couldn't allocate rsync_queue"); + goto done; + } + rc.use_syslog = use_syslog; if (use_syslog) |