aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2011-06-29 02:22:53 +0000
committerRob Austein <sra@hactrn.net>2011-06-29 02:22:53 +0000
commit89f548c97011f60db7a1d5b72890edee4013d4e6 (patch)
tree2be7a82ccddef9e24343edc4ce1c04f2da9d24f2
parent5dfd98d3b07fb5f84de514ea19a23a3a25efb470 (diff)
Reorder operations in rsync_mgr()
svn path=/rcynic-ng/rcynic.c; revision=3909
-rw-r--r--rcynic-ng/rcynic.c236
1 files changed, 121 insertions, 115 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index 996ac42b..7b8b2339 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -1592,6 +1592,7 @@ static int rsync_runable(const rcynic_ctx_t *rc,
case rsync_state_initial:
case rsync_state_running:
+ case rsync_state_terminating:
return 1;
case rsync_state_retry_wait:
@@ -1599,8 +1600,6 @@ static int rsync_runable(const rcynic_ctx_t *rc,
case rsync_state_conflict_wait:
#warning rsync_state_conflict_wait not implemented yet
-
- case rsync_state_terminating:
return 0;
}
@@ -1737,8 +1736,8 @@ static void rsync_run(const rcynic_ctx_t *rc,
ctx->problem = rsync_problem_none;
if (rc->rsync_timeout)
ctx->deadline = time(0) + rc->rsync_timeout;
- logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d",
- (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue));
+ logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d, URI %s",
+ (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), ctx->uri.s);
if (ctx->handler)
ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk);
return;
@@ -1760,8 +1759,7 @@ static void rsync_run(const rcynic_ctx_t *rc,
/**
* Process one line of rsync's output. This is a separate function
- * primarily to centralize things like searching for magic error
- * messages we have to handle.
+ * primarily to centralize scraping for magic error strings.
*/
static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
rsync_ctx_t *ctx)
@@ -1780,6 +1778,50 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
}
/**
+ * Construct select() arguments.
+ */
+static int rsync_construct_select(const rcynic_ctx_t *rc,
+ const time_t now,
+ fd_set *rfds,
+ struct timeval *tv)
+{
+ rsync_ctx_t *ctx;
+ time_t when = 0;
+ int i, n = 0;
+
+ assert(rc && rc->rsync_queue && rfds && tv);
+
+ FD_ZERO(rfds);
+
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ switch (ctx->state) {
+
+ case rsync_state_running:
+ if (ctx->fd >= 0) {
+ FD_SET(ctx->fd, rfds);
+ if (ctx->fd > n)
+ n = ctx->fd;
+ }
+ if (!rc->rsync_timeout)
+ continue;
+ /* Fall through */
+
+ case rsync_state_retry_wait:
+ if (when == 0 || ctx->deadline < when)
+ when = ctx->deadline;
+ /* Fall through */
+
+ default:
+ continue;
+ }
+ }
+
+ tv->tv_sec = when ? when - now : 0;
+ tv->tv_usec = 0;
+ return n;
+}
+
+/**
* Manager for queue of rsync tasks in progress.
*
* General plan here is to process one completed child, or output
@@ -1795,7 +1837,7 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
*/
static void rsync_mgr(const rcynic_ctx_t *rc)
{
- time_t now = time(0), when;
+ time_t now = time(0);
int i, n, pid_status = -1;
rsync_ctx_t *ctx = NULL;
struct timeval tv;
@@ -1807,13 +1849,55 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
assert(rc && rc->rsync_queue);
/*
- * Look for rsync contexts that have become runable.
+ * Check for log text from subprocesses.
*/
- if (rsync_count_running(rc) < rc->max_parallel_fetches) {
- for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) {
- rsync_run(rc, ctx);
- return;
+
+ n = rsync_construct_select(rc, now, &rfds, &tv);
+
+ if (n > 0 || tv.tv_sec)
+ n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL);
+
+ if (n > 0) {
+
+ int again = 1;
+
+ while (again) {
+ again = 0;
+
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds))
+ continue;
+
+ assert(ctx->buflen < sizeof(ctx->buffer) - 1);
+
+ while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) {
+ again = 1;
+ ctx->buflen += n;
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ ctx->buffer[ctx->buflen] = '\0';
+
+ while ((s = strchr(ctx->buffer, '\n')) != NULL) {
+ *s++ = '\0';
+ do_one_rsync_log_line(rc, ctx);
+ assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer));
+ ctx->buflen -= s - ctx->buffer;
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ if (ctx->buflen > 0)
+ memmove(ctx->buffer, s, ctx->buflen);
+ ctx->buffer[ctx->buflen] = '\0';
+ }
+
+ if (ctx->buflen == sizeof(ctx->buffer) - 1) {
+ ctx->buffer[sizeof(ctx->buffer) - 1] = '\0';
+ do_one_rsync_log_line(rc, ctx);
+ ctx->buflen = 0;
+ }
+ }
+
+ if (n == 0) {
+ (void) close(ctx->fd);
+ ctx->fd = -1;
+ }
}
}
}
@@ -1822,32 +1906,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
* Check for exited subprocesses.
*/
- while ((pid = waitpid(-1, &pid_status, WNOHANG)) == -1 && errno == EINTR)
- ;
-
- switch (pid) {
-
- case -1:
- /*
- * If we have no children, we're done here.
- */
- if (errno == ECHILD)
- return;
-
- /*
- * Not a lot we can do for errors other than EINTR or ECHILD,
- * other than whining.
- */
- logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno));
- return;
-
- case 0:
- /*
- * We have children, but none ready to exit, continue to select().
- */
- break;
+ while ((pid = waitpid(-1, &pid_status, WNOHANG)) > 0) {
- default:
/*
* Child exited, handle that and return.
*/
@@ -1857,7 +1917,7 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
break;
if (ctx == NULL) {
logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid);
- return;
+ continue;
}
close(ctx->fd);
@@ -1870,8 +1930,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
ctx->buflen = 0;
}
- ctx->pid = 0;
-
switch (WEXITSTATUS(pid_status)) {
case 0:
@@ -1891,23 +1949,22 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
if (ctx->problem == rsync_problem_refused) {
unsigned char r;
- if (RAND_bytes(&r, sizeof(r)))
- r %= 60;
- else
+ if (!RAND_bytes(&r, sizeof(r)))
r = 60;
ctx->deadline = time(0) + 30 + r;
ctx->state = rsync_state_retry_wait;
ctx->problem = rsync_problem_none;
+ ctx->pid = 0;
ctx->tries++;
logmsg(rc, log_telemetry, "Scheduling retry for %s", ctx->uri.s);
- return;
+ continue;
}
/* Otherwise, fall through */
default:
- logmsg(rc, log_data_err, "rsync exited with status %d fetching %s",
- WEXITSTATUS(pid_status), ctx->uri.s);
+ logmsg(rc, log_data_err, "rsync %u exited with status %d fetching %s",
+ (unsigned) pid, WEXITSTATUS(pid_status), ctx->uri.s);
mib_increment(rc, &ctx->uri,
(rc->rsync_timeout && now >= ctx->deadline
? rsync_timed_out
@@ -1926,17 +1983,30 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
ctx->handler(rc, ctx, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, &ctx->uri, ctx->wsk);
(void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx);
free(ctx);
- return;
+ ctx = NULL;
+ }
+
+ if (pid == -1 && errno != EINTR && errno != ECHILD)
+ logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno));
+
+ /*
+ * Look for rsync contexts that have become runable.
+ */
+ if (rsync_count_running(rc) < rc->max_parallel_fetches) {
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) {
+ rsync_run(rc, ctx);
+ break;
+ }
+ }
}
/*
* Deal with children that have been running too long.
*/
if (rc->rsync_timeout) {
- int signaled = 0;
for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->pid <= 0 || now < ctx->deadline ||
- (waitpid(ctx->pid, &pid_status, WNOHANG) == ctx->pid && WIFEXITED(pid_status)))
+ if (ctx->pid <= 0 || now < ctx->deadline)
continue;
if (ctx->state != rsync_state_terminating) {
ctx->problem = rsync_problem_timed_out;
@@ -1946,70 +2016,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
(void) kill(ctx->pid, ctx->tries++ < KILL_MAX ? SIGTERM : SIGKILL);
ctx->deadline = now + 2;
- signaled++;
- }
- if (signaled)
- return;
- }
-
- /*
- * Check for log text from children if we get this far. This is
- * where we finally block (select()) if we've nothing else to do.
- */
-
- FD_ZERO(&rfds);
- when = 0;
- n = 0;
-
- for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->fd <= 0)
- continue;
- assert(ctx->state == rsync_state_running);
- FD_SET(ctx->fd, &rfds);
- if (ctx->fd > n)
- n = ctx->fd;
- if (when == 0 || ctx->deadline < when)
- when = ctx->deadline;
- }
-
- if (rc->rsync_timeout && when != 0) {
- tv.tv_sec = when - now;
- tv.tv_usec = 0;
- n = select(n + 1, &rfds, NULL, NULL, &tv);
- } else {
- n = select(n + 1, &rfds, NULL, NULL, NULL);
- }
-
- if (n <= 0)
- return;
-
- for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds))
- continue;
-
- assert(ctx->buflen < sizeof(ctx->buffer) - 1);
-
- while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) {
- ctx->buflen += n;
- assert(ctx->buflen < sizeof(ctx->buffer));
- ctx->buffer[ctx->buflen] = '\0';
-
- while ((s = strchr(ctx->buffer, '\n')) != NULL) {
- *s++ = '\0';
- do_one_rsync_log_line(rc, ctx);
- assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer));
- ctx->buflen -= s - ctx->buffer;
- assert(ctx->buflen < sizeof(ctx->buffer));
- if (ctx->buflen > 0)
- memmove(ctx->buffer, s, ctx->buflen);
- ctx->buffer[ctx->buflen] = '\0';
- }
-
- if (ctx->buflen == sizeof(ctx->buffer) - 1) {
- ctx->buffer[sizeof(ctx->buffer) - 1] = '\0';
- do_one_rsync_log_line(rc, ctx);
- ctx->buflen = 0;
- }
}
}
}