@@ -43,13 +43,20 @@ impl State {
4343 // Generate next unique ID in a distributed sequence.
4444 // The `node_id` argument must be globally unique.
4545 fn next_id ( & mut self , node_id : u64 , id_offset : u64 ) -> u64 {
46- let mut now = wait_until ( self . last_timestamp_ms ) ;
46+ self . next_id_with ( node_id, id_offset, & mut now_ms)
47+ }
48+
49+ fn next_id_with < F > ( & mut self , node_id : u64 , id_offset : u64 , now_ms : & mut F ) -> u64
50+ where
51+ F : FnMut ( ) -> u64 ,
52+ {
53+ let mut now = wait_until_with ( self . last_timestamp_ms , now_ms) ;
4754
4855 if now == self . last_timestamp_ms {
4956 self . sequence = ( self . sequence + 1 ) & MAX_SEQUENCE ;
5057 // Wraparound.
5158 if self . sequence == 0 {
52- now = wait_until ( now + 1 ) ;
59+ now = wait_until_with ( now + 1 , now_ms ) ;
5360 }
5461 } else {
5562 // Reset sequence to zero once we reach next ms.
@@ -80,9 +87,10 @@ fn now_ms() -> u64 {
8087 . as_millis ( ) as u64
8188}
8289
83- // Get a monotonically increasing timestamp in ms.
84- // Protects against clock drift.
85- fn wait_until ( target_ms : u64 ) -> u64 {
90+ fn wait_until_with < F > ( target_ms : u64 , now_ms : & mut F ) -> u64
91+ where
92+ F : FnMut ( ) -> u64 ,
93+ {
8694 loop {
8795 let now = now_ms ( ) ;
8896 if now >= target_ms {
@@ -150,6 +158,29 @@ mod test {
150158
151159 use super :: * ;
152160
161+ fn decode_id ( id : u64 ) -> ( u64 , u64 , u64 ) {
162+ let sequence = id & MAX_SEQUENCE ;
163+ let node_id = ( id >> NODE_SHIFT ) & MAX_NODE_ID ;
164+ let elapsed_ms = id >> TIMESTAMP_SHIFT ;
165+ ( elapsed_ms, node_id, sequence)
166+ }
167+
168+ fn fake_clock ( readings : & [ u64 ] ) -> impl FnMut ( ) -> u64 + ' _ {
169+ assert ! (
170+ !readings. is_empty( ) ,
171+ "fake clock requires at least one reading"
172+ ) ;
173+
174+ let mut index = 0 ;
175+ let last = readings[ readings. len ( ) - 1 ] ;
176+
177+ move || {
178+ let reading = readings. get ( index) . copied ( ) . unwrap_or ( last) ;
179+ index += 1 ;
180+ reading
181+ }
182+ }
183+
153184 #[ test]
154185 fn test_unique_ids ( ) {
155186 unsafe {
@@ -238,6 +269,65 @@ mod test {
238269 assert ! ( matches!( extracted_seq2, 1 | 0 ) ) ; // Sequence incremented (or time advanced and reset to 0)
239270 }
240271
272+ #[ test]
273+ fn test_regression_fixed_clock_ids ( ) {
274+ let node = 42 ;
275+ let now = 1_775_590_268_126 ;
276+ let mut state = State :: default ( ) ;
277+ let readings = [ now, now, now + 1 ] ;
278+ let mut now_ms = fake_clock ( & readings) ;
279+
280+ let first = state. next_id_with ( node, 0 , & mut now_ms) ;
281+ let second = state. next_id_with ( node, 0 , & mut now_ms) ;
282+ let third = state. next_id_with ( node, 0 , & mut now_ms) ;
283+
284+ assert_eq ! ( first, 47_839_699_276_046_336 ) ;
285+ assert_eq ! ( second, 47_839_699_276_046_337 ) ;
286+ assert_eq ! ( third, 47_839_699_280_240_640 ) ;
287+
288+ assert_eq ! ( decode_id( first) , ( 11_405_873_126 , node, 0 ) ) ;
289+ assert_eq ! ( decode_id( second) , ( 11_405_873_126 , node, 1 ) ) ;
290+ assert_eq ! ( decode_id( third) , ( 11_405_873_127 , node, 0 ) ) ;
291+ }
292+
293+ #[ test]
294+ fn test_regression_sequence_wrap_waits_for_next_millisecond ( ) {
295+ let node = 7 ;
296+ let elapsed_ms = 99 ;
297+ let now = PGDOG_EPOCH + elapsed_ms;
298+ let mut state = State {
299+ last_timestamp_ms : now,
300+ sequence : MAX_SEQUENCE - 1 ,
301+ } ;
302+ let readings = [ now, now, now, now + 1 ] ;
303+ let mut now_ms = fake_clock ( & readings) ;
304+
305+ let last_in_millisecond = state. next_id_with ( node, 0 , & mut now_ms) ;
306+ let first_in_next_millisecond = state. next_id_with ( node, 0 , & mut now_ms) ;
307+
308+ assert_eq ! ( last_in_millisecond, 415_268_863 ) ;
309+ assert_eq ! ( first_in_next_millisecond, 419_459_072 ) ;
310+ assert ! (
311+ first_in_next_millisecond > last_in_millisecond,
312+ "IDs must remain monotonic across sequence wrap"
313+ ) ;
314+ }
315+
316+ #[ test]
317+ fn test_regression_offset_is_applied_after_packed_id ( ) {
318+ let node = 3 ;
319+ let elapsed_ms = 321 ;
320+ let offset = 1_000_000 ;
321+ let now = PGDOG_EPOCH + elapsed_ms;
322+ let mut state = State :: default ( ) ;
323+ let readings = [ now] ;
324+ let mut now_ms = fake_clock ( & readings) ;
325+
326+ let id = state. next_id_with ( node, offset, & mut now_ms) ;
327+
328+ assert_eq ! ( id, 1_347_383_872 ) ;
329+ }
330+
241331 #[ test]
242332 fn test_id_offset ( ) {
243333 let offset: u64 = 1_000_000_000 ;
0 commit comments