aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2011-07-12 17:49:45 +0000
committerRob Austein <sra@hactrn.net>2011-07-12 17:49:45 +0000
commit9f7276346762b2b5a1baeda76ce1f20e96bd37ec (patch)
tree3a57916fc43384b83ae01ea41a72a93edd08bb60
parent8e7da19fb3256cc5efdbe6d45e36aec1d16a50fd (diff)
Let rsync_mgr() decide when to start a new rsync subprocess; old way
was overruning the max_parallel_rsync limit. svn path=/rcynic-ng/rcynic.c; revision=3931
-rw-r--r--rcynic-ng/rcynic.c33
1 files changed, 28 insertions, 5 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c
index 56a10949..3fab27e0 100644
--- a/rcynic-ng/rcynic.c
+++ b/rcynic-ng/rcynic.c
@@ -1429,6 +1429,8 @@ static void walk_ctx_stack_free(STACK_OF(walk_ctx_t) *wsk)
+static int rsync_count_running(const rcynic_ctx_t *);
+
/**
* Add a task to the task queue.
*/
@@ -1440,6 +1442,8 @@ static int task_add(const rcynic_ctx_t *rc,
assert(rc && rc->task_queue && handler);
+ assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
+
if (!t)
return 0;
@@ -1507,9 +1511,15 @@ static int rsync_count_running(const rcynic_ctx_t *rc)
assert(rc && rc->rsync_queue);
- for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i)
- if (ctx->state == rsync_state_running)
+ for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) {
+ switch (ctx->state) {
+ case rsync_state_running:
+ case rsync_state_terminating:
n++;
+ default:
+ continue;
+ }
+ }
return n;
}
@@ -1578,6 +1588,8 @@ static void rsync_run(const rcynic_ctx_t *rc,
assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx));
+ assert(rsync_count_running(rc) < rc->max_parallel_fetches);
+
logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s);
memset(argv, 0, sizeof(argv));
@@ -1670,8 +1682,8 @@ static void rsync_run(const rcynic_ctx_t *rc,
ctx->problem = rsync_problem_none;
if (rc->rsync_timeout)
ctx->deadline = time(0) + rc->rsync_timeout;
- logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d, URI %s",
- (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), ctx->uri.s);
+ logmsg(rc, log_debug, "Subprocess %u started, queued %d, runable %d, running %d, max %d, URI %s",
+ (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), rsync_count_runable(rc), rsync_count_running(rc), rc->max_parallel_fetches, ctx->uri.s);
if (ctx->handler)
ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk);
return;
@@ -1782,6 +1794,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
assert(rc && rc->rsync_queue);
+ assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
+
/*
* Check for log text from subprocesses.
*/
@@ -1916,6 +1930,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
if (pid == -1 && errno != EINTR && errno != ECHILD)
logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno));
+ assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
+
/*
* Look for rsync contexts that have become runable.
*/
@@ -1928,6 +1944,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc)
}
}
+ assert(rsync_count_running(rc) <= rc->max_parallel_fetches);
+
/*
* Deal with children that have been running too long.
*/
@@ -1987,8 +2005,10 @@ static void rsync_init(const rcynic_ctx_t *rc,
return;
}
- if (rsync_runable(rc, ctx))
+#if 0
+ if (rsync_runable(rc, ctx) && rsync_count_running(rc) < rc->max_parallel_fetches);
rsync_run(rc, ctx);
+#endif
}
/**
@@ -3485,6 +3505,8 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc,
if (rsync_count_runable(rc) >= rc->max_parallel_fetches)
return;
+ assert(rsync_count_running(rc) < rc->max_parallel_fetches);
+
if ((wsk = walk_ctx_stack_clone(wsk)) == NULL) {
logmsg(rc, log_sys_err, "walk_ctx_stack_clone() failed, probably memory exhaustion, blundering onwards without forking stack");
return;
@@ -3676,6 +3698,7 @@ int main(int argc, char *argv[])
rc.log_level = log_data_err;
rc.allow_stale_crl = 1;
rc.allow_stale_manifest = 1;
+ rc.max_parallel_fetches = 1;
#define QQ(x,y) rc.priority[x] = y;
LOG_LEVELS;