diff --git a/docs/config.md b/docs/config.md index 847a29e752d..dc126ea1420 100644 --- a/docs/config.md +++ b/docs/config.md @@ -110,6 +110,13 @@ whose `--node-id` matches the `ingestor` value. The acts as a hard override that always prevents ingestion regardless of the config. +The section-level setting `cache_size` controls the default number of +blocks from the chain head for which block data is kept cached. Individual +chains can override this value. The default is 500. When the environment +variable `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks older than +`cache_size` are treated as if they have no data. The value must be greater +than the reorg threshold. + The configuration for a chain `name` is specified in the section `[chains.]`, with the following: @@ -120,6 +127,8 @@ The configuration for a chain `name` is specified in the section - `amp`: the network name used by AMP for this chain; defaults to the chain name. Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"` on a chain named `mainnet`). +- `cache_size`: number of blocks from the chain head for which to keep + block data cached. Defaults to the section-level `cache_size`. - `provider`: a list of providers for that chain A `provider` is an object with the following characteristics: @@ -168,6 +177,7 @@ optimisations. ```toml [chains] ingestor = "block_ingestor_node" +cache_size = 500 [chains.mainnet] shard = "vip" amp = "ethereum-mainnet" diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 755267f4324..2d47eb1786a 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -525,6 +525,11 @@ pub trait ChainIdStore: Send + Sync + 'static { ) -> Result<(), Error>; } +/// The default size for the block cache, i.e., how many blocks behind +/// the chain head we should keep in the database cache. The +/// configuration can change this for individual chains +pub const BLOCK_CACHE_SIZE: BlockNumber = 500; + /// Common trait for blockchain store implementations. #[async_trait] pub trait ChainStore: ChainHeadStore { diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 863e5e4665c..e36e60c8b84 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -145,6 +145,10 @@ pub struct EnvVarsStore { pub use_brin_for_all_query_types: bool, /// Temporary env var to disable certain lookups in the chain store pub disable_block_cache_for_lookup: bool, + /// When set, block reads for blocks that are more than `cache_size` + /// blocks behind the chain head will act as if the data field in + /// the database was null. Set by `GRAPH_STORE_IGNORE_BLOCK_CACHE`. + pub ignore_block_cache: bool, /// Safety switch to increase the number of columns used when /// calculating the chunk size in `InsertQuery::chunk_size`. This can be /// used to work around Postgres errors complaining 'number of @@ -237,6 +241,7 @@ impl TryFrom for EnvVarsStore { create_gin_indexes: x.create_gin_indexes, use_brin_for_all_query_types: x.use_brin_for_all_query_types, disable_block_cache_for_lookup: x.disable_block_cache_for_lookup, + ignore_block_cache: x.ignore_block_cache, insert_extra_cols: x.insert_extra_cols, fdw_fetch_size: x.fdw_fetch_size, account_like_scan_interval_hours: x.account_like_scan_interval_hours, @@ -345,6 +350,8 @@ pub struct InnerStore { use_brin_for_all_query_types: bool, #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")] disable_block_cache_for_lookup: bool, + #[envconfig(from = "GRAPH_STORE_IGNORE_BLOCK_CACHE", default = "false")] + ignore_block_cache: bool, #[envconfig(from = "GRAPH_STORE_INSERT_EXTRA_COLS", default = "0")] insert_extra_cols: usize, #[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")] diff --git a/node/src/config.rs b/node/src/config.rs index c538439835c..882d6b7b6c2 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,10 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, - components::network_provider::{AmpChainNames, ChainName}, + components::{ + network_provider::{AmpChainNames, ChainName}, + store::BLOCK_CACHE_SIZE, + }, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -13,7 +16,7 @@ use graph::{ de::{self, value, SeqAccess, Visitor}, Deserialize, Deserializer, }, - serde_json, serde_regex, toml, Logger, NodeId, StoreError, + serde_json, serde_regex, toml, Logger, NodeId, StoreError, BLOCK_NUMBER_MAX, }, }; use graph_chain_ethereum as ethereum; @@ -456,14 +459,45 @@ pub struct ChainSection { pub ingestor: String, #[serde(flatten)] pub chains: BTreeMap, + /// The default for chains that don't set this explicitly. When running + /// without a config file, we use `BLOCK_NUMBER_MAX` to turn off pruning + /// the block cache + #[serde(default = "default_cache_size")] + pub cache_size: i32, } impl ChainSection { fn validate(&mut self) -> Result<()> { NodeId::new(&self.ingestor) .map_err(|node| anyhow!("invalid node id for ingestor {}", node))?; - for (_, chain) in self.chains.iter_mut() { - chain.validate()? + let reorg_threshold = ENV_VARS.reorg_threshold(); + + if self.cache_size <= reorg_threshold { + return Err(anyhow!( + "default chains.cache_size ({}) must be greater than reorg_threshold ({})", + self.cache_size, + reorg_threshold + )); + } + + // Apply section-level cache_size as default for chains that + // don't set their own. + for chain in self.chains.values_mut() { + if chain.cache_size == 0 { + chain.cache_size = self.cache_size; + } + } + + for (name, chain) in self.chains.iter_mut() { + chain.validate()?; + if chain.cache_size <= reorg_threshold { + return Err(anyhow!( + "chain '{}': cache_size ({}) must be greater than reorg_threshold ({})", + name, + chain.cache_size, + reorg_threshold + )); + } } // Validate that effective AMP names are unique and don't collide @@ -508,7 +542,12 @@ impl ChainSection { Self::parse_networks(&mut chains, Transport::Rpc, &opt.ethereum_rpc)?; Self::parse_networks(&mut chains, Transport::Ws, &opt.ethereum_ws)?; Self::parse_networks(&mut chains, Transport::Ipc, &opt.ethereum_ipc)?; - Ok(Self { ingestor, chains }) + Ok(Self { + ingestor, + chains, + // When running without a config file, we do not prune the block cache + cache_size: BLOCK_NUMBER_MAX, + }) } pub fn providers(&self) -> Vec { @@ -587,6 +626,7 @@ impl ChainSection { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: 0, }); entry.providers.push(provider); } @@ -611,6 +651,15 @@ pub struct Chain { /// resolve to this chain. Defaults to the chain name. #[serde(default)] pub amp: Option, + /// Number of blocks from chain head for which to keep block data + /// cached. When `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks + /// older than this are treated as if they have no data. + #[serde(default)] + pub cache_size: i32, +} + +fn default_cache_size() -> i32 { + BLOCK_CACHE_SIZE } fn default_blockchain_kind() -> BlockchainKind { @@ -1345,6 +1394,7 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: 0, }, actual ); @@ -1368,11 +1418,37 @@ mod tests { polling_interval: default_polling_interval(), providers: vec![], amp: None, + cache_size: 0, }, actual ); } + #[test] + fn chain_inherits_cache_size_from_section() { + let mut section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + cache_size = 1000 + [mainnet] + shard = "primary" + provider = [] + [sepolia] + shard = "primary" + provider = [] + cache_size = 2000 + "#, + ) + .unwrap(); + + section.validate().unwrap(); + + // mainnet inherits from section + assert_eq!(section.chains["mainnet"].cache_size, 1000); + // sepolia keeps its explicit value + assert_eq!(section.chains["sepolia"].cache_size, 2000); + } + #[test] fn it_works_on_deprecated_provider_from_toml() { let actual = toml::from_str( diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index d54a64f5b06..24132881f3b 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -1,6 +1,7 @@ use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; +use graph::components::store::BLOCK_CACHE_SIZE; use graph::prelude::{o, MetricsRegistry, NodeId}; use graph::slog::warn; use graph::url::Url; @@ -24,7 +25,8 @@ pub struct StoreBuilder { subscription_manager: Arc, chain_head_update_listener: Arc, /// Map network names to the shards where they are/should be stored - chains: HashMap, + /// and their cache_size setting + chains: HashMap, pub coord: Arc, registry: Arc, } @@ -65,7 +67,7 @@ impl StoreBuilder { let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| { let shard = ShardName::new(chain.shard.to_string()) .expect("config validation catches invalid names"); - (name.to_string(), shard) + (name.to_string(), (shard, chain.cache_size)) })); let chain_head_update_listener = Arc::new(PostgresChainHeadUpdateListener::new( @@ -177,15 +179,18 @@ impl StoreBuilder { logger: &Logger, pools: HashMap, subgraph_store: Arc, - chains: HashMap, + chains: HashMap, networks: Vec, registry: Arc, ) -> Arc { let networks = networks .into_iter() .map(|name| { - let shard = chains.get(&name).unwrap_or(&*PRIMARY_SHARD).clone(); - (name, shard) + let (shard, cache_size) = chains + .get(&name) + .cloned() + .unwrap_or_else(|| (PRIMARY_SHARD.clone(), BLOCK_CACHE_SIZE)); + (name, shard, cache_size) }) .collect(); diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index d33b664adfa..87ecc5358db 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use graph::parking_lot::RwLock; +use graph::{components::store::BLOCK_CACHE_SIZE, parking_lot::RwLock}; use anyhow::anyhow; use async_trait::async_trait; @@ -217,8 +217,9 @@ pub struct Inner { /// known to the system at startup, either from configuration or from /// previous state in the database. stores: RwLock>>, - // We keep this information so we can create chain stores during startup - shards: Vec<(String, Shard)>, + /// We keep this information so we can create chain stores during + /// startup. The triple is (network, shard, cache_size) + shards: Vec<(String, Shard, BlockNumber)>, pools: HashMap, sender: Arc, mirror: PrimaryMirror, @@ -240,8 +241,8 @@ impl BlockStore { /// a chain uses the pool from `pools` for the given shard. pub async fn new( logger: Logger, - // (network, shard) - shards: Vec<(String, Shard)>, + // (network, shard, cache_size) + shards: Vec<(String, Shard, BlockNumber)>, // shard -> pool pools: HashMap, sender: Arc, @@ -271,7 +272,7 @@ impl BlockStore { let block_store = Self { inner }; // For each configured chain, add a chain store - for (chain_name, shard) in chains { + for (chain_name, shard, _cache_size) in chains { if let Some(chain) = existing_chains .iter() .find(|chain| chain.name == chain_name) @@ -363,6 +364,17 @@ impl BlockStore { ); let ident = chain.network_identifier()?; let logger = self.logger.new(o!("network" => chain.name.clone())); + let cache_size = self + .shards + .iter() + .find_map(|(network, _, chain_size)| { + if network == &chain.name { + Some(*chain_size) + } else { + None + } + }) + .unwrap_or(BLOCK_CACHE_SIZE); let store = ChainStore::new( logger, chain.name.clone(), @@ -371,6 +383,7 @@ impl BlockStore { pool, ENV_VARS.store.recent_blocks_cache_capacity, self.chain_store_metrics.clone(), + cache_size, ); if create { store.create(&ident).await?; @@ -565,7 +578,7 @@ impl BlockStore { let shard = self .shards .iter() - .find_map(|(chain_id, shard)| { + .find_map(|(chain_id, shard, _cache_size)| { if chain_id.as_str().eq(network) { Some(shard) } else { diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index fc0e15f6b04..237d4b1e669 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2347,6 +2347,9 @@ pub struct ChainStore { chain_head_ptr_cache: ChainHeadPtrCache, /// Herd cache to prevent thundering herd on chain_head_ptr() lookups chain_head_ptr_herd: HerdCache, StoreError>>>, + /// Number of blocks from chain head for which to keep block data cached. + /// Used with `GRAPH_STORE_IGNORE_BLOCK_CACHE` to simulate block data eviction. + cache_size: BlockNumber, } impl ChainStore { @@ -2358,6 +2361,7 @@ impl ChainStore { pool: ConnectionPool, recent_blocks_cache_capacity: usize, metrics: Arc, + cache_size: BlockNumber, ) -> Self { let recent_blocks_cache = RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone()); @@ -2378,6 +2382,21 @@ impl ChainStore { ancestor_cache, chain_head_ptr_cache, chain_head_ptr_herd, + cache_size, + } + } + + /// Return the block number below which blocks should be treated as + /// not cached. Returns `0` when the feature is disabled (effectively + /// no cutoff). Returns `i32::MAX` when no chain head is known to + /// avoid serving stale data. + async fn cache_cutoff(self: &Arc) -> BlockNumber { + if !ENV_VARS.store.ignore_block_cache { + return 0; + } + match self.clone().chain_head_ptr().await { + Ok(Some(head)) => head.block_number().saturating_sub(self.cache_size), + _ => i32::MAX, } } @@ -2606,15 +2625,20 @@ impl ChainStore { self: &Arc, hashes: Vec, ) -> Result, StoreError> { + let cutoff = self.cache_cutoff().await; let mut conn = self.pool.get_permitted().await?; let values = self.storage.blocks(&mut conn, &self.chain, &hashes).await?; - Ok(values) + Ok(values + .into_iter() + .filter(|b| b.ptr.block_number() >= cutoff) + .collect()) } async fn blocks_from_store_by_numbers( self: &Arc, numbers: Vec, ) -> Result>, StoreError> { + let cutoff = self.cache_cutoff().await; let mut conn = self.pool.get_permitted().await?; let values = self .storage @@ -2623,7 +2647,10 @@ impl ChainStore { let mut block_map = BTreeMap::new(); - for block in values { + for block in values + .into_iter() + .filter(|b| b.ptr.block_number() >= cutoff) + { let block_number = block.ptr.block_number(); block_map .entry(block_number) @@ -3018,6 +3045,12 @@ impl ChainStoreTrait for ChainStore { block_ptr.hash_hex() ); + let target_number = block_ptr.block_number() - offset; + let cutoff = self.cache_cutoff().await; + if target_number < cutoff { + return Ok(None); + } + // Use herd cache to avoid thundering herd when multiple callers // request the same ancestor block simultaneously. The cache check // is inside the future so that only one caller checks and populates