@@ -43,13 +43,20 @@ impl State {
4343 // Generate next unique ID in a distributed sequence.
4444 // The `node_id` argument must be globally unique.
4545 fn next_id ( & mut self , node_id : u64 , id_offset : u64 ) -> u64 {
46- let mut now = wait_until ( self . last_timestamp_ms ) ;
46+ self . next_id_with ( node_id, id_offset, & mut now_ms)
47+ }
48+
49+ fn next_id_with < F > ( & mut self , node_id : u64 , id_offset : u64 , now_ms : & mut F ) -> u64
50+ where
51+ F : FnMut ( ) -> u64 ,
52+ {
53+ let mut now = wait_until_with ( self . last_timestamp_ms , now_ms) ;
4754
4855 if now == self . last_timestamp_ms {
4956 self . sequence = ( self . sequence + 1 ) & MAX_SEQUENCE ;
5057 // Wraparound.
5158 if self . sequence == 0 {
52- now = wait_until ( now + 1 ) ;
59+ now = wait_until_with ( now + 1 , now_ms ) ;
5360 }
5461 } else {
5562 // Reset sequence to zero once we reach next ms.
@@ -80,9 +87,10 @@ fn now_ms() -> u64 {
8087 . as_millis ( ) as u64
8188}
8289
83- // Get a monotonically increasing timestamp in ms.
84- // Protects against clock drift.
85- fn wait_until ( target_ms : u64 ) -> u64 {
90+ fn wait_until_with < F > ( target_ms : u64 , now_ms : & mut F ) -> u64
91+ where
92+ F : FnMut ( ) -> u64 ,
93+ {
8694 loop {
8795 let now = now_ms ( ) ;
8896 if now >= target_ms {
@@ -150,6 +158,26 @@ mod test {
150158
151159 use super :: * ;
152160
161+ fn decode_id ( id : u64 ) -> ( u64 , u64 , u64 ) {
162+ let sequence = id & MAX_SEQUENCE ;
163+ let node_id = ( id >> NODE_SHIFT ) & MAX_NODE_ID ;
164+ let elapsed_ms = id >> TIMESTAMP_SHIFT ;
165+ ( elapsed_ms, node_id, sequence)
166+ }
167+
168+ fn fake_clock ( readings : & [ u64 ] ) -> impl FnMut ( ) -> u64 + ' _ {
169+ assert ! ( !readings. is_empty( ) , "fake clock requires at least one reading" ) ;
170+
171+ let mut index = 0 ;
172+ let last = readings[ readings. len ( ) - 1 ] ;
173+
174+ move || {
175+ let reading = readings. get ( index) . copied ( ) . unwrap_or ( last) ;
176+ index += 1 ;
177+ reading
178+ }
179+ }
180+
153181 #[ test]
154182 fn test_unique_ids ( ) {
155183 unsafe {
@@ -238,6 +266,65 @@ mod test {
238266 assert ! ( matches!( extracted_seq2, 1 | 0 ) ) ; // Sequence incremented (or time advanced and reset to 0)
239267 }
240268
269+ #[ test]
270+ fn test_regression_fixed_clock_ids ( ) {
271+ let node = 42 ;
272+ let now = 1_775_590_268_126 ;
273+ let mut state = State :: default ( ) ;
274+ let readings = [ now, now, now + 1 ] ;
275+ let mut now_ms = fake_clock ( & readings) ;
276+
277+ let first = state. next_id_with ( node, 0 , & mut now_ms) ;
278+ let second = state. next_id_with ( node, 0 , & mut now_ms) ;
279+ let third = state. next_id_with ( node, 0 , & mut now_ms) ;
280+
281+ assert_eq ! ( first, 47_839_699_276_046_336 ) ;
282+ assert_eq ! ( second, 47_839_699_276_046_337 ) ;
283+ assert_eq ! ( third, 47_839_699_280_240_640 ) ;
284+
285+ assert_eq ! ( decode_id( first) , ( 11_405_873_126 , node, 0 ) ) ;
286+ assert_eq ! ( decode_id( second) , ( 11_405_873_126 , node, 1 ) ) ;
287+ assert_eq ! ( decode_id( third) , ( 11_405_873_127 , node, 0 ) ) ;
288+ }
289+
290+ #[ test]
291+ fn test_regression_sequence_wrap_waits_for_next_millisecond ( ) {
292+ let node = 7 ;
293+ let elapsed_ms = 99 ;
294+ let now = PGDOG_EPOCH + elapsed_ms;
295+ let mut state = State {
296+ last_timestamp_ms : now,
297+ sequence : MAX_SEQUENCE - 1 ,
298+ } ;
299+ let readings = [ now, now, now, now + 1 ] ;
300+ let mut now_ms = fake_clock ( & readings) ;
301+
302+ let last_in_millisecond = state. next_id_with ( node, 0 , & mut now_ms) ;
303+ let first_in_next_millisecond = state. next_id_with ( node, 0 , & mut now_ms) ;
304+
305+ assert_eq ! ( last_in_millisecond, 415_268_863 ) ;
306+ assert_eq ! ( first_in_next_millisecond, 419_459_072 ) ;
307+ assert ! (
308+ first_in_next_millisecond > last_in_millisecond,
309+ "IDs must remain monotonic across sequence wrap"
310+ ) ;
311+ }
312+
313+ #[ test]
314+ fn test_regression_offset_is_applied_after_packed_id ( ) {
315+ let node = 3 ;
316+ let elapsed_ms = 321 ;
317+ let offset = 1_000_000 ;
318+ let now = PGDOG_EPOCH + elapsed_ms;
319+ let mut state = State :: default ( ) ;
320+ let readings = [ now] ;
321+ let mut now_ms = fake_clock ( & readings) ;
322+
323+ let id = state. next_id_with ( node, offset, & mut now_ms) ;
324+
325+ assert_eq ! ( id, 1_347_383_872 ) ;
326+ }
327+
241328 #[ test]
242329 fn test_id_offset ( ) {
243330 let offset: u64 = 1_000_000_000 ;
0 commit comments