diff options
author | Rob Austein <sra@hactrn.net> | 2011-07-12 17:49:45 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2011-07-12 17:49:45 +0000 |
commit | 9f7276346762b2b5a1baeda76ce1f20e96bd37ec (patch) | |
tree | 3a57916fc43384b83ae01ea41a72a93edd08bb60 | |
parent | 8e7da19fb3256cc5efdbe6d45e36aec1d16a50fd (diff) |
Let rsync_mgr() decide when to start a new rsync subprocess; old way
was overruning the max_parallel_rsync limit.
svn path=/rcynic-ng/rcynic.c; revision=3931
-rw-r--r-- | rcynic-ng/rcynic.c | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/rcynic-ng/rcynic.c b/rcynic-ng/rcynic.c index 56a10949..3fab27e0 100644 --- a/rcynic-ng/rcynic.c +++ b/rcynic-ng/rcynic.c @@ -1429,6 +1429,8 @@ static void walk_ctx_stack_free(STACK_OF(walk_ctx_t) *wsk) +static int rsync_count_running(const rcynic_ctx_t *); + /** * Add a task to the task queue. */ @@ -1440,6 +1442,8 @@ static int task_add(const rcynic_ctx_t *rc, assert(rc && rc->task_queue && handler); + assert(rsync_count_running(rc) <= rc->max_parallel_fetches); + if (!t) return 0; @@ -1507,9 +1511,15 @@ static int rsync_count_running(const rcynic_ctx_t *rc) assert(rc && rc->rsync_queue); - for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) - if (ctx->state == rsync_state_running) + for (i = 0; (ctx = sk_rsync_ctx_t_value(rc->rsync_queue, i)) != NULL; ++i) { + switch (ctx->state) { + case rsync_state_running: + case rsync_state_terminating: n++; + default: + continue; + } + } return n; } @@ -1578,6 +1588,8 @@ static void rsync_run(const rcynic_ctx_t *rc, assert(rc && ctx && ctx->state != rsync_state_running && rsync_runable(rc, ctx)); + assert(rsync_count_running(rc) < rc->max_parallel_fetches); + logmsg(rc, log_telemetry, "Fetching %s", ctx->uri.s); memset(argv, 0, sizeof(argv)); @@ -1670,8 +1682,8 @@ static void rsync_run(const rcynic_ctx_t *rc, ctx->problem = rsync_problem_none; if (rc->rsync_timeout) ctx->deadline = time(0) + rc->rsync_timeout; - logmsg(rc, log_debug, "Subprocess %u started, current subprocess count %d, URI %s", - (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), ctx->uri.s); + logmsg(rc, log_debug, "Subprocess %u started, queued %d, runable %d, running %d, max %d, URI %s", + (unsigned) ctx->pid, sk_rsync_ctx_t_num(rc->rsync_queue), rsync_count_runable(rc), rsync_count_running(rc), rc->max_parallel_fetches, ctx->uri.s); if (ctx->handler) ctx->handler(rc, ctx, rsync_status_pending, &ctx->uri, ctx->wsk); return; @@ -1782,6 +1794,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc) assert(rc && rc->rsync_queue); + assert(rsync_count_running(rc) <= rc->max_parallel_fetches); + /* * Check for log text from subprocesses. */ @@ -1916,6 +1930,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc) if (pid == -1 && errno != EINTR && errno != ECHILD) logmsg(rc, log_sys_err, "waitpid() returned error: %s", strerror(errno)); + assert(rsync_count_running(rc) <= rc->max_parallel_fetches); + /* * Look for rsync contexts that have become runable. */ @@ -1928,6 +1944,8 @@ static void rsync_mgr(const rcynic_ctx_t *rc) } } + assert(rsync_count_running(rc) <= rc->max_parallel_fetches); + /* * Deal with children that have been running too long. */ @@ -1987,8 +2005,10 @@ static void rsync_init(const rcynic_ctx_t *rc, return; } - if (rsync_runable(rc, ctx)) +#if 0 + if (rsync_runable(rc, ctx) && rsync_count_running(rc) < rc->max_parallel_fetches); rsync_run(rc, ctx); +#endif } /** @@ -3485,6 +3505,8 @@ static void rsync_sia_callback(const rcynic_ctx_t *rc, if (rsync_count_runable(rc) >= rc->max_parallel_fetches) return; + assert(rsync_count_running(rc) < rc->max_parallel_fetches); + if ((wsk = walk_ctx_stack_clone(wsk)) == NULL) { logmsg(rc, log_sys_err, "walk_ctx_stack_clone() failed, probably memory exhaustion, blundering onwards without forking stack"); return; @@ -3676,6 +3698,7 @@ int main(int argc, char *argv[]) rc.log_level = log_data_err; rc.allow_stale_crl = 1; rc.allow_stale_manifest = 1; + rc.max_parallel_fetches = 1; #define QQ(x,y) rc.priority[x] = y; LOG_LEVELS; |