Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,11 +1148,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
let block = match self.chain_client.as_ref() {
ChainClient::Firehose(endpoints) => {
let chain_store = self.chain_store.cheap_clone();
// First try to get the block from the store (typed cache)
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
if let Some(cached_block) = blocks.into_iter().next() {
return Ok(cached_block.light_block().parent_ptr());
}
// First try to get the parent pointer from the store header columns
if let Ok(Some(parent)) = chain_store.block_parent_ptr(&block.hash).await {
return Ok(Some(parent));
}

// If not in store, fetch from Firehose
Expand Down
14 changes: 14 additions & 0 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,12 @@ impl ChainStore for MockChainStore {
/// Unimplemented in the mock store; panics if called.
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error> {
    unimplemented!()
}
/// Unimplemented in the mock store; panics if called.
async fn block_parent_ptr(
    self: Arc<Self>,
    _hash: &BlockHash,
) -> Result<Option<BlockPtr>, Error> {
    unimplemented!()
}
async fn ancestor_block(
self: Arc<Self>,
_block_ptr: BlockPtr,
Expand All @@ -538,6 +544,14 @@ impl ChainStore for MockChainStore {
) -> Result<Option<(CachedBlock, BlockPtr)>, Error> {
unimplemented!()
}
/// Unimplemented in the mock store; panics if called.
async fn ancestor_block_ptr(
    self: Arc<Self>,
    _block_ptr: BlockPtr,
    _offset: BlockNumber,
    _root: Option<BlockHash>,
) -> Result<Option<BlockPtr>, Error> {
    unimplemented!()
}
async fn cleanup_cached_blocks(
&self,
_ancestor_count: BlockNumber,
Expand Down
17 changes: 17 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ pub trait ChainStore: ChainHeadStore {
/// Returns the blocks present in the store as typed cached blocks.
async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error>;

/// Return the parent block pointer for the block with the given hash.
/// Only reads header columns (hash, number, parent_hash), not the data
/// column. More efficient than `blocks` when only the parent pointer is
/// needed. Returns `Ok(None)` when the block is not present in the
/// store.
async fn block_parent_ptr(self: Arc<Self>, hash: &BlockHash)
    -> Result<Option<BlockPtr>, Error>;

/// Returns the blocks present in the store for the given block numbers.
async fn block_ptrs_by_numbers(
self: Arc<Self>,
Expand Down Expand Up @@ -587,6 +594,16 @@ pub trait ChainStore: ChainHeadStore {
root: Option<BlockHash>,
) -> Result<Option<(CachedBlock, BlockPtr)>, Error>;

/// Like `ancestor_block` but returns only the block pointer, not the
/// block data. More efficient when callers only need to identify which
/// block is the ancestor without reading the full block body.
/// Returns `Ok(None)` when the ancestor is not present in the store.
async fn ancestor_block_ptr(
    self: Arc<Self>,
    block_ptr: BlockPtr,
    offset: BlockNumber,
    root: Option<BlockHash>,
) -> Result<Option<BlockPtr>, Error>;

/// Remove old blocks from the cache we maintain in the database and
/// return a pair containing the number of the oldest block retained
/// and the number of blocks deleted.
Expand Down
9 changes: 5 additions & 4 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ pub async fn info(
let head_block = chain_store.cheap_clone().chain_head_ptr().await?;
let ancestor = match &head_block {
None => None,
Some(head_block) => chain_store
.ancestor_block(head_block.clone(), offset, None)
.await?
.map(|x| x.1),
Some(head_block) => {
chain_store
.ancestor_block_ptr(head_block.clone(), offset, None)
.await?
}
};

row("name", chain.name);
Expand Down
215 changes: 214 additions & 1 deletion store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ mod data {
use diesel::dsl::sql;
use diesel::insert_into;
use diesel::sql_types::{Array, Binary, Bool, Nullable, Text};
use diesel::{delete, sql_query, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl};
use diesel::{
delete, sql_query, BoolExpressionMethods, ExpressionMethods, JoinOnDsl,
NullableExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel::{
deserialize::FromSql,
pg::Pg,
Expand Down Expand Up @@ -745,6 +748,73 @@ mod data {
.collect())
}

/// Return the parent block pointer for the block with the given hash.
/// Only reads header columns, not the data column.
pub(super) async fn block_parent_ptr(
&self,
conn: &mut AsyncPgConnection,
chain: &str,
hash: &BlockHash,
) -> Result<Option<BlockPtr>, Error> {
let result = match self {
Storage::Shared => {
use public::ethereum_blocks as b;

let (child, parent) = diesel::alias!(
public::ethereum_blocks as child,
public::ethereum_blocks as parent
);

child
.inner_join(
parent.on(child
.field(b::parent_hash)
.assume_not_null()
.eq(parent.field(b::hash))
.and(parent.field(b::network_name).eq(chain))),
)
.select((parent.field(b::hash), parent.field(b::number)))
.filter(child.field(b::hash).eq(format!("{:x}", hash)))
.filter(child.field(b::network_name).eq(chain))
.first::<(String, i64)>(conn)
.await
.optional()?
.map(|(h, n)| {
Ok::<_, Error>(BlockPtr::new(h.parse()?, i32::try_from(n).unwrap()))
})
.transpose()?
}
Storage::Private(Schema { blocks, .. }) => {
// We can't use diesel::alias! here because the table is
// dynamic, so we write the SQL query manually

let query = format!(
"SELECT parent.hash, parent.number \
FROM {qname} child, {qname} parent \
WHERE child.hash = $1 \
AND child.parent_hash = parent.hash",
qname = blocks.qname
);

#[derive(QueryableByName)]
struct BlockHashAndNumber {
#[diesel(sql_type = Bytea)]
hash: Vec<u8>,
#[diesel(sql_type = BigInt)]
number: i64,
}

sql_query(query)
.bind::<Bytea, _>(hash.as_slice())
.get_result::<BlockHashAndNumber>(conn)
.await
.optional()?
.map(|block| BlockPtr::from((block.hash, block.number)))
}
};
Ok(result)
}

pub(super) async fn block_hashes_by_block_number(
&self,
conn: &mut AsyncPgConnection,
Expand Down Expand Up @@ -1222,6 +1292,89 @@ mod data {
Ok(data_and_ptr)
}

/// Like `ancestor_block` but returns only the `BlockPtr` without
/// fetching the `data` column.
pub(super) async fn ancestor_block_ptr(
&self,
conn: &mut AsyncPgConnection,
block_ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<BlockPtr>, Error> {
let short_circuit_predicate = match root {
Some(_) => "and b.parent_hash <> $3",
None => "",
};

match self {
Storage::Shared => {
let query =
self.ancestor_block_query(short_circuit_predicate, "ethereum_blocks");

#[derive(QueryableByName)]
struct BlockHashAndNumber {
#[diesel(sql_type = Text)]
hash: String,
#[diesel(sql_type = BigInt)]
number: i64,
}

let block = match root {
Some(root) => sql_query(query)
.bind::<Text, _>(block_ptr.hash_hex())
.bind::<BigInt, _>(offset as i64)
.bind::<Text, _>(root.hash_hex())
.get_result::<BlockHashAndNumber>(conn),
None => sql_query(query)
.bind::<Text, _>(block_ptr.hash_hex())
.bind::<BigInt, _>(offset as i64)
.get_result::<BlockHashAndNumber>(conn),
}
.await
.optional()?;

match block {
None => Ok(None),
Some(block) => Ok(Some(BlockPtr::new(
BlockHash::from_str(&block.hash)?,
i32::try_from(block.number).unwrap(),
))),
}
}
Storage::Private(Schema { blocks, .. }) => {
let query =
self.ancestor_block_query(short_circuit_predicate, blocks.qname.as_str());

#[derive(QueryableByName)]
struct BlockHashAndNumber {
#[diesel(sql_type = Bytea)]
hash: Vec<u8>,
#[diesel(sql_type = BigInt)]
number: i64,
}

let block = match &root {
Some(root) => sql_query(query)
.bind::<Bytea, _>(block_ptr.hash_slice())
.bind::<BigInt, _>(offset as i64)
.bind::<Bytea, _>(root.as_slice())
.get_result::<BlockHashAndNumber>(conn),
None => sql_query(query)
.bind::<Bytea, _>(block_ptr.hash_slice())
.bind::<BigInt, _>(offset as i64)
.get_result::<BlockHashAndNumber>(conn),
}
.await
.optional()?;

match block {
None => Ok(None),
Some(block) => Ok(Some(BlockPtr::from((block.hash, block.number)))),
}
}
}
}

pub(super) async fn delete_blocks_before(
&self,
conn: &mut AsyncPgConnection,
Expand Down Expand Up @@ -2925,6 +3078,47 @@ impl ChainStoreTrait for ChainStore {
Ok(result)
}

/// Resolve the pointer of the ancestor `offset` blocks behind
/// `block_ptr`, consulting the in-memory cache before the database.
async fn ancestor_block_ptr(
    self: Arc<Self>,
    block_ptr: BlockPtr,
    offset: BlockNumber,
    root: Option<BlockHash>,
) -> Result<Option<BlockPtr>, Error> {
    // Walking back `offset` blocks must not go past block 0.
    ensure!(
        block_ptr.number >= offset,
        "block offset {} for block `{}` points to before genesis block",
        offset,
        block_ptr.hash_hex()
    );

    // Check the in-memory cache first.
    // NOTE(review): the cache lookup does not take `root` into
    // account, so a cache hit bypasses the short-circuit-at-root
    // behavior of the database query below — confirm callers that
    // pass `root` tolerate this.
    if let Some((ptr, _)) = self.recent_blocks_cache.get_ancestor(&block_ptr, offset) {
        return Ok(Some(ptr));
    }

    // Cache miss, query the database (CTE only, no data fetch).
    let mut conn = self.pool.get_permitted().await?;
    self.storage
        .ancestor_block_ptr(&mut conn, block_ptr, offset, root)
        .await
}

/// Look up the parent pointer for `hash`, serving from the recent
/// blocks cache when possible and falling back to the database.
async fn block_parent_ptr(
    self: Arc<Self>,
    hash: &BlockHash,
) -> Result<Option<BlockPtr>, Error> {
    match self.recent_blocks_cache.get_parent_ptr_by_hash(hash) {
        // Cache hit: no database round-trip needed.
        Some(parent_ptr) => Ok(Some(parent_ptr)),
        // Cache miss: consult the header columns in the database.
        None => {
            let mut conn = self.pool.get_permitted().await?;
            self.storage
                .block_parent_ptr(&mut conn, &self.chain, hash)
                .await
        }
    }
}

async fn cleanup_cached_blocks(
&self,
ancestor_count: BlockNumber,
Expand Down Expand Up @@ -3121,6 +3315,14 @@ mod recent_blocks_cache {
.and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data)))
}

/// Resolve the parent pointer of the cached block with `hash`.
///
/// Answers only when both the child and its parent are in the cache;
/// presumably the parent's height cannot be assumed to be the child's
/// minus one on every chain, so no pointer is synthesized from
/// `parent_hash` alone — TODO confirm.
fn get_parent_ptr_by_hash(&self, hash: &BlockHash) -> Option<BlockPtr> {
    let child = self.blocks.values().find(|b| &b.ptr.hash == hash)?;
    let parent_hash = &child.parent_hash;
    self.blocks
        .values()
        .find_map(|b| (&b.ptr.hash == parent_hash).then(|| b.ptr.clone()))
}

fn get_block_by_number(&self, number: BlockNumber) -> Option<&CacheBlock> {
self.blocks.get(&number)
}
Expand Down Expand Up @@ -3237,6 +3439,17 @@ mod recent_blocks_cache {
block_opt
}

/// Cached parent-pointer lookup; records a cache hit or miss metric
/// for this chain's network either way.
pub fn get_parent_ptr_by_hash(&self, hash: &BlockHash) -> Option<BlockPtr> {
    let inner = self.inner.read();
    let result = inner.get_parent_ptr_by_hash(hash);
    match &result {
        Some(_) => inner.metrics.record_cache_hit(&inner.network),
        None => inner.metrics.record_cache_miss(&inner.network),
    }
    result
}

pub fn get_blocks_by_hash(&self, hashes: &[BlockHash]) -> Vec<(BlockPtr, CachedBlock)> {
let inner = self.inner.read();
let blocks: Vec<_> = hashes
Expand Down
Loading