1+ 'use strict'
2+
13const RSMQ = require ( 'rsmq' )
24
35/*
@@ -6,98 +8,125 @@ const RSMQ = require('rsmq')
68 * respects message signatures that should be deferred until
79 * a later time (specified in config)
810 */
9- module . exports = function Queue ( opts ) {
10- // instantiate message queue
11- const rsmq = new RSMQ ( {
12- host : opts . host ,
13- port : opts . port
14- } )
15-
11+ let QueueWrapper = function ( options ) {
12+ this . options = options
13+
14+ // initialise queue
15+ this . rsmq = this . initialiseQueue ( )
16+
1617 // initialise connection state
17- var connected = null
18- rsmq . on ( 'connect' , ( ) => connected = true )
19- rsmq . on ( 'disconnect' , ( ) => connected = false )
20-
21- // do request, fail or wait
22- function request ( req , err ) {
23- if ( connected === true ) return req ( )
24- if ( connected === false ) return err ( )
25-
26- rsmq . on ( 'connect' , connect )
27- rsmq . on ( 'disconnect' , disconnect )
28-
29- function connect ( ) {
30- req ( )
31- removeListeners ( )
32- }
33-
34- function disconnect ( ) {
35- err ( )
36- removeListeners ( )
37- }
38-
39- function removeListeners ( ) {
40- rsmq . removeListener ( 'connect' , connect )
41- rsmq . removeListener ( 'disconnect' , disconnect )
18+ this . connected = null
19+
20+ this . rsmq . on ( 'connect' , ( ) => {
21+ this . connected = true
22+ } )
23+
24+ this . rsmq . on ( 'disconnect' , ( ) => {
25+ this . connected = false
26+ } )
27+ }
28+
29+ // instantiate message queue
30+ QueueWrapper . prototype . initialiseQueue = function ( ) {
31+ return new RSMQ ( {
32+ host : this . options . host ,
33+ port : this . options . port
34+ } )
35+ }
36+
37+ // public send function
38+ QueueWrapper . prototype . send = function ( message , done ) {
39+ const send = ( ) => {
40+ let options = {
41+ qname : this . options . name ,
42+ message : message ,
43+ delay : this . getDelay ( message )
4244 }
45+
46+ console . log ( options )
47+
48+ this . rsmq . sendMessage ( options , done )
4349 }
44-
45- // public send function
46- this . send = function ( message , done ) {
47- function send ( ) {
48- var options = {
49- qname : opts . name ,
50- message : message ,
51- delay : getDelay ( message )
52- }
53- rsmq . sendMessage ( options , done )
54- }
5550
56- function error ( ) {
57- done ( new Error ( 'Queue server connection refused' ) )
58- }
51+ function error ( ) {
52+ done ( new Error ( 'Queue server connection refused' ) )
53+ }
54+
55+ this . request ( send , error )
56+ }
5957
60- request ( send , error )
58+ // do request, fail or wait
59+ QueueWrapper . prototype . request = function ( req , err ) {
60+ let rsmq = this . rsmq
61+
62+ if ( this . connected === true ) {
63+ return req ( )
6164 }
6265
63- // determine message delay (or 0)
64- function getDelay ( message ) {
65- return isDeferred ( message )
66- ? untilStart ( ) / 1000
67- : 0
66+ if ( this . connected === false ) {
67+ return err ( )
6868 }
6969
70- // is message signature in deferred list?
71- function isDeferred ( message ) {
72- if ( ! opts . deferred ) return false
73- return opts . deferred . messages . some ( ( value ) => {
74- return message . startsWith ( value )
75- } )
70+ function connect ( ) {
71+ req ( )
72+ removeListeners ( )
7673 }
77-
78- // how long until deferred message window?
79- function untilStart ( ) {
80- var now = new Date ( )
81- var start = parseTime ( opts . deferred . start )
82- var stop = parseTime ( opts . deferred . stop )
83-
84- if ( now >= start ) { // in or later than window
85- if ( now < stop || stop < start ) return 0 // in window or rollover
86- return 24 * 60 * 60 * 1000 - ( now - start ) // later than window
87- }
88- if ( now < start ) { // in or before window
89- if ( now < stop && stop < start ) return 0 // in window or rollover
90- return start - now // earlier than window
91- }
74+
75+ function disconnect ( ) {
76+ err ( )
77+ removeListeners ( )
78+ }
79+
80+ function removeListeners ( ) {
81+ rsmq . removeListener ( 'connect' , connect )
82+ rsmq . removeListener ( 'disconnect' , disconnect )
83+ }
84+
85+ this . rsmq . on ( 'connect' , connect )
86+ this . rsmq . on ( 'disconnect' , disconnect )
87+ }
88+
89+ // determine message delay (or 0)
90+ QueueWrapper . prototype . getDelay = function ( message ) {
91+ return this . isDeferred ( message )
92+ ? this . untilStart ( ) / 1000
93+ : 0
94+ }
95+
96+ // is message signature in deferred list?
97+ QueueWrapper . prototype . isDeferred = function ( message ) {
98+ if ( ! this . options . deferred ) return false
99+
100+ if ( ! Array . isArray ( this . options . deferred . messages ) ) return false
101+
102+ return this . options . deferred . messages . some ( ( value ) => {
103+ return message . startsWith ( value )
104+ } )
105+ }
106+
107+ // how long until deferred message window?
108+ QueueWrapper . prototype . untilStart = function ( ) {
109+ const now = new Date ( )
110+ const start = this . parseTime ( this . options . deferred . start )
111+ const stop = this . parseTime ( this . options . deferred . stop )
112+
113+ if ( now >= start ) { // in or later than window
114+ if ( now < stop || stop < start ) return 0 // in window or rollover
115+ return 24 * 60 * 60 * 1000 - ( now - start ) // later than window
92116 }
93-
94- // parse time string from config file
95- function parseTime ( string ) {
96- var time = new Date ( )
97- var [ hrs , mins ] = string . split ( ':' )
98- time . setUTCHours ( hrs , mins )
99- return time
117+
118+ if ( now < start ) { // in or before window
119+ if ( now < stop && stop < start ) return 0 // in window or rollover
120+ return start - now // earlier than window
100121 }
101-
102- return this
103122}
123+
124+ // parse time string from configured options
125+ QueueWrapper . prototype . parseTime = function ( string ) {
126+ const time = new Date ( )
127+ const timeParts = string . split ( ':' )
128+ time . setUTCHours ( timeParts [ 0 ] , timeParts [ 1 ] )
129+ return time
130+ }
131+
132+ module . exports = QueueWrapper
0 commit comments