Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions pgdog/src/unique_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,20 @@ impl State {
// Generate next unique ID in a distributed sequence.
// The `node_id` argument must be globally unique.
fn next_id(&mut self, node_id: u64, id_offset: u64) -> u64 {
let mut now = wait_until(self.last_timestamp_ms);
self.next_id_with(node_id, id_offset, &mut now_ms)
}

fn next_id_with<F>(&mut self, node_id: u64, id_offset: u64, now_ms: &mut F) -> u64
where
F: FnMut() -> u64,
{
let mut now = wait_until_with(self.last_timestamp_ms, now_ms);

if now == self.last_timestamp_ms {
self.sequence = (self.sequence + 1) & MAX_SEQUENCE;
// Wraparound.
if self.sequence == 0 {
now = wait_until(now + 1);
now = wait_until_with(now + 1, now_ms);
}
} else {
// Reset sequence to zero once we reach next ms.
Expand Down Expand Up @@ -80,9 +87,10 @@ fn now_ms() -> u64 {
.as_millis() as u64
}

// Get a monotonically increasing timestamp in ms.
// Protects against clock drift.
fn wait_until(target_ms: u64) -> u64 {
fn wait_until_with<F>(target_ms: u64, now_ms: &mut F) -> u64
where
F: FnMut() -> u64,
{
loop {
let now = now_ms();
if now >= target_ms {
Expand Down Expand Up @@ -150,6 +158,29 @@ mod test {

use super::*;

fn decode_id(id: u64) -> (u64, u64, u64) {
let sequence = id & MAX_SEQUENCE;
let node_id = (id >> NODE_SHIFT) & MAX_NODE_ID;
let elapsed_ms = id >> TIMESTAMP_SHIFT;
(elapsed_ms, node_id, sequence)
}

fn fake_clock(readings: &[u64]) -> impl FnMut() -> u64 + '_ {
assert!(
!readings.is_empty(),
"fake clock requires at least one reading"
);

let mut index = 0;
let last = readings[readings.len() - 1];

move || {
let reading = readings.get(index).copied().unwrap_or(last);
index += 1;
reading
}
}

#[test]
fn test_unique_ids() {
unsafe {
Expand Down Expand Up @@ -238,6 +269,65 @@ mod test {
assert!(matches!(extracted_seq2, 1 | 0)); // Sequence incremented (or time advanced and reset to 0)
}

#[test]
fn test_regression_fixed_clock_ids() {
let node = 42;
let now = 1_775_590_268_126;
let mut state = State::default();
let readings = [now, now, now + 1];
let mut now_ms = fake_clock(&readings);

let first = state.next_id_with(node, 0, &mut now_ms);
let second = state.next_id_with(node, 0, &mut now_ms);
let third = state.next_id_with(node, 0, &mut now_ms);

assert_eq!(first, 47_839_699_276_046_336);
assert_eq!(second, 47_839_699_276_046_337);
assert_eq!(third, 47_839_699_280_240_640);

assert_eq!(decode_id(first), (11_405_873_126, node, 0));
assert_eq!(decode_id(second), (11_405_873_126, node, 1));
assert_eq!(decode_id(third), (11_405_873_127, node, 0));
}

#[test]
fn test_regression_sequence_wrap_waits_for_next_millisecond() {
let node = 7;
let elapsed_ms = 99;
let now = PGDOG_EPOCH + elapsed_ms;
let mut state = State {
last_timestamp_ms: now,
sequence: MAX_SEQUENCE - 1,
};
let readings = [now, now, now, now + 1];
let mut now_ms = fake_clock(&readings);

let last_in_millisecond = state.next_id_with(node, 0, &mut now_ms);
let first_in_next_millisecond = state.next_id_with(node, 0, &mut now_ms);

assert_eq!(last_in_millisecond, 415_268_863);
assert_eq!(first_in_next_millisecond, 419_459_072);
assert!(
first_in_next_millisecond > last_in_millisecond,
"IDs must remain monotonic across sequence wrap"
);
}

#[test]
fn test_regression_offset_is_applied_after_packed_id() {
let node = 3;
let elapsed_ms = 321;
let offset = 1_000_000;
let now = PGDOG_EPOCH + elapsed_ms;
let mut state = State::default();
let readings = [now];
let mut now_ms = fake_clock(&readings);

let id = state.next_id_with(node, offset, &mut now_ms);

assert_eq!(id, 1_347_383_872);
}

#[test]
fn test_id_offset() {
let offset: u64 = 1_000_000_000;
Expand Down
Loading