Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ whose `--node-id` matches the `ingestor` value. The
acts as a hard override that always prevents ingestion regardless of
the config.

The section-level setting `cache_size` controls the default number of
blocks from the chain head for which block data is kept cached. Individual
chains can override this value. The default is 500. When the environment
variable `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks older than
`cache_size` are treated as if they have no data. Both the section-level
value and any per-chain override must be greater than the reorg threshold.

The configuration for a chain `name` is specified in the section
`[chains.<name>]`, with the following:

Expand All @@ -120,6 +127,8 @@ The configuration for a chain `name` is specified in the section
- `amp`: the network name used by AMP for this chain; defaults to the chain name.
Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"`
on a chain named `mainnet`).
- `cache_size`: number of blocks from the chain head for which to keep
block data cached. Defaults to the section-level `cache_size`.
- `provider`: a list of providers for that chain

A `provider` is an object with the following characteristics:
Expand Down Expand Up @@ -168,6 +177,7 @@ optimisations.
```toml
[chains]
ingestor = "block_ingestor_node"
cache_size = 500
[chains.mainnet]
shard = "vip"
amp = "ethereum-mainnet"
Expand Down
5 changes: 5 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ pub trait ChainIdStore: Send + Sync + 'static {
) -> Result<(), Error>;
}

/// The default size for the block cache, i.e., how many blocks behind
/// the chain head we should keep in the database cache. The
/// configuration can change this for individual chains via the per-chain
/// `cache_size` setting; the section-level `chains.cache_size` changes
/// the default for all chains that do not set their own value.
pub const BLOCK_CACHE_SIZE: BlockNumber = 500;

/// Common trait for blockchain store implementations.
#[async_trait]
pub trait ChainStore: ChainHeadStore {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/env/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ pub struct EnvVarsStore {
pub use_brin_for_all_query_types: bool,
/// Temporary env var to disable certain lookups in the chain store
pub disable_block_cache_for_lookup: bool,
/// When set, block reads for blocks that are more than `cache_size`
/// blocks behind the chain head will act as if the data field in
/// the database was null. Set by `GRAPH_STORE_IGNORE_BLOCK_CACHE`.
pub ignore_block_cache: bool,
/// Safety switch to increase the number of columns used when
/// calculating the chunk size in `InsertQuery::chunk_size`. This can be
/// used to work around Postgres errors complaining 'number of
Expand Down Expand Up @@ -237,6 +241,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
create_gin_indexes: x.create_gin_indexes,
use_brin_for_all_query_types: x.use_brin_for_all_query_types,
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
ignore_block_cache: x.ignore_block_cache,
insert_extra_cols: x.insert_extra_cols,
fdw_fetch_size: x.fdw_fetch_size,
account_like_scan_interval_hours: x.account_like_scan_interval_hours,
Expand Down Expand Up @@ -345,6 +350,8 @@ pub struct InnerStore {
use_brin_for_all_query_types: bool,
#[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
disable_block_cache_for_lookup: bool,
#[envconfig(from = "GRAPH_STORE_IGNORE_BLOCK_CACHE", default = "false")]
ignore_block_cache: bool,
#[envconfig(from = "GRAPH_STORE_INSERT_EXTRA_COLS", default = "0")]
insert_extra_cols: usize,
#[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")]
Expand Down
86 changes: 81 additions & 5 deletions node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use graph::{
anyhow::Error,
blockchain::BlockchainKind,
components::network_provider::{AmpChainNames, ChainName},
components::{
network_provider::{AmpChainNames, ChainName},
store::BLOCK_CACHE_SIZE,
},
env::ENV_VARS,
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
itertools::Itertools,
Expand All @@ -13,7 +16,7 @@ use graph::{
de::{self, value, SeqAccess, Visitor},
Deserialize, Deserializer,
},
serde_json, serde_regex, toml, Logger, NodeId, StoreError,
serde_json, serde_regex, toml, Logger, NodeId, StoreError, BLOCK_NUMBER_MAX,
},
};
use graph_chain_ethereum as ethereum;
Expand Down Expand Up @@ -456,14 +459,45 @@ pub struct ChainSection {
pub ingestor: String,
#[serde(flatten)]
pub chains: BTreeMap<String, Chain>,
/// The default for chains that don't set this explicitly. When running
/// without a config file, we use `BLOCK_NUMBER_MAX` to turn off pruning
/// the block cache
#[serde(default = "default_cache_size")]
pub cache_size: i32,
}

impl ChainSection {
fn validate(&mut self) -> Result<()> {
NodeId::new(&self.ingestor)
.map_err(|node| anyhow!("invalid node id for ingestor {}", node))?;
for (_, chain) in self.chains.iter_mut() {
chain.validate()?
let reorg_threshold = ENV_VARS.reorg_threshold();

if self.cache_size <= reorg_threshold {
return Err(anyhow!(
"default chains.cache_size ({}) must be greater than reorg_threshold ({})",
self.cache_size,
reorg_threshold
));
}

// Apply section-level cache_size as default for chains that
// don't set their own.
for chain in self.chains.values_mut() {
if chain.cache_size == 0 {
chain.cache_size = self.cache_size;
}
}

for (name, chain) in self.chains.iter_mut() {
chain.validate()?;
if chain.cache_size <= reorg_threshold {
return Err(anyhow!(
"chain '{}': cache_size ({}) must be greater than reorg_threshold ({})",
name,
chain.cache_size,
reorg_threshold
));
}
}

// Validate that effective AMP names are unique and don't collide
Expand Down Expand Up @@ -508,7 +542,12 @@ impl ChainSection {
Self::parse_networks(&mut chains, Transport::Rpc, &opt.ethereum_rpc)?;
Self::parse_networks(&mut chains, Transport::Ws, &opt.ethereum_ws)?;
Self::parse_networks(&mut chains, Transport::Ipc, &opt.ethereum_ipc)?;
Ok(Self { ingestor, chains })
Ok(Self {
ingestor,
chains,
// When running without a config file, we do not prune the block cache
cache_size: BLOCK_NUMBER_MAX,
})
}

pub fn providers(&self) -> Vec<String> {
Expand Down Expand Up @@ -587,6 +626,7 @@ impl ChainSection {
polling_interval: default_polling_interval(),
providers: vec![],
amp: None,
cache_size: 0,
});
entry.providers.push(provider);
}
Expand All @@ -611,6 +651,15 @@ pub struct Chain {
/// resolve to this chain. Defaults to the chain name.
#[serde(default)]
pub amp: Option<String>,
/// Number of blocks from chain head for which to keep block data
/// cached. When `GRAPH_STORE_IGNORE_BLOCK_CACHE` is set, blocks
/// older than this are treated as if they have no data.
#[serde(default)]
pub cache_size: i32,
}

/// Serde default for the section-level `chains.cache_size` setting when a
/// config file is present but does not set it; falls back to the store's
/// default block cache size (500 blocks).
fn default_cache_size() -> i32 {
    BLOCK_CACHE_SIZE
}

fn default_blockchain_kind() -> BlockchainKind {
Expand Down Expand Up @@ -1345,6 +1394,7 @@ mod tests {
polling_interval: default_polling_interval(),
providers: vec![],
amp: None,
cache_size: 0,
},
actual
);
Expand All @@ -1368,11 +1418,37 @@ mod tests {
polling_interval: default_polling_interval(),
providers: vec![],
amp: None,
cache_size: 0,
},
actual
);
}

#[test]
fn chain_inherits_cache_size_from_section() {
    // Two chains under a section-level `cache_size = 1000`: `mainnet`
    // leaves the field unset (deserializes to the 0 sentinel), while
    // `sepolia` sets an explicit override.
    let mut section = toml::from_str::<ChainSection>(
        r#"
ingestor = "block_ingestor_node"
cache_size = 1000
[mainnet]
shard = "primary"
provider = []
[sepolia]
shard = "primary"
provider = []
cache_size = 2000
"#,
    )
    .unwrap();

    // `validate` is what copies the section-level default into chains
    // that left `cache_size` unset; it must be called before asserting.
    section.validate().unwrap();

    // mainnet inherits from section
    assert_eq!(section.chains["mainnet"].cache_size, 1000);
    // sepolia keeps its explicit value
    assert_eq!(section.chains["sepolia"].cache_size, 2000);
}

#[test]
fn it_works_on_deprecated_provider_from_toml() {
let actual = toml::from_str(
Expand Down
15 changes: 10 additions & 5 deletions node/src/store_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::iter::FromIterator;
use std::{collections::HashMap, sync::Arc};

use graph::components::store::BLOCK_CACHE_SIZE;
use graph::prelude::{o, MetricsRegistry, NodeId};
use graph::slog::warn;
use graph::url::Url;
Expand All @@ -24,7 +25,8 @@ pub struct StoreBuilder {
subscription_manager: Arc<SubscriptionManager>,
chain_head_update_listener: Arc<PostgresChainHeadUpdateListener>,
/// Map network names to the shards where they are/should be stored
chains: HashMap<String, ShardName>,
/// and their cache_size setting
chains: HashMap<String, (ShardName, i32)>,
pub coord: Arc<PoolCoordinator>,
registry: Arc<MetricsRegistry>,
}
Expand Down Expand Up @@ -65,7 +67,7 @@ impl StoreBuilder {
let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| {
let shard = ShardName::new(chain.shard.to_string())
.expect("config validation catches invalid names");
(name.to_string(), shard)
(name.to_string(), (shard, chain.cache_size))
}));

let chain_head_update_listener = Arc::new(PostgresChainHeadUpdateListener::new(
Expand Down Expand Up @@ -177,15 +179,18 @@ impl StoreBuilder {
logger: &Logger,
pools: HashMap<ShardName, ConnectionPool>,
subgraph_store: Arc<SubgraphStore>,
chains: HashMap<String, ShardName>,
chains: HashMap<String, (ShardName, i32)>,
networks: Vec<String>,
registry: Arc<MetricsRegistry>,
) -> Arc<DieselStore> {
let networks = networks
.into_iter()
.map(|name| {
let shard = chains.get(&name).unwrap_or(&*PRIMARY_SHARD).clone();
(name, shard)
let (shard, cache_size) = chains
.get(&name)
.cloned()
.unwrap_or_else(|| (PRIMARY_SHARD.clone(), BLOCK_CACHE_SIZE));
(name, shard, cache_size)
})
.collect();

Expand Down
27 changes: 20 additions & 7 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use graph::parking_lot::RwLock;
use graph::{components::store::BLOCK_CACHE_SIZE, parking_lot::RwLock};

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -217,8 +217,9 @@ pub struct Inner {
/// known to the system at startup, either from configuration or from
/// previous state in the database.
stores: RwLock<HashMap<String, Arc<ChainStore>>>,
// We keep this information so we can create chain stores during startup
shards: Vec<(String, Shard)>,
/// We keep this information so we can create chain stores during
/// startup. The triple is (network, shard, cache_size)
shards: Vec<(String, Shard, BlockNumber)>,
pools: HashMap<Shard, ConnectionPool>,
sender: Arc<NotificationSender>,
mirror: PrimaryMirror,
Expand All @@ -240,8 +241,8 @@ impl BlockStore {
/// a chain uses the pool from `pools` for the given shard.
pub async fn new(
logger: Logger,
// (network, shard)
shards: Vec<(String, Shard)>,
// (network, shard, cache_size)
shards: Vec<(String, Shard, BlockNumber)>,
// shard -> pool
pools: HashMap<Shard, ConnectionPool>,
sender: Arc<NotificationSender>,
Expand Down Expand Up @@ -271,7 +272,7 @@ impl BlockStore {
let block_store = Self { inner };

// For each configured chain, add a chain store
for (chain_name, shard) in chains {
for (chain_name, shard, _cache_size) in chains {
if let Some(chain) = existing_chains
.iter()
.find(|chain| chain.name == chain_name)
Expand Down Expand Up @@ -363,6 +364,17 @@ impl BlockStore {
);
let ident = chain.network_identifier()?;
let logger = self.logger.new(o!("network" => chain.name.clone()));
let cache_size = self
.shards
.iter()
.find_map(|(network, _, chain_size)| {
if network == &chain.name {
Some(*chain_size)
} else {
None
}
})
.unwrap_or(BLOCK_CACHE_SIZE);
let store = ChainStore::new(
logger,
chain.name.clone(),
Expand All @@ -371,6 +383,7 @@ impl BlockStore {
pool,
ENV_VARS.store.recent_blocks_cache_capacity,
self.chain_store_metrics.clone(),
cache_size,
);
if create {
store.create(&ident).await?;
Expand Down Expand Up @@ -565,7 +578,7 @@ impl BlockStore {
let shard = self
.shards
.iter()
.find_map(|(chain_id, shard)| {
.find_map(|(chain_id, shard, _cache_size)| {
if chain_id.as_str().eq(network) {
Some(shard)
} else {
Expand Down
Loading
Loading