From 1cc149d72b6a5caa8c32ddad39c1a0f3a02c514f Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:12:40 -0400 Subject: [PATCH 01/19] feat: add cursor caching methods to Cache trait Extend DbCache with cursor pointer storage and add take_cursor, return_cursor, drain_cursors to the Cache trait. Both RefCell and SharedCache implement the new methods. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cache.rs | 117 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 88 insertions(+), 29 deletions(-) diff --git a/src/tx/cache.rs b/src/tx/cache.rs index ff236bc..2767062 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -/// Cache trait for transaction-local database handles. +/// Cache trait for transaction-local database handles and cursors. /// /// This is used by the [`SyncKind`] trait to define the cache type for each /// transaction kind. @@ -38,6 +38,16 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi); + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor>; + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); + + /// Drain all cached cursors, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; } /// Cached database entry. @@ -73,37 +83,68 @@ impl From for Database { } } -/// Simple cache container for database handles. +/// Simple cache container for database handles and cursor pointers. /// /// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Default, Clone)] -#[repr(transparent)] -pub struct DbCache(SmallVec<[CachedDb; 16]>); +#[derive(Debug)] +pub struct DbCache { + dbs: SmallVec<[CachedDb; 16]>, + cursors: SmallVec<[(ffi::MDBX_dbi, *mut ffi::MDBX_cursor); 8]>, +} + +// SAFETY: DbCache contains `*mut ffi::MDBX_cursor` which is `!Send + !Sync`. +// These are raw MDBX cursor pointers bound to a transaction, not a thread. +// `Cursor` itself is already `Send + Sync` (see cursor.rs), so caching the +// same pointers here introduces no new unsoundness. All access to these +// pointers is mediated by `RefCell` (unsync path) or `RwLock` (sync path), +// ensuring no concurrent mutation. +unsafe impl Send for DbCache {} +unsafe impl Sync for DbCache {} + +impl Default for DbCache { + fn default() -> Self { + Self { dbs: SmallVec::new(), cursors: SmallVec::new() } + } +} + +impl Clone for DbCache { + fn clone(&self) -> Self { + Self { dbs: self.dbs.clone(), cursors: SmallVec::new() } + } +} impl DbCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - for entry in self.0.iter() { - if entry.name_hash == name_hash { - return Some(entry.db); - } - } - None + self.dbs.iter().find(|e| e.name_hash == name_hash).map(|e| e.db) } /// Write a database entry to the cache. fn write_db(&mut self, db: CachedDb) { - for entry in self.0.iter() { - if entry.name_hash == db.name_hash { - return; // Another thread beat us - } + if self.dbs.iter().any(|e| e.name_hash == db.name_hash) { + return; } - self.0.push(db); + self.dbs.push(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&mut self, dbi: ffi::MDBX_dbi) { - self.0.retain(|entry| entry.db.dbi() != dbi); + self.dbs.retain(|entry| entry.db.dbi() != dbi); + } + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&mut self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.cursors.iter().position(|(d, _)| *d == dbi).map(|i| self.cursors.swap_remove(i).1) + } + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&mut self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.cursors.push((dbi, cursor)); + } + + /// Drain all cached cursors, returning their raw pointers. + fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.cursors.drain(..).map(|(_, c)| c).collect() } } @@ -135,20 +176,29 @@ impl SharedCache { impl Cache for SharedCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.read(); - cache.read_db(name_hash) + self.read().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.write(); - cache.write_db(db); + self.write().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.write(); - cache.remove_dbi(dbi); + self.write().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.write().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.write().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors() } } @@ -161,19 +211,28 @@ impl Default for SharedCache { impl Cache for RefCell { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.borrow(); - cache.read_db(name_hash) + self.borrow().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.borrow_mut(); - cache.write_db(db); + self.borrow_mut().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.borrow_mut(); - cache.remove_dbi(dbi); + self.borrow_mut().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.borrow_mut().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors() } } From d5868e5286cdf85667105bacfa0900fe0cfb52c4 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:18:26 -0400 Subject: [PATCH 02/19] feat: add cache field to Cursor, return pointer on drop Cursor now holds a &'tx K::Cache reference. Drop returns the raw pointer to the cache instead of calling mdbx_cursor_close. Add from_raw constructor for cache-hit path. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cursor.rs | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index 9c058e5..b0c389a 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -6,6 +6,7 @@ use crate::{ tx::{ TxPtrAccess, aliases::IterKeyVals, + cache::Cache, iter::{Iter, IterDup, IterDupFixed, IterDupFixedOfKey, IterDupOfKey}, kind::WriteMarker, }, @@ -33,6 +34,7 @@ where K: TransactionKind, { access: &'tx K::Access, + cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, db: Database, _kind: PhantomData, @@ -43,12 +45,28 @@ where K: TransactionKind, { /// Creates a new cursor from a reference to a transaction access type. - pub(crate) fn new(access: &'tx K::Access, db: Database) -> MdbxResult { + pub(crate) fn new( + access: &'tx K::Access, + cache: &'tx K::Cache, + db: Database, + ) -> MdbxResult { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); access.with_txn_ptr(|txn_ptr| unsafe { mdbx_result(ffi::mdbx_cursor_open(txn_ptr, db.dbi(), &mut cursor)) })?; - Ok(Self { access, cursor, db, _kind: PhantomData }) + Ok(Self { access, cache, cursor, db, _kind: PhantomData }) + } + + /// Wraps an existing raw cursor pointer with cache support. + /// + /// The cursor must already be bound to the correct transaction and DBI. + pub(crate) fn from_raw( + access: &'tx K::Access, + cache: &'tx K::Cache, + cursor: *mut ffi::MDBX_cursor, + db: Database, + ) -> Self { + Self { access, cache, cursor, db, _kind: PhantomData } } /// Helper function for `Clone`. This should only be invoked within @@ -59,7 +77,13 @@ where let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); - let s = Self { access: other.access, cursor, db: other.db, _kind: PhantomData }; + let s = Self { + access: other.access, + cache: other.cache, + cursor, + db: other.db, + _kind: PhantomData, + }; mdbx_result(res)?; @@ -1072,11 +1096,10 @@ where K: TransactionKind, { fn drop(&mut self) { - // MDBX cursors MUST be closed. Failure to do so is a memory leak. - // - // To be able to close a cursor of a timed out transaction, we need to - // renew it first. Hence the usage of `with_txn_ptr_for_cleanup` here. - self.access.with_txn_ptr(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); + // Return the cursor pointer to the transaction cache for reuse. + // The transaction's commit/drop path will call mdbx_cursor_close on + // all cached pointers once the transaction is still valid. + self.cache.return_cursor(self.db.dbi(), self.cursor); } } From 158d9184f9d89e09271d58c1bdcb4eeb9d00cc6b Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:21:03 -0400 Subject: [PATCH 03/19] feat: cache-aware Tx::cursor() and drain on commit/drop Tx::cursor() checks the cursor cache before allocating. Commit and drop paths drain cached cursors inside with_txn_ptr to ensure all FFI close calls are properly serialized. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cursor.rs | 2 +- src/tx/impl.rs | 59 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index b0c389a..bb2b92d 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -60,7 +60,7 @@ where /// Wraps an existing raw cursor pointer with cache support. /// /// The cursor must already be bound to the correct transaction and DBI. - pub(crate) fn from_raw( + pub(crate) const fn from_raw( access: &'tx K::Access, cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..1b45012 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -47,7 +47,7 @@ impl fmt::Debug for TxMeta { /// /// [`TxSync`]: crate::tx::aliases::TxSync /// [`TxUnsync`]: crate::tx::aliases::TxUnsync -pub struct Tx::Access> { +pub struct Tx::Access> { txn: U, cache: K::Cache, @@ -55,7 +55,7 @@ pub struct Tx::Access> { meta: TxMeta, } -impl fmt::Debug for Tx { +impl fmt::Debug for Tx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Tx").finish_non_exhaustive() } @@ -238,11 +238,33 @@ where /// Opens a cursor on the given database. /// - /// Multiple cursors can be open simultaneously on different databases - /// within the same transaction. The cursor borrows the transaction's - /// inner access type, allowing concurrent cursor operations. + /// Cursors are transparently cached: dropped cursors return their + /// raw pointer to the cache, and subsequent calls reuse them without + /// a new `mdbx_cursor_open` allocation. pub fn cursor(&self, db: Database) -> MdbxResult> { - Cursor::new(&self.txn, db) + if let Some(raw) = self.cache.take_cursor(db.dbi()) { + Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + } else { + Cursor::new(&self.txn, &self.cache, db) + } + } + + /// Drains the cursor cache and closes all cached cursor pointers. + /// + /// Must be called before commit or abort to ensure all cursors are + /// closed while the transaction is still valid. + fn drain_cached_cursors(&self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers are valid — they were returned + // by Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); } } @@ -489,6 +511,8 @@ where /// /// SAFETY: latency pointer must be valid for the duration of the commit. fn commit_inner(self, latency: *mut MDBX_commit_latency) -> MdbxResult<()> { + self.drain_cached_cursors(); + let was_aborted = self.with_txn_ptr(|txn| { if K::IS_READ_ONLY { mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency) }) @@ -542,6 +566,8 @@ where // span scope. let _guard = self.meta.span.clone().entered(); + self.drain_cached_cursors(); + // SAFETY: txn_ptr is valid from with_txn_ptr. let was_aborted = self.with_txn_ptr(|txn_ptr| unsafe { ops::commit_raw(txn_ptr, latency) })?; @@ -652,6 +678,27 @@ where } } +impl Drop for Tx +where + K: TransactionKind, + U: TxPtrAccess, +{ + fn drop(&mut self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.txn.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers were returned by Cursor::drop + // during the lifetime of this transaction, which is still + // alive (we are in Tx::drop, before txn ptr is dropped). + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } +} + #[cfg(test)] mod tests { use super::*; From ab5817d6a5ecd5834c3c0f5f86b13b071d726d5a Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:25:13 -0400 Subject: [PATCH 04/19] test: add cursor caching tests Verify cursor reuse across open/drop cycles, multiple cursors on the same DB, and repeated cycle stability. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 128 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index c5d6cfa..f655dee 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1729,6 +1729,134 @@ fn test_put_multiple_empty_values_v2() { test_put_multiple_empty_values_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_v1() { + test_cursor_cache_reuse_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_v2() { + test_cursor_cache_reuse_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_multiple_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"a", b"1", WriteFlags::empty()).unwrap(); + txn.put(db, b"b", b"2", WriteFlags::empty()).unwrap(); + txn.put(db, b"c", b"3", WriteFlags::empty()).unwrap(); + + // Open two cursors, drop both (both return to cache) + { + let _c1 = txn.cursor(db).unwrap(); + let _c2 = txn.cursor(db).unwrap(); + } + + // Open two again — both should reuse cached pointers + { + let mut c1 = txn.cursor(db).unwrap(); + let mut c2 = txn.cursor(db).unwrap(); + + let (k1, _) = c1.first::, Vec>().unwrap().unwrap(); + let (k2, _) = c2.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k1, b"a"); + assert_eq!(&k2, b"c"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_multiple_v1() { + test_cursor_cache_multiple_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_multiple_v2() { + test_cursor_cache_multiple_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v1() { + test_cursor_cache_repeated_cycles_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v2() { + test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test] From 3ceb76afaa2db2fa33ef43c0124dc97111b17e95 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:30:26 -0400 Subject: [PATCH 05/19] docs: document cursor caching Send/Sync and Drop constraint Document the unsafe Send + Sync impls on DbCache in CLAUDE.md with full justification. Add code comment explaining why Tx::Drop inlines drain logic instead of calling drain_cached_cursors (type system constraint: Drop is on Tx, helper is on Tx). --- CLAUDE.md | 17 ++++++++++++++++- src/tx/impl.rs | 4 ++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index f8fc8c1..09c55a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait. ## API Patterns -### Cursor Creation +### Cursor Creation and Caching ```rust let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags) let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi ``` +Cursors are transparently cached within transactions. When a cursor is +dropped, its raw pointer is returned to the transaction's cache. Subsequent +`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/ +`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained +and all pointers closed on commit or abort. + +`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor` +pointers, which makes it `!Send + !Sync` by default. Explicit `unsafe impl +Send + Sync for DbCache` is required because: +- `SyncKind::Cache` requires `Cache + Send` (for `RefCell: Send`) +- `SharedCache` uses `Arc>` which requires `DbCache: Send + Sync` +- This is sound because `Cursor` itself is already `unsafe impl Send + Sync`, + and all access to cached pointers is mediated by `RefCell` (unsync) or + `RwLock` (sync) + ### Database Flags Validation DUP_SORT/DUP_FIXED methods validate flags at runtime: diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 1b45012..777efea 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -678,6 +678,10 @@ where } } +// NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). +// Rust requires Drop bounds to match the struct definition exactly, so we +// cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). +// The drain-and-close logic is inlined instead. impl Drop for Tx where K: TransactionKind, From b97eff257a0f96d29ec0d8b67edaf9412c8fad76 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:30:11 -0400 Subject: [PATCH 06/19] fix: drain cached cursors on close_db and drop_db Prevents use-after-free when a DBI is closed or dropped while stale cursor pointers for it remain in the cursor cache. --- src/tx/cache.rs | 29 +++++++++++++++++++++++++++++ src/tx/impl.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/tx/cache.rs b/src/tx/cache.rs index 2767062..8c079cf 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -48,6 +48,10 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Drain all cached cursors, returning their raw pointers. /// The caller is responsible for closing them via FFI. fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; } /// Cached database entry. @@ -146,6 +150,23 @@ impl DbCache { fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.cursors.drain(..).map(|(_, c)| c).collect() } + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + fn drain_cursors_for_dbi( + &mut self, + dbi: ffi::MDBX_dbi, + ) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + let mut drained = SmallVec::new(); + self.cursors.retain(|(d, c)| { + if *d == dbi { + drained.push(*c); + false + } else { + true + } + }); + drained + } } /// Simple cache container for database handles. @@ -200,6 +221,10 @@ impl Cache for SharedCache { fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.write().drain_cursors() } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors_for_dbi(dbi) + } } impl Default for SharedCache { @@ -235,4 +260,8 @@ impl Cache for RefCell { fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.borrow_mut().drain_cursors() } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors_for_dbi(dbi) + } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 777efea..fd10653 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -224,12 +224,26 @@ where /// Closes the database handle. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the handle is closed. + /// /// # Safety /// /// This will invalidate data cached in [`Database`] instances with the /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before closing it. + let stale = self.cache.drain_cursors_for_dbi(dbi); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; self.cache.remove_dbi(dbi); @@ -481,12 +495,26 @@ impl Tx { /// Drops the database from the environment. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the database is dropped. + /// /// # Safety /// /// Caller must ensure no [`Cursor`] or other references to the database /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before dropping it. + let stale = self.cache.drain_cursors_for_dbi(db.dbi()); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. From 33507af02313cfe6633f1538f8d35933b14773bc Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:31:51 -0400 Subject: [PATCH 07/19] fix: prevent cache poisoning on mdbx_cursor_copy failure Construct Cursor only after mdbx_cursor_copy succeeds. On failure, close the raw pointer directly instead of letting Drop push an unbound cursor into the cache. --- src/tx/cursor.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index bb2b92d..e10ccd4 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -76,18 +76,20 @@ where let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + if let Err(e) = mdbx_result(res) { + // Close directly — do NOT construct Self, as Drop would + // push this unbound cursor into the cache. + ffi::mdbx_cursor_close(cursor); + return Err(e); + } - let s = Self { + Ok(Self { access: other.access, cache: other.cache, cursor, db: other.db, _kind: PhantomData, - }; - - mdbx_result(res)?; - - Ok(s) + }) } } From 9c6dc2bb3a3acbd692ced96cce51cc6e6d65f8f7 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:33:08 -0400 Subject: [PATCH 08/19] fix: renew cached cursors to reset B-tree position Call mdbx_cursor_renew on the cache-hit path so reused cursors start at a clean position rather than retaining stale state. --- src/tx/impl.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/tx/impl.rs b/src/tx/impl.rs index fd10653..8185216 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -254,10 +254,17 @@ where /// /// Cursors are transparently cached: dropped cursors return their /// raw pointer to the cache, and subsequent calls reuse them without - /// a new `mdbx_cursor_open` allocation. + /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed + /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { if let Some(raw) = self.cache.take_cursor(db.dbi()) { - Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + self.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid from with_txn_ptr, raw is a + // valid cursor pointer returned by a prior Cursor::drop. + let rc = unsafe { ffi::mdbx_cursor_renew(txn_ptr, raw) }; + mdbx_result(rc)?; + Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + }) } else { Cursor::new(&self.txn, &self.cache, db) } From 4144c177f70d8c6280bca106d43cbf343fbf6f10 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:33:42 -0400 Subject: [PATCH 09/19] docs: add keep-in-sync comments on duplicated drain logic Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tx/impl.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 8185216..89782ca 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -274,6 +274,8 @@ where /// /// Must be called before commit or abort to ensure all cursors are /// closed while the transaction is still valid. + /// + /// NB: keep in sync with the inlined logic in `Tx::Drop`. fn drain_cached_cursors(&self) { let cursors = self.cache.drain_cursors(); if cursors.is_empty() { @@ -716,6 +718,7 @@ where // NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). // Rust requires Drop bounds to match the struct definition exactly, so we // cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). +// NB: keep in sync with `drain_cached_cursors`. // The drain-and-close logic is inlined instead. impl Drop for Tx where From 8504d37eab62ca358a7b06b52e6a1a79cdae16fa Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:46:32 -0400 Subject: [PATCH 10/19] test: add RO cursor-cache tests Cover cursor caching in read-only transactions (reuse and repeated cycles), complementing existing RW-only coverage. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index f655dee..f8ac163 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1857,6 +1857,98 @@ fn test_cursor_cache_repeated_cycles_v2() { test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Populate via RW, then commit + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + // Test cursor caching in RO txn + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_ro_v1() { + test_cursor_cache_reuse_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_ro_v2() { + test_cursor_cache_reuse_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v1() { + test_cursor_cache_repeated_cycles_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v2() { + test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test] From 426be07b3228f4bc3d1fdf8f3c18622b3f2408f8 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:46:58 -0400 Subject: [PATCH 11/19] test: add cursor reuse across writes test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verify cached cursors see updated data after B-tree COW from interleaved put operations — the primary hot path for caching. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index f8ac163..619d9a4 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1949,6 +1949,57 @@ fn test_cursor_cache_repeated_cycles_ro_v2() { test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_across_writes_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + + // cursor -> read -> drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Write new data (B-tree COW) + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // cursor (from cache) -> read -> should see updated data + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + + // Verify both entries visible + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v1() { + test_cursor_cache_reuse_across_writes_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v2() { + test_cursor_cache_reuse_across_writes_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test] From 14adac5b38665c52034a6cccd758dc690fbdbb4e Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 04:09:24 -0400 Subject: [PATCH 12/19] refactor: route cursor FFI through ops, harden renew error path, add cache tests Addresses Fraser's review on PR #11: - Extract drain-and-close logic into `close_drained_cursors` helper, used by `close_db`, `drop_db`, `drain_cached_cursors`, and `Tx::Drop`. - Move all cursor FFI calls into `ops` (`cursor_renew_raw`, `cursor_close_raw`, `cursor_close2_raw`), matching the crate convention. - On `mdbx_cursor_renew` failure in `Tx::cursor`, defensively close the raw pointer via `mdbx_cursor_close2` instead of leaking it. Not re-cached: a subsequent call would hit the same failure. - Add `#[cfg(test)]` `cursor_count` and `inject_cursor` to the `Cache` trait so tests can verify cache state directly. - Add tests: cache-count assertions, `close_db` drains its DBI only, and a renew-failure test that injects a null pointer to exercise the error path and confirm no leak. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/cache.rs | 35 +++++++++ src/tx/impl.rs | 184 ++++++++++++++++++++++++++++++++---------------- src/tx/ops.rs | 57 +++++++++++++++ 3 files changed, 215 insertions(+), 61 deletions(-) diff --git a/src/tx/cache.rs b/src/tx/cache.rs index 8c079cf..51feaf1 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -52,6 +52,21 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Drain cached cursors for a specific DBI, returning their raw pointers. /// The caller is responsible for closing them via FFI. fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Returns the total number of cached cursors across all DBIs. + #[cfg(test)] + fn cursor_count(&self) -> usize; + + /// Injects a raw cursor pointer into the cache for the given DBI. + /// + /// # Safety + /// + /// - `cursor` must either be a valid MDBX cursor pointer bound to the + /// enclosing transaction, or a pointer whose `mdbx_cursor_close2` + /// behaviour the caller accepts (tests that trigger failure paths + /// knowingly inject unusual values here). + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); } /// Cached database entry. @@ -225,6 +240,16 @@ impl Cache for SharedCache { fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.write().drain_cursors_for_dbi(dbi) } + + #[cfg(test)] + fn cursor_count(&self) -> usize { + self.read().cursors.len() + } + + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.write().cursors.push((dbi, cursor)); + } } impl Default for SharedCache { @@ -264,4 +289,14 @@ impl Cache for RefCell { fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.borrow_mut().drain_cursors_for_dbi(dbi) } + + #[cfg(test)] + fn cursor_count(&self) -> usize { + self.borrow().cursors.len() + } + + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().cursors.push((dbi, cursor)); + } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 89782ca..6001c7b 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -233,17 +233,7 @@ where /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { - // Drain and close any cached cursors for this DBI before closing it. - let stale = self.cache.drain_cursors_for_dbi(dbi); - if !stale.is_empty() { - self.with_txn_ptr(|_| { - for cursor in stale { - // SAFETY: cursor pointers are valid — returned by - // Cursor::drop during the lifetime of this transaction. - unsafe { ffi::mdbx_cursor_close(cursor) }; - } - }); - } + close_drained_cursors(&self.txn, self.cache.drain_cursors_for_dbi(dbi)); // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; self.cache.remove_dbi(dbi); @@ -257,40 +247,58 @@ where /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { - if let Some(raw) = self.cache.take_cursor(db.dbi()) { - self.with_txn_ptr(|txn_ptr| { - // SAFETY: txn_ptr is valid from with_txn_ptr, raw is a - // valid cursor pointer returned by a prior Cursor::drop. - let rc = unsafe { ffi::mdbx_cursor_renew(txn_ptr, raw) }; - mdbx_result(rc)?; - Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) - }) - } else { - Cursor::new(&self.txn, &self.cache, db) - } + let Some(raw) = self.cache.take_cursor(db.dbi()) else { + return Cursor::new(&self.txn, &self.cache, db); + }; + self.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid from with_txn_ptr. `raw` was produced + // by a prior Cursor::drop in this transaction. + match unsafe { ops::cursor_renew_raw(txn_ptr, raw) } { + Ok(()) => Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)), + Err(e) => { + // Renew failed — the cursor is in an indeterminate state. + // Close it defensively with the non-panicking variant and + // return the error. Do NOT re-insert into the cache, as + // a subsequent `cursor()` call would hit the same failure. + // SAFETY: within with_txn_ptr; close2 handles any cursor state. + unsafe { ops::cursor_close2_raw(raw) }; + Err(e) + } + } + }) } /// Drains the cursor cache and closes all cached cursor pointers. /// /// Must be called before commit or abort to ensure all cursors are /// closed while the transaction is still valid. - /// - /// NB: keep in sync with the inlined logic in `Tx::Drop`. fn drain_cached_cursors(&self) { - let cursors = self.cache.drain_cursors(); - if cursors.is_empty() { - return; - } - self.with_txn_ptr(|_| { - for cursor in cursors { - // SAFETY: cursor pointers are valid — they were returned - // by Cursor::drop during the lifetime of this transaction. - unsafe { ffi::mdbx_cursor_close(cursor) }; - } - }); + close_drained_cursors(&self.txn, self.cache.drain_cursors()); } } +/// Closes a drained batch of cursor pointers via the ops module. +/// +/// This is a free function (rather than a method on [`Tx`]) so that +/// [`Tx::drop`], which is defined with a free access-type parameter, +/// can share the same implementation as the `Tx` methods. +fn close_drained_cursors( + access: &U, + cursors: SmallVec<[*mut ffi::MDBX_cursor; 8]>, +) { + if cursors.is_empty() { + return; + } + access.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers are valid — they were returned by + // Cursor::drop during the lifetime of this transaction, which + // is still alive within the with_txn_ptr block. + unsafe { ops::cursor_close_raw(cursor) }; + } + }); +} + // Write-only impl Tx { /// Opens a handle to an MDBX database, creating the database if necessary. @@ -513,17 +521,7 @@ impl Tx { /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { - // Drain and close any cached cursors for this DBI before dropping it. - let stale = self.cache.drain_cursors_for_dbi(db.dbi()); - if !stale.is_empty() { - self.with_txn_ptr(|_| { - for cursor in stale { - // SAFETY: cursor pointers are valid — returned by - // Cursor::drop during the lifetime of this transaction. - unsafe { ffi::mdbx_cursor_close(cursor) }; - } - }); - } + close_drained_cursors(&self.txn, self.cache.drain_cursors_for_dbi(db.dbi())); self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. @@ -717,27 +715,15 @@ where // NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). // Rust requires Drop bounds to match the struct definition exactly, so we -// cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). -// NB: keep in sync with `drain_cached_cursors`. -// The drain-and-close logic is inlined instead. +// route through the `close_drained_cursors` free function, which is generic +// over the access type. impl Drop for Tx where K: TransactionKind, U: TxPtrAccess, { fn drop(&mut self) { - let cursors = self.cache.drain_cursors(); - if cursors.is_empty() { - return; - } - self.txn.with_txn_ptr(|_| { - for cursor in cursors { - // SAFETY: cursor pointers were returned by Cursor::drop - // during the lifetime of this transaction, which is still - // alive (we are in Tx::drop, before txn ptr is dropped). - unsafe { ffi::mdbx_cursor_close(cursor) }; - } - }); + close_drained_cursors(&self.txn, self.cache.drain_cursors()); } } @@ -796,6 +782,82 @@ mod tests { assert_ne!(db1_a.dbi(), db2.dbi()); } + #[test] + fn test_cursor_cache_counts() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + + assert_eq!(txn.cache.cursor_count(), 0); + + { + let _c = txn.cursor(db).unwrap(); + assert_eq!(txn.cache.cursor_count(), 0, "live cursor is not in cache"); + } + assert_eq!(txn.cache.cursor_count(), 1, "dropped cursor returns to cache"); + + { + let _c = txn.cursor(db).unwrap(); + assert_eq!(txn.cache.cursor_count(), 0, "second cursor reuses cached pointer"); + } + assert_eq!(txn.cache.cursor_count(), 1, "re-dropped cursor returns to cache"); + + txn.commit().unwrap(); + } + + #[test] + fn test_cursor_cache_close_db_drains() { + let dir = tempdir().unwrap(); + let env = Environment::builder().set_max_dbs(4).open(dir.path()).unwrap(); + + // Create two DBs in a write txn and commit. + { + let txn = TxUnsync::::begin(env.clone()).unwrap(); + txn.create_db(Some("a"), DatabaseFlags::empty()).unwrap(); + txn.create_db(Some("b"), DatabaseFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db_a = txn.open_db(Some("a")).unwrap(); + let db_b = txn.open_db(Some("b")).unwrap(); + + // Populate the cache: one cursor per DBI. + drop(txn.cursor(db_a).unwrap()); + drop(txn.cursor(db_b).unwrap()); + assert_eq!(txn.cache.cursor_count(), 2); + + // close_db on "a" must drain only that DBI's cursor. + // SAFETY: no live Cursor/Database instances for db_a remain. + unsafe { txn.close_db(db_a.dbi()).unwrap() }; + assert_eq!(txn.cache.cursor_count(), 1, "close_db drains only its DBI"); + } + + #[test] + fn test_cursor_renew_error_does_not_leak() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + + // Inject a null pointer into the cache for db's DBI. The cursor() + // cache-hit path will call mdbx_cursor_renew on it, which returns + // MDBX_EINVAL for a null cursor; the error path closes via + // mdbx_cursor_close2 (also null-safe) and must not re-insert. + // SAFETY: the renew error path uses cursor_close2_raw which tolerates null. + unsafe { txn.cache.inject_cursor(db.dbi(), ptr::null_mut()) }; + assert_eq!(txn.cache.cursor_count(), 1); + + let err = txn.cursor(db).expect_err("renew must fail on null cursor"); + assert!(matches!(err, MdbxError::DecodeError), "expected EINVAL mapping, got {err:?}"); + assert_eq!(txn.cache.cursor_count(), 0, "failed renew must not leak the pointer"); + + // A follow-up call falls through to the open path and succeeds, + // proving recovery via retry works. + let _c = txn.cursor(db).unwrap(); + } + fn __compile_checks() { fn assert_sync() {} assert_sync::(); diff --git a/src/tx/ops.rs b/src/tx/ops.rs index 8e583ce..18d564b 100644 --- a/src/tx/ops.rs +++ b/src/tx/ops.rs @@ -271,6 +271,63 @@ pub(crate) unsafe fn close_db_raw(env: *mut ffi::MDBX_env, dbi: ffi::MDBX_dbi) - Ok(()) } +/// Renews a cursor, binding it to the given transaction and resetting its +/// position. +/// +/// # Safety +/// +/// - `txn` must be a valid, non-null transaction pointer. +/// - `cursor` may be null or a valid cursor pointer. A null pointer returns +/// an error ([`MDBX_EINVAL`]) rather than undefined behaviour. +/// +/// [`MDBX_EINVAL`]: ffi::MDBX_EINVAL +#[inline(always)] +pub(crate) unsafe fn cursor_renew_raw( + txn: *mut ffi::MDBX_txn, + cursor: *mut ffi::MDBX_cursor, +) -> MdbxResult<()> { + // SAFETY: Caller guarantees txn is valid; MDBX null-checks the cursor. + mdbx_result(unsafe { ffi::mdbx_cursor_renew(txn, cursor) })?; + Ok(()) +} + +/// Closes a cursor, freeing its MDBX allocation. +/// +/// # Safety +/// +/// - `cursor` must be a valid, non-null cursor pointer. Passing null or an +/// invalid cursor causes an MDBX-internal abort. +/// - Must be called within a [`TxPtrAccess::with_txn_ptr`] block so the +/// cursor's transaction is still live. +/// +/// [`TxPtrAccess::with_txn_ptr`]: crate::tx::access::TxPtrAccess::with_txn_ptr +#[inline(always)] +pub(crate) unsafe fn cursor_close_raw(cursor: *mut ffi::MDBX_cursor) { + // SAFETY: Caller guarantees cursor is valid and non-null. + unsafe { ffi::mdbx_cursor_close(cursor) }; +} + +/// Closes a cursor without panicking on invalid input. +/// +/// Unlike [`cursor_close_raw`], this wraps `mdbx_cursor_close2` which +/// returns an error code instead of aborting the process when given a +/// null or otherwise invalid cursor. The return value is discarded since +/// this is only used on error paths where the outcome of the close is +/// irrelevant. +/// +/// # Safety +/// +/// - `cursor` may be null or a potentially-invalid cursor pointer. +/// - Must be called within a [`TxPtrAccess::with_txn_ptr`] block so the +/// cursor's transaction is still live. +/// +/// [`TxPtrAccess::with_txn_ptr`]: crate::tx::access::TxPtrAccess::with_txn_ptr +#[inline(always)] +pub(crate) unsafe fn cursor_close2_raw(cursor: *mut ffi::MDBX_cursor) { + // SAFETY: MDBX null-checks the cursor and returns an error on invalid input. + let _ = unsafe { ffi::mdbx_cursor_close2(cursor) }; +} + /// Checks if a memory pointer refers to dirty (modified) data. /// /// Returns `true` if the data is dirty and must be copied before borrowing. From 0da5858b92176e8056128c955623f52865a2fdd4 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 04:16:40 -0400 Subject: [PATCH 13/19] fix(clippy): use checked_div for remaining_in_page calc Satisfies clippy::manual_checked_ops (new in Rust 1.95). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/iter/dupfixed.rs | 11 ++++++----- src/tx/iter/dupfixed_key.rs | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/tx/iter/dupfixed.rs b/src/tx/iter/dupfixed.rs index ef678a3..bf2b5b1 100644 --- a/src/tx/iter/dupfixed.rs +++ b/src/tx/iter/dupfixed.rs @@ -96,11 +96,12 @@ where Key: core::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let remaining_in_page = if self.value_size > 0 { - self.current_page.len().saturating_sub(self.page_offset) / self.value_size - } else { - 0 - }; + let remaining_in_page = self + .current_page + .len() + .saturating_sub(self.page_offset) + .checked_div(self.value_size) + .unwrap_or(0); f.debug_struct("IterDupFixed") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) diff --git a/src/tx/iter/dupfixed_key.rs b/src/tx/iter/dupfixed_key.rs index 92149bb..726a1a2 100644 --- a/src/tx/iter/dupfixed_key.rs +++ b/src/tx/iter/dupfixed_key.rs @@ -48,11 +48,12 @@ where K: TransactionKind, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let remaining_in_page = if self.value_size > 0 { - self.current_page.len().saturating_sub(self.page_offset) / self.value_size - } else { - 0 - }; + let remaining_in_page = self + .current_page + .len() + .saturating_sub(self.page_offset) + .checked_div(self.value_size) + .unwrap_or(0); f.debug_struct("IterDupFixedOfKey") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) From 73a6bb223700e06ab7277ceaa3e563465612e849 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 04:18:24 -0400 Subject: [PATCH 14/19] fix(clippy): use checked_div for remaining_in_page calc Satisfies clippy::manual_checked_ops (new in Rust 1.95). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/iter/dupfixed.rs | 2 +- src/tx/iter/dupfixed_key.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tx/iter/dupfixed.rs b/src/tx/iter/dupfixed.rs index bf2b5b1..462af05 100644 --- a/src/tx/iter/dupfixed.rs +++ b/src/tx/iter/dupfixed.rs @@ -101,7 +101,7 @@ where .len() .saturating_sub(self.page_offset) .checked_div(self.value_size) - .unwrap_or(0); + .unwrap_or_default(); f.debug_struct("IterDupFixed") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) diff --git a/src/tx/iter/dupfixed_key.rs b/src/tx/iter/dupfixed_key.rs index 726a1a2..c961e65 100644 --- a/src/tx/iter/dupfixed_key.rs +++ b/src/tx/iter/dupfixed_key.rs @@ -53,7 +53,7 @@ where .len() .saturating_sub(self.page_offset) .checked_div(self.value_size) - .unwrap_or(0); + .unwrap_or_default(); f.debug_struct("IterDupFixedOfKey") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) From c78fd4bffab1c4c88fb976afbe86f8deb4262ec4 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 09:43:05 -0400 Subject: [PATCH 15/19] fix(cursor-cache): co-locate cache with txn pointer to fix clone-drop drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Tx::Drop` previously drained the shared cursor cache unconditionally. For `TxSync`, the cache was `Arc>` shared across clones, so dropping any clone emptied the cache for all surviving clones. The naive `Arc::strong_count == 1` gate would race on concurrent drops and leak the cursor pointers (MDBX requires explicit `mdbx_cursor_close` regardless of RO/RW). Move the cache inside `PtrSync` (widening the existing `Mutex<()>` to `Mutex`) and `PtrUnsync` (`RefCell`). The `Arc` that controls the txn lifetime now also controls cache lifetime, so cursors are drained-and-closed exactly once in the inner Drop when refcount hits 0 — race-free by construction. Removes `SharedCache`, drops `Clone+Default` from the `Cache` trait, and trims the `cache` field/parameter from `Tx` and `Cursor`. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/access.rs | 78 ++++++++++++++++++++++++----- src/tx/cache.rs | 98 ++++++++++++------------------------- src/tx/cursor.rs | 26 +++------- src/tx/impl.rs | 125 +++++++++++++++++++++++++++++++---------------- src/tx/kind.rs | 15 +----- 5 files changed, 190 insertions(+), 152 deletions(-) diff --git a/src/tx/access.rs b/src/tx/access.rs index f0dec56..c66a04d 100644 --- a/src/tx/access.rs +++ b/src/tx/access.rs @@ -1,13 +1,20 @@ use crate::{ Environment, sys::txn_manager::{Abort, RawTxPtr}, + tx::{ + cache::{Cache, DbCache}, + ops, + }, }; use core::fmt; use parking_lot::{Mutex, MutexGuard}; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - mpsc::sync_channel, +use std::{ + cell::RefCell, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + }, }; use tracing::debug_span; @@ -29,8 +36,16 @@ mod sealed { /// are stored for read-only and read-write transactions. It ensures that /// the transaction pointer can be accessed safely, respecting timeouts /// and ownership semantics. +/// +/// The associated [`Cache`] type co-locates the cursor cache with the +/// transaction so that the cache lifetime is bound to the transaction +/// pointer's: cached cursors are drained and closed exactly once, in the +/// implementing type's `Drop`, before the transaction is aborted. #[allow(unreachable_pub)] pub trait TxPtrAccess: fmt::Debug + sealed::Sealed { + /// Cache type co-located with this transaction pointer. + type Cache: Cache; + /// Create an instance of the implementing type from a raw transaction /// pointer. fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self @@ -45,6 +60,10 @@ pub trait TxPtrAccess: fmt::Debug + sealed::Sealed { /// Mark the transaction as committed. fn mark_committed(&self); + /// Returns a reference to the cursor/database cache associated with + /// this transaction. + fn cache(&self) -> &Self::Cache; + /// Get the transaction ID by making a call into the MDBX C API. fn tx_id(&self) -> Option { let mut id = 0; @@ -60,6 +79,8 @@ impl TxPtrAccess for Arc where T: TxPtrAccess, { + type Cache = T::Cache; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self where Self: Sized, @@ -77,12 +98,17 @@ where fn mark_committed(&self) { self.as_ref().mark_committed(); } + + fn cache(&self) -> &Self::Cache { + self.as_ref().cache() + } } -/// Wrapper for raw txn pointer for RW transactions. +/// Wrapper for raw txn pointer for unsynchronized transactions. pub struct PtrUnsync { committed: AtomicBool, ptr: *mut ffi::MDBX_txn, + cache: RefCell, } impl fmt::Debug for PtrUnsync { @@ -92,11 +118,13 @@ impl fmt::Debug for PtrUnsync { } impl TxPtrAccess for PtrUnsync { + type Cache = RefCell; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, _env: Environment, _is_read_only: bool) -> Self where Self: Sized, { - Self { committed: AtomicBool::new(false), ptr } + Self { committed: AtomicBool::new(false), ptr, cache: RefCell::new(DbCache::default()) } } fn with_txn_ptr(&self, f: F) -> R @@ -111,6 +139,10 @@ impl TxPtrAccess for PtrUnsync { // Type is neither Sync nor Send, so no concurrent access is possible. unsafe { *self.committed.as_ptr() = true }; } + + fn cache(&self) -> &Self::Cache { + &self.cache + } } impl Drop for PtrUnsync { @@ -118,6 +150,13 @@ impl Drop for PtrUnsync { // SAFETY: // We have exclusive ownership of this pointer. unsafe { + // Close any cached cursors before the txn ends. No-op if + // commit/abort already drained. + for cursor in self.cache.get_mut().drain_cursors() { + // SAFETY: cursor was returned from an earlier Cursor::drop + // bound to this still-live txn. + ops::cursor_close_raw(cursor); + } if !*self.committed.as_ptr() { ffi::mdbx_txn_abort(self.ptr); } @@ -139,9 +178,10 @@ pub struct PtrSync { /// Whether the transaction was committed. committed: AtomicBool, - /// Contains a lock to ensure exclusive access to the transaction. - /// The inner boolean indicates the timeout status. - lock: Mutex<()>, + /// Lock that serialises access to the transaction pointer **and** + /// guards the cursor/database cache. Held inside [`with_txn_ptr`]; also + /// taken implicitly via the [`Cache`] impl on `Mutex`. + lock: Mutex, /// The environment that owns the transaction. env: Environment, @@ -159,7 +199,7 @@ unsafe impl Sync for PtrSync {} impl PtrSync { /// Acquires the inner transaction lock to guarantee exclusive access to the transaction /// pointer. - pub(crate) fn lock(&self) -> MutexGuard<'_, ()> { + pub(crate) fn lock(&self) -> MutexGuard<'_, DbCache> { if let Some(lock) = self.lock.try_lock() { lock } else { @@ -176,13 +216,15 @@ impl PtrSync { } impl TxPtrAccess for PtrSync { + type Cache = Mutex; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self where Self: Sized, { Self { committed: AtomicBool::new(false), - lock: Mutex::new(()), + lock: Mutex::new(DbCache::default()), txn: ptr, env, is_read_only, @@ -200,10 +242,24 @@ impl TxPtrAccess for PtrSync { fn mark_committed(&self) { self.committed.store(true, Ordering::SeqCst); } + + fn cache(&self) -> &Self::Cache { + &self.lock + } } impl Drop for PtrSync { fn drop(&mut self) { + // Close any cached cursors before the transaction ends. Runs + // exactly once: this `Drop` only fires when the last `Arc` + // is released, so there is no race with surviving `TxSync` clones. + // No-op if `commit_inner` already drained the cache. + for cursor in self.lock.get_mut().drain_cursors() { + // SAFETY: cursor was returned from an earlier Cursor::drop + // bound to this still-live txn. + unsafe { ops::cursor_close_raw(cursor) }; + } + if self.committed.load(Ordering::SeqCst) { return; } diff --git a/src/tx/cache.rs b/src/tx/cache.rs index 51feaf1..4bd76a7 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -1,35 +1,37 @@ -//! Caches for [`Database`] info, used by the [`TxSync`] and [`TxUnsync`] types. +//! Caches for [`Database`] info and cursor pointers, used by the [`TxSync`] +//! and [`TxUnsync`] types. //! -//! This module defines cache types for storing database handles within -//! transactions. Caches improve performance by avoiding repeated lookups of -//! database information. +//! This module defines cache types for storing database handles and cached +//! cursor pointers within transactions. The cache is co-located with the +//! transaction pointer (in [`PtrSync`]/[`PtrUnsync`]) so that its lifetime +//! is bound to the transaction's; this guarantees cached cursors are closed +//! exactly once, before the transaction is aborted or committed, even when +//! a `TxSync` is cloned across threads. //! -//! The primary caches are: -//! - [`DbCache`]: A simple inline cache using `SmallVec` for efficient storage -//! of a small number of database handles. Used in unsynchronized -//! transactions via [`RefCell`]. -//! - [`SharedCache`]: A thread-safe cache using `Arc>` for -//! synchronized transactions. +//! The container is [`DbCache`], used either: +//! - inline behind a [`RefCell`] for unsynchronized transactions, or +//! - inline behind a [`parking_lot::Mutex`] for synchronized transactions +//! (the same mutex that serialises raw txn-pointer access). //! //! [`TxSync`]: crate::tx::aliases::TxSync //! [`TxUnsync`]: crate::tx::aliases::TxUnsync +//! [`PtrSync`]: crate::tx::PtrSync +//! [`PtrUnsync`]: crate::tx::PtrUnsync use crate::Database; -use parking_lot::RwLock; +use parking_lot::Mutex; use smallvec::SmallVec; use std::{ cell::RefCell, hash::{Hash, Hasher}, - sync::Arc, }; /// Cache trait for transaction-local database handles and cursors. /// -/// This is used by the [`SyncKind`] trait to define the cache type for each -/// transaction kind. -/// -/// [`SyncKind`]: crate::tx::kind::SyncKind -pub trait Cache: Clone + Default + std::fmt::Debug { +/// Implemented on the concrete container types that wrap a [`DbCache`] +/// (e.g. [`Mutex`] for synchronized transactions and +/// [`RefCell`] for unsynchronized ones). +pub trait Cache: std::fmt::Debug { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option; @@ -126,12 +128,6 @@ impl Default for DbCache { } } -impl Clone for DbCache { - fn clone(&self) -> Self { - Self { dbs: self.dbs.clone(), cursors: SmallVec::new() } - } -} - impl DbCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { @@ -162,7 +158,7 @@ impl DbCache { } /// Drain all cached cursors, returning their raw pointers. - fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + pub(crate) fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.cursors.drain(..).map(|(_, c)| c).collect() } @@ -184,77 +180,43 @@ impl DbCache { } } -/// Simple cache container for database handles. -/// -/// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Clone)] -pub struct SharedCache { - cache: Arc>, -} - -impl SharedCache { - /// Creates a new empty cache. - fn new() -> Self { - Self { cache: Arc::new(RwLock::new(DbCache::default())) } - } - - /// Returns a read guard to the cache. - fn read(&self) -> parking_lot::RwLockReadGuard<'_, DbCache> { - self.cache.read() - } - - /// Returns a write guard to the cache. - fn write(&self) -> parking_lot::RwLockWriteGuard<'_, DbCache> { - self.cache.write() - } -} - -impl Cache for SharedCache { - /// Read a database entry from the cache. +impl Cache for Mutex { fn read_db(&self, name_hash: u64) -> Option { - self.read().read_db(name_hash) + self.lock().read_db(name_hash) } - /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - self.write().write_db(db); + self.lock().write_db(db); } - /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - self.write().remove_dbi(dbi); + self.lock().remove_dbi(dbi); } fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { - self.write().take_cursor(dbi) + self.lock().take_cursor(dbi) } fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { - self.write().return_cursor(dbi, cursor); + self.lock().return_cursor(dbi, cursor); } fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { - self.write().drain_cursors() + self.lock().drain_cursors() } fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { - self.write().drain_cursors_for_dbi(dbi) + self.lock().drain_cursors_for_dbi(dbi) } #[cfg(test)] fn cursor_count(&self) -> usize { - self.read().cursors.len() + self.lock().cursors.len() } #[cfg(test)] unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { - self.write().cursors.push((dbi, cursor)); - } -} - -impl Default for SharedCache { - fn default() -> Self { - Self::new() + self.lock().cursors.push((dbi, cursor)); } } diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index e10ccd4..e24fa37 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -34,7 +34,6 @@ where K: TransactionKind, { access: &'tx K::Access, - cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, db: Database, _kind: PhantomData, @@ -45,16 +44,12 @@ where K: TransactionKind, { /// Creates a new cursor from a reference to a transaction access type. - pub(crate) fn new( - access: &'tx K::Access, - cache: &'tx K::Cache, - db: Database, - ) -> MdbxResult { + pub(crate) fn new(access: &'tx K::Access, db: Database) -> MdbxResult { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); access.with_txn_ptr(|txn_ptr| unsafe { mdbx_result(ffi::mdbx_cursor_open(txn_ptr, db.dbi(), &mut cursor)) })?; - Ok(Self { access, cache, cursor, db, _kind: PhantomData }) + Ok(Self { access, cursor, db, _kind: PhantomData }) } /// Wraps an existing raw cursor pointer with cache support. @@ -62,11 +57,10 @@ where /// The cursor must already be bound to the correct transaction and DBI. pub(crate) const fn from_raw( access: &'tx K::Access, - cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, db: Database, ) -> Self { - Self { access, cache, cursor, db, _kind: PhantomData } + Self { access, cursor, db, _kind: PhantomData } } /// Helper function for `Clone`. This should only be invoked within @@ -83,13 +77,7 @@ where return Err(e); } - Ok(Self { - access: other.access, - cache: other.cache, - cursor, - db: other.db, - _kind: PhantomData, - }) + Ok(Self { access: other.access, cursor, db: other.db, _kind: PhantomData }) } } @@ -1099,9 +1087,9 @@ where { fn drop(&mut self) { // Return the cursor pointer to the transaction cache for reuse. - // The transaction's commit/drop path will call mdbx_cursor_close on - // all cached pointers once the transaction is still valid. - self.cache.return_cursor(self.db.dbi(), self.cursor); + // The cache lives inside the access type, so the txn's Drop closes + // any leftover pointers before aborting. + self.access.cache().return_cursor(self.db.dbi(), self.cursor); } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 6001c7b..506b909 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -50,9 +50,9 @@ impl fmt::Debug for TxMeta { pub struct Tx::Access> { txn: U, - cache: K::Cache, - meta: TxMeta, + + _kind: core::marker::PhantomData, } impl fmt::Debug for Tx { @@ -66,7 +66,11 @@ where K: TransactionKind>, { fn clone(&self) -> Self { - Self { txn: Arc::clone(&self.txn), cache: self.cache.clone(), meta: self.meta.clone() } + Self { + txn: Arc::clone(&self.txn), + meta: self.meta.clone(), + _kind: core::marker::PhantomData, + } } } @@ -75,8 +79,7 @@ impl Tx { pub(crate) fn from_access_and_env(txn: K::Access, env: Environment) -> Self { let span = K::new_span(txn.tx_id().unwrap_or_default()); let meta = TxMeta { env, span }; - let cache = K::Cache::default(); - Self { txn, cache, meta } + Self { txn, meta, _kind: core::marker::PhantomData } } /// Creates a new transaction wrapper from raw pointer and environment. @@ -156,7 +159,7 @@ where pub fn open_db(&self, name: Option<&str>) -> MdbxResult { let name_hash = CachedDb::hash_name(name); - if let Some(db) = self.cache.read_db(name_hash) { + if let Some(db) = self.txn.cache().read_db(name_hash) { return Ok(db); } @@ -174,7 +177,7 @@ where flags: DatabaseFlags, ) -> MdbxResult { let db = self.open_db_with_flags(name, flags)?; - self.cache.write_db(db); + self.txn.cache().write_db(db); Ok(db) } @@ -233,10 +236,10 @@ where /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { - close_drained_cursors(&self.txn, self.cache.drain_cursors_for_dbi(dbi)); + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors_for_dbi(dbi)); // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; - self.cache.remove_dbi(dbi); + self.txn.cache().remove_dbi(dbi); Ok(()) } @@ -247,14 +250,14 @@ where /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { - let Some(raw) = self.cache.take_cursor(db.dbi()) else { - return Cursor::new(&self.txn, &self.cache, db); + let Some(raw) = self.txn.cache().take_cursor(db.dbi()) else { + return Cursor::new(&self.txn, db); }; self.with_txn_ptr(|txn_ptr| { // SAFETY: txn_ptr is valid from with_txn_ptr. `raw` was produced // by a prior Cursor::drop in this transaction. match unsafe { ops::cursor_renew_raw(txn_ptr, raw) } { - Ok(()) => Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)), + Ok(()) => Ok(Cursor::from_raw(&self.txn, raw, db)), Err(e) => { // Renew failed — the cursor is in an indeterminate state. // Close it defensively with the non-panicking variant and @@ -270,10 +273,11 @@ where /// Drains the cursor cache and closes all cached cursor pointers. /// - /// Must be called before commit or abort to ensure all cursors are - /// closed while the transaction is still valid. + /// Must be called before commit to ensure all cursors are closed while + /// the transaction is still valid. Abort flows rely on + /// `PtrSync::Drop` / `PtrUnsync::Drop` to drain instead. fn drain_cached_cursors(&self) { - close_drained_cursors(&self.txn, self.cache.drain_cursors()); + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors()); } } @@ -521,14 +525,14 @@ impl Tx { /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { - close_drained_cursors(&self.txn, self.cache.drain_cursors_for_dbi(db.dbi())); + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors_for_dbi(db.dbi())); self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. unsafe { ops::drop_db_raw(txn, db.dbi()) } })?; - self.cache.remove_dbi(db.dbi()); + self.txn.cache().remove_dbi(db.dbi()); Ok(()) } @@ -713,20 +717,6 @@ where } } -// NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). -// Rust requires Drop bounds to match the struct definition exactly, so we -// route through the `close_drained_cursors` free function, which is generic -// over the access type. -impl Drop for Tx -where - K: TransactionKind, - U: TxPtrAccess, -{ - fn drop(&mut self) { - close_drained_cursors(&self.txn, self.cache.drain_cursors()); - } -} - #[cfg(test)] mod tests { use super::*; @@ -789,19 +779,19 @@ mod tests { let txn = TxUnsync::::begin(env.clone()).unwrap(); let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); - assert_eq!(txn.cache.cursor_count(), 0); + assert_eq!(txn.txn.cache().cursor_count(), 0); { let _c = txn.cursor(db).unwrap(); - assert_eq!(txn.cache.cursor_count(), 0, "live cursor is not in cache"); + assert_eq!(txn.txn.cache().cursor_count(), 0, "live cursor is not in cache"); } - assert_eq!(txn.cache.cursor_count(), 1, "dropped cursor returns to cache"); + assert_eq!(txn.txn.cache().cursor_count(), 1, "dropped cursor returns to cache"); { let _c = txn.cursor(db).unwrap(); - assert_eq!(txn.cache.cursor_count(), 0, "second cursor reuses cached pointer"); + assert_eq!(txn.txn.cache().cursor_count(), 0, "second cursor reuses cached pointer"); } - assert_eq!(txn.cache.cursor_count(), 1, "re-dropped cursor returns to cache"); + assert_eq!(txn.txn.cache().cursor_count(), 1, "re-dropped cursor returns to cache"); txn.commit().unwrap(); } @@ -826,12 +816,65 @@ mod tests { // Populate the cache: one cursor per DBI. drop(txn.cursor(db_a).unwrap()); drop(txn.cursor(db_b).unwrap()); - assert_eq!(txn.cache.cursor_count(), 2); + assert_eq!(txn.txn.cache().cursor_count(), 2); // close_db on "a" must drain only that DBI's cursor. // SAFETY: no live Cursor/Database instances for db_a remain. unsafe { txn.close_db(db_a.dbi()).unwrap() }; - assert_eq!(txn.cache.cursor_count(), 1, "close_db drains only its DBI"); + assert_eq!(txn.txn.cache().cursor_count(), 1, "close_db drains only its DBI"); + } + + #[test] + fn test_clone_drop_does_not_drain_shared_cache() { + // Regression for the bug evalir flagged on PR #11: dropping a + // `TxSync` clone must not drain the cursor cache shared with + // surviving clones. + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn_a = RwTxSync::begin(env.clone()).unwrap(); + let db = txn_a.create_db(None, DatabaseFlags::empty()).unwrap(); + drop(txn_a.cursor(db).unwrap()); + assert_eq!(txn_a.txn.cache().cursor_count(), 1); + + let txn_b = txn_a.clone(); + drop(txn_b); + assert_eq!( + txn_a.txn.cache().cursor_count(), + 1, + "clone-drop must not drain the shared cursor cache" + ); + + // Surviving clone can still reuse the cached cursor. + drop(txn_a.cursor(db).unwrap()); + assert_eq!(txn_a.txn.cache().cursor_count(), 1); + } + + #[test] + fn test_concurrent_clone_drop_no_uaf() { + // Two threads each hold a clone and drop concurrently. Repeat to + // shake out any race in cache cleanup. With cache co-located in + // `PtrSync`, the cleanup runs exactly once when the last `Arc` + // drops — no race possible. + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + for _ in 0..200 { + let txn = RwTxSync::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + // Populate cache with a few cursors. + drop(txn.cursor(db).unwrap()); + drop(txn.cursor(db).unwrap()); + drop(txn.cursor(db).unwrap()); + + let a = txn.clone(); + let b = txn.clone(); + drop(txn); // original + + let h1 = std::thread::spawn(move || drop(a)); + let h2 = std::thread::spawn(move || drop(b)); + h1.join().unwrap(); + h2.join().unwrap(); + } } #[test] @@ -846,12 +889,12 @@ mod tests { // MDBX_EINVAL for a null cursor; the error path closes via // mdbx_cursor_close2 (also null-safe) and must not re-insert. // SAFETY: the renew error path uses cursor_close2_raw which tolerates null. - unsafe { txn.cache.inject_cursor(db.dbi(), ptr::null_mut()) }; - assert_eq!(txn.cache.cursor_count(), 1); + unsafe { txn.txn.cache().inject_cursor(db.dbi(), ptr::null_mut()) }; + assert_eq!(txn.txn.cache().cursor_count(), 1); let err = txn.cursor(db).expect_err("renew must fail on null cursor"); assert!(matches!(err, MdbxError::DecodeError), "expected EINVAL mapping, got {err:?}"); - assert_eq!(txn.cache.cursor_count(), 0, "failed renew must not leak the pointer"); + assert_eq!(txn.txn.cache().cursor_count(), 0, "failed renew must not leak the pointer"); // A follow-up call falls through to the open path and succeeds, // proving recovery via retry works. diff --git a/src/tx/kind.rs b/src/tx/kind.rs index 5e97f14..9802a49 100644 --- a/src/tx/kind.rs +++ b/src/tx/kind.rs @@ -1,13 +1,9 @@ -use std::{cell::RefCell, fmt::Debug, ptr, sync::Arc}; +use std::{fmt::Debug, ptr, sync::Arc}; use crate::{ Environment, MdbxResult, error::mdbx_result, - tx::{ - PtrSync, TxPtrAccess, - access::PtrUnsync, - cache::{Cache, DbCache, SharedCache}, - }, + tx::{PtrSync, TxPtrAccess, access::PtrUnsync}, }; use ffi::{MDBX_TXN_RDONLY, MDBX_TXN_READWRITE, MDBX_txn_flags_t}; @@ -86,31 +82,24 @@ pub trait SyncKind { /// The inner storage type for the transaction pointer. type Access: TxPtrAccess; - - /// Cache type used for this transaction kind. - type Cache: Cache + Send; } impl SyncKind for RoSync { const SYNC: bool = true; type Access = Arc; - type Cache = SharedCache; } impl SyncKind for RwSync { const SYNC: bool = true; type Access = Arc; - type Cache = SharedCache; } impl SyncKind for Ro { type Access = PtrUnsync; - type Cache = RefCell; } impl SyncKind for Rw { type Access = PtrUnsync; - type Cache = RefCell; } /// Marker trait for writable transaction kinds. From 9440579fb9e295715baaff673d4638ef1886c634 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 09:56:17 -0400 Subject: [PATCH 16/19] docs: fix broken Tx::drop reference in close_drained_cursors Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/impl.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 506b909..3ea1f9c 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -283,9 +283,9 @@ where /// Closes a drained batch of cursor pointers via the ops module. /// -/// This is a free function (rather than a method on [`Tx`]) so that -/// [`Tx::drop`], which is defined with a free access-type parameter, -/// can share the same implementation as the `Tx` methods. +/// This is a free function (rather than a method on [`Tx`]) so that the +/// `close_db` / `drop_db` paths and the access types' own `Drop` impls can +/// share a single closing routine, generic over the access type. fn close_drained_cursors( access: &U, cursors: SmallVec<[*mut ffi::MDBX_cursor; 8]>, From c0d5bef0130be7accca93df9d05825b7192cee35 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 8 May 2026 10:00:28 -0400 Subject: [PATCH 17/19] docs: drop unresolvable intra-doc links on private PtrSync field Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/access.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tx/access.rs b/src/tx/access.rs index c66a04d..33e10c4 100644 --- a/src/tx/access.rs +++ b/src/tx/access.rs @@ -179,8 +179,8 @@ pub struct PtrSync { committed: AtomicBool, /// Lock that serialises access to the transaction pointer **and** - /// guards the cursor/database cache. Held inside [`with_txn_ptr`]; also - /// taken implicitly via the [`Cache`] impl on `Mutex`. + /// guards the cursor/database cache. Held inside `with_txn_ptr`; also + /// taken implicitly via the `Cache` impl on `Mutex`. lock: Mutex, /// The environment that owns the transaction. From 91ef3f8b49693fe1461431a23f3a725814ef200d Mon Sep 17 00:00:00 2001 From: James Date: Mon, 18 May 2026 09:19:00 -0400 Subject: [PATCH 18/19] docs: document deadlock risk of dropping Cursor inside with_txn_ptr Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tx/cursor.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index e24fa37..9ac8dc8 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -1085,10 +1085,25 @@ impl<'tx, K> Drop for Cursor<'tx, K> where K: TransactionKind, { + /// Returns the cursor pointer to the transaction cache for reuse. The + /// cache lives inside the access type, so the txn's `Drop` closes any + /// leftover pointers before aborting. + /// + /// # Deadlock + /// + /// For `Sync` transaction kinds (`RoTxSync`, `RwTxSync`) the cache is + /// guarded by the same `Mutex` that `with_txn_ptr` holds. Dropping a + /// `Cursor` from inside a `with_txn_ptr` closure on the same transaction + /// therefore deadlocks: the closure already owns the lock and this + /// `return_cursor` call tries to re-acquire it. + /// + /// No current code path triggers this — `with_txn_ptr` only exposes the + /// raw txn pointer, and the cursor borrows the access through `&self` + /// rather than the closure argument — but it is reachable in user code + /// because `RoTxSync::txn` is an `Arc` with interior + /// mutability, so a `Cursor` can be carried into the closure and + /// dropped there. Avoid dropping cursors inside `with_txn_ptr`. fn drop(&mut self) { - // Return the cursor pointer to the transaction cache for reuse. - // The cache lives inside the access type, so the txn's Drop closes - // any leftover pointers before aborting. self.access.cache().return_cursor(self.db.dbi(), self.cursor); } } From 59b80006967e912e13badb541f399611c659482b Mon Sep 17 00:00:00 2001 From: James Date: Mon, 18 May 2026 09:24:16 -0400 Subject: [PATCH 19/19] chore: bump version to 0.8.3 Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7b0f1d1..9046665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "signet-libmdbx" description = "Idiomatic and safe MDBX wrapper" -version = "0.8.2" +version = "0.8.3" edition = "2024" rust-version = "1.92" license = "MIT OR Apache-2.0"