Skip to content

Commit 5c169d2

Browse files
committed
WORKFLOW-199: polling every X seconds for job completion
1 parent 8b370c0 commit 5c169d2

3 files changed

Lines changed: 184 additions & 109 deletions

File tree

lib/api.js

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ var API = module.exports = function (opts) {
2626

2727
var log;
2828

29+
var waiting = {};
30+
2931
if (opts.log) {
3032
log = opts.log({
3133
component: 'workflow-api'
@@ -72,6 +74,12 @@ var API = module.exports = function (opts) {
7274
version: '0.1.0'
7375
};
7476

77+
var JOB_DONE_PATH = '/jobdone';
78+
var JOB_DONE_ROUTE = {
79+
path: JOB_DONE_PATH,
80+
version: '0.1.0'
81+
};
82+
7583
var JOBS_PATH = '/jobs';
7684
var JOB_PATH = JOBS_PATH + '/:uuid';
7785
var JOBS_ROUTE = {
@@ -279,7 +287,10 @@ var API = module.exports = function (opts) {
279287

280288
return factory.workflow(workflow, meta, function (err, result) {
281289
if (err) {
282-
return next(err.toRestError);
290+
if (typeof (err) === 'string')
291+
return next(new Error(err));
292+
else
293+
return next(err.toRestError || err);
283294
}
284295
res.header('Location', req.path() + '/' + result.uuid);
285296
// If Request-Id hasn't been set, we'll set it to workflow UUID:
@@ -497,8 +508,8 @@ var API = module.exports = function (opts) {
497508
params: {}
498509
};
499510
var meta = {};
500-
var members = ['exec_after', 'workflow', 'target', 'num_attempts',
501-
'uuid', 'locks'];
511+
var members = ['callback_urls', 'exec_after', 'workflow', 'target',
512+
'num_attempts', 'uuid', 'locks', 'wait'];
502513

503514
var job_members = [];
504515
if (typeof (opts.api.job_extra_params) !== 'undefined') {
@@ -517,6 +528,11 @@ var API = module.exports = function (opts) {
517528
}
518529
});
519530

531+
if (job.wait &&
532+
! (Array.isArray(job.callback_urls) && job.callback_urls.length))
533+
return next(new restify.ConflictError(
534+
'"callback_urls" is required when "wait" is true'));
535+
520536
if (req.headers['request-id']) {
521537
meta.req_id = req.headers['request-id'];
522538
}
@@ -534,9 +550,23 @@ var API = module.exports = function (opts) {
534550
res.header('request-id', result.uuid);
535551
}
536552
res.header('Location', req.path() + '/' + result.uuid);
553+
537554
res.status(201);
538-
res.send(result);
539-
return next();
555+
if (req.params.wait) {
556+
log.info({req: req, job: result.uuid},
557+
'holding onto request for job %s', result.uuid);
558+
waiting[result.uuid] = {
559+
req: req,
560+
res: res,
561+
next: next
562+
};
563+
// flush headers (so they have the UUID as the location header)
564+
res.write('\n');
565+
return undefined;
566+
} else {
567+
res.send(result);
568+
return next();
569+
}
540570
});
541571
}
542572

@@ -677,6 +707,29 @@ var API = module.exports = function (opts) {
677707
}
678708
});
679709
}
710+
711+
function jobDone(req, res, next) {
712+
var job = req.params;
713+
// end the incoming request
714+
res.send(job.uuid ? 200 : 400);
715+
next();
716+
717+
// end any waiting requests
718+
if (job.uuid && waiting[job.uuid]) {
719+
log.info('ending waiting request for %s', job.uuid);
720+
var o = waiting[job.uuid];
721+
722+
// res.send and res.format won't work here because headers were
723+
// flushed by writing a newline to the socket. instead, we format
724+
// the data ourselves and ship it off by assuming the user wanted
725+
// JSON. this is really lame. XXX
726+
o.res.end(JSON.stringify(job));
727+
o.next();
728+
delete waiting[job.uuid];
729+
}
730+
731+
}
732+
680733
// --- Routes
681734
// Workflows:
682735
server.get(WORKFLOWS_ROUTE, listWorkflows);
@@ -706,6 +759,8 @@ var API = module.exports = function (opts) {
706759
server.get(JOB_INFO_ROUTE, getInfo);
707760
server.head(JOB_INFO_ROUTE, getInfo);
708761
server.post(JOB_INFO_ROUTE, postInfo);
762+
// Job done (callback):
763+
server.post(JOB_DONE_ROUTE, jobDone);
709764
// Ping:
710765
server.get(PING_ROUTE, function (req, res, next) {
711766
var data = {

lib/job-runner.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ var WorkflowJobRunner = module.exports = function (opts) {
6262
// pointer to child process forked by runTask
6363
var child = null;
6464
// Properties of job object which a task should not be allowed to modify:
65-
// Properties of job object which a task should not be allowed to modify:
6665
var frozen_props = [
67-
'chain', 'chain_results', 'onerror', 'onerror_results',
66+
'callback_urls', 'chain', 'chain_results', 'onerror', 'onerror_results',
6867
'exec_after', 'timeout', 'elapsed', 'uuid', 'workflow_uuid',
6968
'name', 'execution', 'num_attempts', 'max_attempts', 'initial_delay',
7069
'max_delay', 'prev_attempt', 'oncancel', 'oncancel_results',
@@ -90,6 +89,10 @@ var WorkflowJobRunner = module.exports = function (opts) {
9089
job.chain_results = [];
9190
}
9291

92+
if (!job.callback_urls) {
93+
job.callback_urls = [];
94+
}
95+
9396
if (job.onerror && !job.onerror_results) {
9497
job.onerror_results = [];
9598
}

0 commit comments

Comments
 (0)