@@ -32,10 +32,11 @@ use evolve_server::{
3232use evolve_stf_traits:: AccountsCodeStorage ;
3333use evolve_storage:: { Operation , Storage , StorageConfig } ;
3434use evolve_tx_eth:: TxContext ;
35+ use tokio:: sync:: oneshot;
3536
3637use crate :: {
37- BlockExecutedInfo , EvnodeServer , EvnodeServerConfig , EvnodeStfExecutor , ExecutorServiceConfig ,
38- OnBlockExecuted ,
38+ BlockExecutedInfo , EvnodeError , EvnodeServer , EvnodeServerConfig , EvnodeStfExecutor ,
39+ ExecutorServiceConfig , OnBlockExecuted ,
3940} ;
4041
4142type SharedChainIndex = Arc < PersistentChainIndex > ;
@@ -66,11 +67,16 @@ struct ExternalConsensusSinkConfig {
6667}
6768
6869struct ExternalConsensusCommitSink {
69- sender : Option < mpsc:: SyncSender < BlockExecutedInfo > > ,
70+ sender : Option < mpsc:: SyncSender < QueuedBlockExecution > > ,
7071 worker : Option < JoinHandle < ( ) > > ,
7172 current_height : Arc < AtomicU64 > ,
7273}
7374
75+ struct QueuedBlockExecution {
76+ info : BlockExecutedInfo ,
77+ result_tx : oneshot:: Sender < Result < B256 , EvnodeError > > ,
78+ }
79+
7480impl ExternalConsensusCommitSink {
7581 fn spawn < S > (
7682 storage : S ,
@@ -82,7 +88,7 @@ impl ExternalConsensusCommitSink {
8288 {
8389 const MAX_PENDING_BLOCKS : usize = 16 ;
8490
85- let ( sender, receiver) = mpsc:: sync_channel :: < BlockExecutedInfo > ( MAX_PENDING_BLOCKS ) ;
91+ let ( sender, receiver) = mpsc:: sync_channel :: < QueuedBlockExecution > ( MAX_PENDING_BLOCKS ) ;
8692 let current_height = Arc :: new ( AtomicU64 :: new ( config. initial_height ) ) ;
8793 let current_height_for_worker = Arc :: clone ( & current_height) ;
8894
@@ -94,54 +100,78 @@ impl ExternalConsensusCommitSink {
94100 . build ( )
95101 . expect ( "failed to build commit sink runtime" ) ;
96102
97- while let Ok ( info ) = receiver. recv ( ) {
98- let state_root = info. state_root ;
103+ while let Ok ( queued ) = receiver. recv ( ) {
104+ let QueuedBlockExecution { info, result_tx } = queued ;
99105 let operations = state_changes_to_operations ( info. state_changes ) ;
100- runtime. block_on ( async {
101- storage
102- . batch ( operations)
103- . await
104- . expect ( "storage batch failed" ) ;
105- storage. commit ( ) . await . expect ( "storage commit failed" )
106+ let result = runtime. block_on ( async {
107+ storage. batch ( operations) . await . map_err ( |err| {
108+ EvnodeError :: Storage ( format ! ( "storage batch failed: {err:?}" ) )
109+ } ) ?;
110+ storage. commit ( ) . await . map_err ( |err| {
111+ EvnodeError :: Storage ( format ! ( "storage commit failed: {err:?}" ) )
112+ } )
106113 } ) ;
107- let block_hash = compute_block_hash ( info. height , info. timestamp , parent_hash) ;
108- let metadata = BlockMetadata :: new (
109- block_hash,
110- parent_hash,
111- state_root,
112- info. timestamp ,
113- config. max_gas ,
114- Address :: ZERO ,
115- config. chain_id ,
116- ) ;
117-
118- let block = BlockBuilder :: < TxContext > :: new ( )
119- . number ( info. height )
120- . timestamp ( info. timestamp )
121- . transactions ( info. transactions )
122- . build ( ) ;
123- let ( stored_block, stored_txs, stored_receipts) =
124- build_index_data ( & block, & info. block_result , & metadata) ;
125-
126- if config. indexing_enabled {
127- if let Some ( ref index) = chain_index {
128- if let Err ( err) =
129- index. store_block ( stored_block, stored_txs, stored_receipts)
130- {
131- tracing:: warn!( "Failed to index block {}: {:?}" , info. height, err) ;
132- } else {
133- tracing:: debug!(
134- "Indexed block {} (hash={}, state_root={})" ,
135- info. height,
136- block_hash,
137- state_root
114+
115+ match result {
116+ Ok ( commit_hash) => {
117+ let committed_state_root = B256 :: from_slice ( commit_hash. as_bytes ( ) ) ;
118+ if committed_state_root != info. state_root {
119+ tracing:: warn!(
120+ height = info. height,
121+ preview_state_root = %info. state_root,
122+ committed_state_root = %committed_state_root,
123+ "execution state root differed from committed storage root"
138124 ) ;
139125 }
126+ let block_hash =
127+ compute_block_hash ( info. height , info. timestamp , parent_hash) ;
128+ let metadata = BlockMetadata :: new (
129+ block_hash,
130+ parent_hash,
131+ committed_state_root,
132+ info. timestamp ,
133+ config. max_gas ,
134+ Address :: ZERO ,
135+ config. chain_id ,
136+ ) ;
137+
138+ let block = BlockBuilder :: < TxContext > :: new ( )
139+ . number ( info. height )
140+ . timestamp ( info. timestamp )
141+ . transactions ( info. transactions )
142+ . build ( ) ;
143+ let ( stored_block, stored_txs, stored_receipts) =
144+ build_index_data ( & block, & info. block_result , & metadata) ;
145+
146+ if config. indexing_enabled {
147+ if let Some ( ref index) = chain_index {
148+ if let Err ( err) =
149+ index. store_block ( stored_block, stored_txs, stored_receipts)
150+ {
151+ tracing:: warn!(
152+ "Failed to index block {}: {:?}" ,
153+ info. height,
154+ err
155+ ) ;
156+ } else {
157+ tracing:: debug!(
158+ "Indexed block {} (hash={}, state_root={})" ,
159+ info. height,
160+ block_hash,
161+ committed_state_root
162+ ) ;
163+ }
164+ }
165+ }
166+
167+ parent_hash = block_hash;
168+ current_height_for_worker. store ( info. height , Ordering :: SeqCst ) ;
169+ let _ = result_tx. send ( Ok ( committed_state_root) ) ;
170+ }
171+ Err ( err) => {
172+ let _ = result_tx. send ( Err ( err) ) ;
140173 }
141174 }
142-
143- parent_hash = block_hash;
144- current_height_for_worker. store ( info. height , Ordering :: SeqCst ) ;
145175 }
146176 } ) ;
147177
@@ -160,9 +190,23 @@ impl ExternalConsensusCommitSink {
160190 . clone ( ) ;
161191
162192 Arc :: new ( move |info| {
163- sender
164- . send ( info)
165- . expect ( "external consensus commit sink stopped unexpectedly" ) ;
193+ let sender = sender. clone ( ) ;
194+ Box :: pin ( async move {
195+ let ( result_tx, result_rx) = oneshot:: channel ( ) ;
196+ sender
197+ . send ( QueuedBlockExecution { info, result_tx } )
198+ . map_err ( |_| {
199+ EvnodeError :: Unavailable (
200+ "external consensus commit sink stopped unexpectedly" . to_string ( ) ,
201+ )
202+ } ) ?;
203+ result_rx. await . map_err ( |_| {
204+ EvnodeError :: Unavailable (
205+ "external consensus commit sink stopped before returning a root"
206+ . to_string ( ) ,
207+ )
208+ } ) ?
209+ } )
166210 } )
167211 }
168212
0 commit comments