about summary refs log tree commit diff
path: root/rcynic-ng/rcynic.c
diff options
context:
space:
mode:
Diffstat (limited to 'rcynic-ng/rcynic.c')
-rw-r--r-- rcynic-ng/rcynic.c 168
1 files changed, 96 insertions, 72 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index 3acbad75..f0c2a697 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -398,6 +398,7 @@ DECLARE_STACK_OF(walk_ctx_t)
typedef enum {
rsync_status_done, /* Request completed */
rsync_status_failed, /* Request failed */
+ rsync_status_timed_out, /* Request timed out */
rsync_status_pending, /* Request in progress */
rsync_status_skipped /* Request not attempted */
} rsync_status_t;
@@ -1139,31 +1140,35 @@ static int rm_rf(const path_t *name)
/**
- * Add an entry to the dead host cache.
+ * Check to see whether a hostname is in the dead host cache.
*/
-static void dead_host_add(const rcynic_ctx_t *rc, const uri_t *uri)
+static int dead_host_check(const rcynic_ctx_t *rc, const uri_t *uri)
{
hostname_t hostname;
assert(rc && uri && rc->dead_host_cache);
- if (!uri_to_hostname(uri, &hostname))
- return;
-
- (void) sk_OPENSSL_STRING_push_strdup(rc->dead_host_cache, hostname.s);
+ return (uri_to_hostname(uri, &hostname) &&
+ sk_OPENSSL_STRING_find(rc->dead_host_cache, hostname.s) >= 0);
}
+
/**
- * Check to see whether a hostname is in the dead host cache.
+ * Add an entry to the dead host cache.
*/
-static int dead_host_check(const rcynic_ctx_t *rc, const uri_t *uri)
+static void dead_host_add(const rcynic_ctx_t *rc, const uri_t *uri)
{
hostname_t hostname;
assert(rc && uri && rc->dead_host_cache);
- return (uri_to_hostname(uri, &hostname) &&
- sk_OPENSSL_STRING_find(rc->dead_host_cache, hostname.s) >= 0);
+ if (!uri_to_hostname(uri, &hostname))
+ return;
+ /* Hostname is parsed once; skip the push when it is already cached. */
+ if (sk_OPENSSL_STRING_find(rc->dead_host_cache, hostname.s) >= 0)
+ return;
+
+ (void) sk_OPENSSL_STRING_push_strdup(rc->dead_host_cache, hostname.s);
}
@@ -1622,12 +1627,12 @@ static int rsync_runable(const rcynic_ctx_t *rc,
case rsync_state_initial:
case rsync_state_running:
- case rsync_state_terminating:
return 1;
case rsync_state_retry_wait:
return ctx->deadline <= time(0);
+ case rsync_state_terminating:
case rsync_state_conflict_wait:
#warning rsync_state_conflict_wait not implemented yet
return 0;
@@ -1672,7 +1677,7 @@ static void rsync_run(const rcynic_ctx_t *rc,
pipe_fds[0] = pipe_fds[1] = -1;
- assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx));
+ assert(rc && ctx && ctx->pid == 0 && ctx->state != rsync_state_running && rsync_runable(rc, ctx));
assert(rsync_count_running(rc) < rc->max_parallel_fetches);
@@ -1796,6 +1801,9 @@ static void rsync_run(const rcynic_ctx_t *rc,
static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
rsync_ctx_t *ctx)
{
+ unsigned u;
+ char *s;
+
/*
* Send line to our log unless it's empty.
*/
@@ -1805,8 +1813,11 @@ static void do_one_rsync_log_line(const rcynic_ctx_t *rc,
/*
* Check for magic error strings
*/
- if (strstr(ctx->buffer, "@ERROR: max connections"))
+ if ((s = strstr(ctx->buffer, "@ERROR: max connections")) != NULL) {
ctx->problem = rsync_problem_refused;
+ if (sscanf(s, "@ERROR: max connections (%u) reached -- try again later", &u) == 1)
+ logmsg(rc, log_debug, "Subprocess %u reported limit of %u for %s", (unsigned) ctx->pid, u, ctx->uri.s);
+ }
}
/**
@@ -1879,55 +1890,6 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
assert(rc && rc->rsync_queue);
- assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
-
- /*
- * Check for log text from subprocesses.
- */
-
- n = rsync_construct_select(rc, now, &rfds, &tv);
-
- if (n > 0 || tv.tv_sec)
- n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL);
-
- if (n > 0) {
-
- for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds))
- continue;
-
- assert(ctx->buflen < sizeof(ctx->buffer) - 1);
-
- while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) {
- ctx->buflen += n;
- assert(ctx->buflen < sizeof(ctx->buffer));
- ctx->buffer[ctx->buflen] = '\0';
-
- while ((s = strchr(ctx->buffer, '\n')) != NULL) {
- *s++ = '\0';
- do_one_rsync_log_line(rc, ctx);
- assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer));
- ctx->buflen -= s - ctx->buffer;
- assert(ctx->buflen < sizeof(ctx->buffer));
- if (ctx->buflen > 0)
- memmove(ctx->buffer, s, ctx->buflen);
- ctx->buffer[ctx->buflen] = '\0';
- }
-
- if (ctx->buflen == sizeof(ctx->buffer) - 1) {
- ctx->buffer[sizeof(ctx->buffer) - 1] = '\0';
- do_one_rsync_log_line(rc, ctx);
- ctx->buflen = 0;
- }
- }
-
- if (n == 0) {
- (void) close(ctx->fd);
- ctx->fd = -1;
- }
- }
- }
-
/*
* Check for exited subprocesses.
*/
@@ -1935,13 +1897,16 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
while ((pid = waitpid(-1, &pid_status, WNOHANG)) > 0) {
/*
- * Child exited, handle that and return.
+ * Child exited, handle it.
*/
+ logmsg(rc, log_debug, "Subprocess %d exited with status %d", pid, WEXITSTATUS(pid_status));
+
for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i)
if (ctx->pid == pid)
break;
if (ctx == NULL) {
+ assert(i == sk_rsync_ctx_t_num(rc->rsync_queue));
logmsg(rc, log_sys_err, "Couldn't find rsync context for pid %d", pid);
continue;
}
@@ -1959,7 +1924,11 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
switch (WEXITSTATUS(pid_status)) {
case 0:
- log_validation_status(rc, &ctx->uri, rsync_succeeded, object_generation_null);
+ log_validation_status(rc, &ctx->uri,
+ (ctx->problem == rsync_problem_timed_out
+ ? rsync_timed_out
+ : rsync_succeeded),
+ object_generation_null);
break;
case 5: /* "Error starting client-server protocol" */
@@ -2008,9 +1977,13 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
rsync_cache_add(rc, &ctx->uri);
-
if (ctx->handler)
- ctx->handler(rc, ctx, WEXITSTATUS(pid_status) ? rsync_status_failed : rsync_status_done, &ctx->uri, ctx->wsk);
+ ctx->handler(rc, ctx, (ctx->problem == rsync_problem_timed_out
+ ? rsync_status_timed_out
+ : WEXITSTATUS(pid_status) != 0
+ ? rsync_status_failed
+ : rsync_status_done),
+ &ctx->uri, ctx->wsk);
(void) sk_rsync_ctx_t_delete_ptr(rc->rsync_queue, ctx);
free(ctx);
ctx = NULL;
@@ -2024,11 +1997,57 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
/*
* Look for rsync contexts that have become runable.
*/
- if (rsync_count_running(rc) < rc->max_parallel_fetches) {
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i)
+ if (ctx->state != rsync_state_running &&
+ rsync_runable(rc, ctx) &&
+ rsync_count_running(rc) < rc->max_parallel_fetches)
+ rsync_run(rc, ctx);
+
+ assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
+
+ /*
+ * Check for log text from subprocesses.
+ */
+
+ n = rsync_construct_select(rc, now, &rfds, &tv);
+
+ if (n > 0 || tv.tv_sec)
+ n = select(n + 1, &rfds, NULL, NULL, tv.tv_sec ? &tv : NULL);
+
+ if (n > 0) {
+
for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
- if (ctx->state != rsync_state_running && rsync_runable(rc, ctx)) {
- rsync_run(rc, ctx);
- break;
+ if (ctx->fd <= 0 || !FD_ISSET(ctx->fd, &rfds))
+ continue;
+
+ assert(ctx->buflen < sizeof(ctx->buffer) - 1);
+
+ while ((n = read(ctx->fd, ctx->buffer + ctx->buflen, sizeof(ctx->buffer) - 1 - ctx->buflen)) > 0) {
+ ctx->buflen += n;
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ ctx->buffer[ctx->buflen] = '\0';
+
+ while ((s = strchr(ctx->buffer, '\n')) != NULL) {
+ *s++ = '\0';
+ do_one_rsync_log_line(rc, ctx);
+ assert(s > ctx->buffer && s < ctx->buffer + sizeof(ctx->buffer));
+ ctx->buflen -= s - ctx->buffer;
+ assert(ctx->buflen < sizeof(ctx->buffer));
+ if (ctx->buflen > 0)
+ memmove(ctx->buffer, s, ctx->buflen);
+ ctx->buffer[ctx->buflen] = '\0';
+ }
+
+ if (ctx->buflen == sizeof(ctx->buffer) - 1) {
+ ctx->buffer[sizeof(ctx->buffer) - 1] = '\0';
+ do_one_rsync_log_line(rc, ctx);
+ ctx->buflen = 0;
+ }
+ }
+
+ if (n == 0) {
+ (void) close(ctx->fd);
+ ctx->fd = -1;
}
}
}
@@ -2048,14 +2067,15 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
ctx->problem = rsync_problem_timed_out;
ctx->state = rsync_state_terminating;
ctx->tries = 0;
- logmsg(rc, log_telemetry, "Subprocess %u is taking too long fetching %s", (unsigned) ctx->pid, ctx->uri.s);
+ logmsg(rc, log_telemetry, "Subprocess %u is taking too long fetching %s, whacking it", (unsigned) ctx->pid, ctx->uri.s);
+ dead_host_add(rc, &ctx->uri);
} else if (sig == SIGTERM) {
logmsg(rc, log_telemetry, "Whacking subprocess %u again", (unsigned) ctx->pid);
} else {
logmsg(rc, log_telemetry, "Whacking subprocess %u with big hammer", (unsigned) ctx->pid);
}
(void) kill(ctx->pid, sig);
- ctx->deadline = now + 2;
+ ctx->deadline = now + 1;
}
}
}
@@ -3677,6 +3697,10 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc,
log_validation_status(rc, uri, rsync_failed, object_generation_null);
break;
+ case rsync_status_timed_out:
+ log_validation_status(rc, uri, rsync_timed_out, object_generation_null);
+ break;
+
case rsync_status_skipped:
log_validation_status(rc, uri, rsync_skipped, object_generation_null);
break;