From d13f71079dbbbea032cae0d1e44d7e1310d15801 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Mar 2026 16:58:02 -0700 Subject: [PATCH 1/6] graph: Add GRAPH_STORE_IGNORE_BLOCK_CACHE env var Add a boolean env var that, when set, will cause block reads for blocks outside the per-chain cache_size window to behave as if the data field is null. This is the first step toward experimenting with reduced block caching before the full block cache revamp. --- graph/src/env/store.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 863e5e4665c..e36e60c8b84 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -145,6 +145,10 @@ pub struct EnvVarsStore { pub use_brin_for_all_query_types: bool, /// Temporary env var to disable certain lookups in the chain store pub disable_block_cache_for_lookup: bool, + /// When set, block reads for blocks that are more than `cache_size` + /// blocks behind the chain head will act as if the data field in + /// the database was null. Set by `GRAPH_STORE_IGNORE_BLOCK_CACHE`. + pub ignore_block_cache: bool, /// Safety switch to increase the number of columns used when /// calculating the chunk size in `InsertQuery::chunk_size`. 
This can be /// used to work around Postgres errors complaining 'number of @@ -237,6 +241,7 @@ impl TryFrom for EnvVarsStore { create_gin_indexes: x.create_gin_indexes, use_brin_for_all_query_types: x.use_brin_for_all_query_types, disable_block_cache_for_lookup: x.disable_block_cache_for_lookup, + ignore_block_cache: x.ignore_block_cache, insert_extra_cols: x.insert_extra_cols, fdw_fetch_size: x.fdw_fetch_size, account_like_scan_interval_hours: x.account_like_scan_interval_hours, @@ -345,6 +350,8 @@ pub struct InnerStore { use_brin_for_all_query_types: bool, #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")] disable_block_cache_for_lookup: bool, + #[envconfig(from = "GRAPH_STORE_IGNORE_BLOCK_CACHE", default = "false")] + ignore_block_cache: bool, #[envconfig(from = "GRAPH_STORE_INSERT_EXTRA_COLS", default = "0")] insert_extra_cols: usize, #[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")] From 06d592d1f6a60800fbee1abe2fb5b2c3d5a9fe5e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Mar 2026 17:00:37 -0700 Subject: [PATCH 2/6] node: Add cache_size to chain TOML config Add a per-chain `cache_size` configuration parameter that controls how many blocks from chain head to keep in the block cache. Defaults to 500. Validated to be greater than reorg_threshold. --- graph/src/components/store/traits.rs | 5 +++++ node/src/config.rs | 32 ++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 755267f4324..2d47eb1786a 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -525,6 +525,11 @@ pub trait ChainIdStore: Send + Sync + 'static { ) -> Result<(), Error>; } +/// The default size for the block cache, i.e., how many blocks behind +/// the chain head we should keep in the database cache. 
The +/// configuration can change this for individual chains +pub const BLOCK_CACHE_SIZE: BlockNumber = 500; + /// Common trait for blockchain store implementations. #[async_trait] pub trait ChainStore: ChainHeadStore { diff --git a/node/src/config.rs b/node/src/config.rs index c538439835c..34f8075fa0b 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,10 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, - components::network_provider::{AmpChainNames, ChainName}, + components::{ + network_provider::{AmpChainNames, ChainName}, + store::BLOCK_CACHE_SIZE, + }, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -462,8 +465,17 @@ impl ChainSection { fn validate(&mut self) -> Result<()> { NodeId::new(&self.ingestor) .map_err(|node| anyhow!("invalid node id for ingestor {}", node))?; - for (_, chain) in self.chains.iter_mut() { - chain.validate()? + let reorg_threshold = ENV_VARS.reorg_threshold(); + for (name, chain) in self.chains.iter_mut() { + chain.validate()?; + if chain.cache_size <= reorg_threshold { + return Err(anyhow!( + "chain '{}': cache_size ({}) must be greater than reorg_threshold ({})", + name, + chain.cache_size, + reorg_threshold + )); + } } // Validate that effective AMP names are unique and don't collide @@ -587,6 +599,7 @@ impl ChainSection { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: default_cache_size(), }); entry.providers.push(provider); } @@ -611,6 +624,15 @@ pub struct Chain { /// resolve to this chain. Defaults to the chain name. #[serde(default)] pub amp: Option, + /// Number of blocks from chain head for which to keep block data + /// cached. When `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks + /// older than this are treated as if they have no data. 
+ #[serde(default = "default_cache_size")] + pub cache_size: i32, +} + +fn default_cache_size() -> i32 { + BLOCK_CACHE_SIZE } fn default_blockchain_kind() -> BlockchainKind { @@ -1297,7 +1319,7 @@ where #[cfg(test)] mod tests { - use crate::config::{default_polling_interval, ChainSection, Web3Rule}; + use crate::config::{default_cache_size, default_polling_interval, ChainSection, Web3Rule}; use super::{ Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, @@ -1345,6 +1367,7 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: default_cache_size(), }, actual ); @@ -1368,6 +1391,7 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: default_cache_size(), }, actual ); From 69878ffdf2e8671a16584cfe5a05d432a82c4d42 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Mar 2026 17:04:59 -0700 Subject: [PATCH 3/6] store: Thread cache_size to ChainStore Pass the per-chain cache_size configuration from TOML config through StoreBuilder and BlockStore to ChainStore, where it will be used to determine which blocks should be treated as uncached. 
--- node/src/store_builder.rs | 15 ++++++++++----- store/postgres/src/block_store.rs | 27 ++++++++++++++++++++------- store/postgres/src/chain_store.rs | 5 +++++ 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index d54a64f5b06..24132881f3b 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -1,6 +1,7 @@ use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; +use graph::components::store::BLOCK_CACHE_SIZE; use graph::prelude::{o, MetricsRegistry, NodeId}; use graph::slog::warn; use graph::url::Url; @@ -24,7 +25,8 @@ pub struct StoreBuilder { subscription_manager: Arc, chain_head_update_listener: Arc, /// Map network names to the shards where they are/should be stored - chains: HashMap, + /// and their cache_size setting + chains: HashMap, pub coord: Arc, registry: Arc, } @@ -65,7 +67,7 @@ impl StoreBuilder { let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| { let shard = ShardName::new(chain.shard.to_string()) .expect("config validation catches invalid names"); - (name.to_string(), shard) + (name.to_string(), (shard, chain.cache_size)) })); let chain_head_update_listener = Arc::new(PostgresChainHeadUpdateListener::new( @@ -177,15 +179,18 @@ impl StoreBuilder { logger: &Logger, pools: HashMap, subgraph_store: Arc, - chains: HashMap, + chains: HashMap, networks: Vec, registry: Arc, ) -> Arc { let networks = networks .into_iter() .map(|name| { - let shard = chains.get(&name).unwrap_or(&*PRIMARY_SHARD).clone(); - (name, shard) + let (shard, cache_size) = chains + .get(&name) + .cloned() + .unwrap_or_else(|| (PRIMARY_SHARD.clone(), BLOCK_CACHE_SIZE)); + (name, shard, cache_size) }) .collect(); diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index d33b664adfa..87ecc5358db 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -1,6 +1,6 @@ use 
std::{collections::HashMap, sync::Arc, time::Duration}; -use graph::parking_lot::RwLock; +use graph::{components::store::BLOCK_CACHE_SIZE, parking_lot::RwLock}; use anyhow::anyhow; use async_trait::async_trait; @@ -217,8 +217,9 @@ pub struct Inner { /// known to the system at startup, either from configuration or from /// previous state in the database. stores: RwLock>>, - // We keep this information so we can create chain stores during startup - shards: Vec<(String, Shard)>, + /// We keep this information so we can create chain stores during + /// startup. The triple is (network, shard, cache_size) + shards: Vec<(String, Shard, BlockNumber)>, pools: HashMap, sender: Arc, mirror: PrimaryMirror, @@ -240,8 +241,8 @@ impl BlockStore { /// a chain uses the pool from `pools` for the given shard. pub async fn new( logger: Logger, - // (network, shard) - shards: Vec<(String, Shard)>, + // (network, shard, cache_size) + shards: Vec<(String, Shard, BlockNumber)>, // shard -> pool pools: HashMap, sender: Arc, @@ -271,7 +272,7 @@ impl BlockStore { let block_store = Self { inner }; // For each configured chain, add a chain store - for (chain_name, shard) in chains { + for (chain_name, shard, _cache_size) in chains { if let Some(chain) = existing_chains .iter() .find(|chain| chain.name == chain_name) @@ -363,6 +364,17 @@ impl BlockStore { ); let ident = chain.network_identifier()?; let logger = self.logger.new(o!("network" => chain.name.clone())); + let cache_size = self + .shards + .iter() + .find_map(|(network, _, chain_size)| { + if network == &chain.name { + Some(*chain_size) + } else { + None + } + }) + .unwrap_or(BLOCK_CACHE_SIZE); let store = ChainStore::new( logger, chain.name.clone(), @@ -371,6 +383,7 @@ impl BlockStore { pool, ENV_VARS.store.recent_blocks_cache_capacity, self.chain_store_metrics.clone(), + cache_size, ); if create { store.create(&ident).await?; @@ -565,7 +578,7 @@ impl BlockStore { let shard = self .shards .iter() - .find_map(|(chain_id, shard)| { + 
.find_map(|(chain_id, shard, _cache_size)| { if chain_id.as_str().eq(network) { Some(shard) } else { diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index fc0e15f6b04..a9c977e1fcc 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2347,6 +2347,9 @@ pub struct ChainStore { chain_head_ptr_cache: ChainHeadPtrCache, /// Herd cache to prevent thundering herd on chain_head_ptr() lookups chain_head_ptr_herd: HerdCache, StoreError>>>, + /// Number of blocks from chain head for which to keep block data cached. + /// Used with `GRAPH_STORE_IGNORE_BLOCK_CACHE` to simulate block data eviction. + cache_size: BlockNumber, } impl ChainStore { @@ -2358,6 +2361,7 @@ impl ChainStore { pool: ConnectionPool, recent_blocks_cache_capacity: usize, metrics: Arc, + cache_size: BlockNumber, ) -> Self { let recent_blocks_cache = RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone()); @@ -2378,6 +2382,7 @@ impl ChainStore { ancestor_cache, chain_head_ptr_cache, chain_head_ptr_herd, + cache_size, } } From b438d8d31cc4cbee98346c059101f1dcbb28b33d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Mar 2026 17:05:55 -0700 Subject: [PATCH 4/6] store: Ignore cached blocks outside cache_size window When GRAPH_STORE_IGNORE_BLOCK_CACHE is set, block reads for blocks that are more than cache_size blocks behind the chain head now behave as if the block doesn't exist in the cache. This allows experimenting with the effects of reduced block caching before the full block cache revamp. 
--- store/postgres/src/chain_store.rs | 32 +++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a9c977e1fcc..237d4b1e669 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2386,6 +2386,20 @@ impl ChainStore { } } + /// Return the block number below which blocks should be treated as + /// not cached. Returns `0` when the feature is disabled (effectively + /// no cutoff). Returns `i32::MAX` when no chain head is known to + /// avoid serving stale data. + async fn cache_cutoff(self: &Arc) -> BlockNumber { + if !ENV_VARS.store.ignore_block_cache { + return 0; + } + match self.clone().chain_head_ptr().await { + Ok(Some(head)) => head.block_number().saturating_sub(self.cache_size), + _ => i32::MAX, + } + } + /// Execute a cached query, avoiding thundering herd for identical requests. /// Returns `(result, was_cached)`. async fn cached_lookup( @@ -2611,15 +2625,20 @@ impl ChainStore { self: &Arc, hashes: Vec, ) -> Result, StoreError> { + let cutoff = self.cache_cutoff().await; let mut conn = self.pool.get_permitted().await?; let values = self.storage.blocks(&mut conn, &self.chain, &hashes).await?; - Ok(values) + Ok(values + .into_iter() + .filter(|b| b.ptr.block_number() >= cutoff) + .collect()) } async fn blocks_from_store_by_numbers( self: &Arc, numbers: Vec, ) -> Result>, StoreError> { + let cutoff = self.cache_cutoff().await; let mut conn = self.pool.get_permitted().await?; let values = self .storage @@ -2628,7 +2647,10 @@ impl ChainStore { let mut block_map = BTreeMap::new(); - for block in values { + for block in values + .into_iter() + .filter(|b| b.ptr.block_number() >= cutoff) + { let block_number = block.ptr.block_number(); block_map .entry(block_number) @@ -3023,6 +3045,12 @@ impl ChainStoreTrait for ChainStore { block_ptr.hash_hex() ); + let target_number = block_ptr.block_number() - offset; + let cutoff = 
self.cache_cutoff().await; + if target_number < cutoff { + return Ok(None); + } + // Use herd cache to avoid thundering herd when multiple callers // request the same ancestor block simultaneously. The cache check // is inside the future so that only one caller checks and populates From 2b9b38b411728ba96f31ba43085a9072a9798769 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Mar 2026 16:37:00 -0700 Subject: [PATCH 5/6] node: Inherit cache_size from ChainSection default Make ChainSection.cache_size the default for chains that don't set cache_size explicitly. Chains deserialized without cache_size get 0 as a sentinel; ChainSection::validate() then fills in the section-level default before the reorg_threshold check. --- node/src/config.rs | 66 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/node/src/config.rs b/node/src/config.rs index 34f8075fa0b..882d6b7b6c2 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -16,7 +16,7 @@ use graph::{ de::{self, value, SeqAccess, Visitor}, Deserialize, Deserializer, }, - serde_json, serde_regex, toml, Logger, NodeId, StoreError, + serde_json, serde_regex, toml, Logger, NodeId, StoreError, BLOCK_NUMBER_MAX, }, }; use graph_chain_ethereum as ethereum; @@ -459,6 +459,11 @@ pub struct ChainSection { pub ingestor: String, #[serde(flatten)] pub chains: BTreeMap, + /// The default for chains that don't set this explicitly. 
When running + /// without a config file, we use `BLOCK_NUMBER_MAX` to turn off pruning + /// the block cache + #[serde(default = "default_cache_size")] + pub cache_size: i32, } impl ChainSection { @@ -466,6 +471,23 @@ impl ChainSection { NodeId::new(&self.ingestor) .map_err(|node| anyhow!("invalid node id for ingestor {}", node))?; let reorg_threshold = ENV_VARS.reorg_threshold(); + + if self.cache_size <= reorg_threshold { + return Err(anyhow!( + "default chains.cache_size ({}) must be greater than reorg_threshold ({})", + self.cache_size, + reorg_threshold + )); + } + + // Apply section-level cache_size as default for chains that + // don't set their own. + for chain in self.chains.values_mut() { + if chain.cache_size == 0 { + chain.cache_size = self.cache_size; + } + } + for (name, chain) in self.chains.iter_mut() { chain.validate()?; if chain.cache_size <= reorg_threshold { @@ -520,7 +542,12 @@ impl ChainSection { Self::parse_networks(&mut chains, Transport::Rpc, &opt.ethereum_rpc)?; Self::parse_networks(&mut chains, Transport::Ws, &opt.ethereum_ws)?; Self::parse_networks(&mut chains, Transport::Ipc, &opt.ethereum_ipc)?; - Ok(Self { ingestor, chains }) + Ok(Self { + ingestor, + chains, + // When running without a config file, we do not prune the block cache + cache_size: BLOCK_NUMBER_MAX, + }) } pub fn providers(&self) -> Vec { @@ -599,7 +626,7 @@ impl ChainSection { polling_interval: default_polling_interval(), providers: vec![], amp: None, - cache_size: default_cache_size(), + cache_size: 0, }); entry.providers.push(provider); } @@ -627,7 +654,7 @@ pub struct Chain { /// Number of blocks from chain head for which to keep block data /// cached. When `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks /// older than this are treated as if they have no data. 
- #[serde(default = "default_cache_size")] + #[serde(default)] pub cache_size: i32, } @@ -1319,7 +1346,7 @@ where #[cfg(test)] mod tests { - use crate::config::{default_cache_size, default_polling_interval, ChainSection, Web3Rule}; + use crate::config::{default_polling_interval, ChainSection, Web3Rule}; use super::{ Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, @@ -1367,7 +1394,7 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, - cache_size: default_cache_size(), + cache_size: 0, }, actual ); @@ -1391,12 +1418,37 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, - cache_size: default_cache_size(), + cache_size: 0, }, actual ); } + #[test] + fn chain_inherits_cache_size_from_section() { + let mut section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + cache_size = 1000 + [mainnet] + shard = "primary" + provider = [] + [sepolia] + shard = "primary" + provider = [] + cache_size = 2000 + "#, + ) + .unwrap(); + + section.validate().unwrap(); + + // mainnet inherits from section + assert_eq!(section.chains["mainnet"].cache_size, 1000); + // sepolia keeps its explicit value + assert_eq!(section.chains["sepolia"].cache_size, 2000); + } + #[test] fn it_works_on_deprecated_provider_from_toml() { let actual = toml::from_str( From 192869fd8e510560660d7769df83c70facd09554 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Mar 2026 17:03:00 -0700 Subject: [PATCH 6/6] node: Document cache_size setting in config.md --- docs/config.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/config.md b/docs/config.md index 847a29e752d..dc126ea1420 100644 --- a/docs/config.md +++ b/docs/config.md @@ -110,6 +110,13 @@ whose `--node-id` matches the `ingestor` value. The acts as a hard override that always prevents ingestion regardless of the config. 
+The section-level setting `cache_size` controls the default number of +blocks from the chain head for which block data is kept cached. Individual +chains can override this value. The default is 500. When the environment +variable `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks older than +`cache_size` are treated as if they have no data. The value must be greater +than the reorg threshold. + The configuration for a chain `name` is specified in the section `[chains.<name>]`, with the following: @@ -120,6 +127,8 @@ The configuration for a chain `name` is specified in the section - `amp`: the network name used by AMP for this chain; defaults to the chain name. Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"` on a chain named `mainnet`). +- `cache_size`: number of blocks from the chain head for which to keep + block data cached. Defaults to the section-level `cache_size`. - `provider`: a list of providers for that chain A `provider` is an object with the following characteristics: @@ -168,6 +177,7 @@ optimisations. ```toml [chains] ingestor = "block_ingestor_node" +cache_size = 500 [chains.mainnet] shard = "vip" amp = "ethereum-mainnet"