Skip to content

Commit d163059

Browse files
alpe and julienrbrt authored
fix: Publisher-mode synchronization option for failover scenario (#3222)
* Publisher-mode synchronization option for failover scenario * Changelog * Review feedback * Doc update * just tidy all --------- Co-authored-by: julienrbrt <julien@rbrt.fr>
1 parent 04c9cad commit d163059

File tree

20 files changed

+664
-127
lines changed

20 files changed

+664
-127
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
### Changes
1515

16-
- Improve P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212)
16+
* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222)
17+
* Improve P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212)
1718
* Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221)
1819

1920
## v1.1.0-rc.1

apps/testapp/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ require (
7979
github.com/googleapis/gax-go/v2 v2.20.0 // indirect
8080
github.com/gorilla/websocket v1.5.3 // indirect
8181
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
82-
github.com/hashicorp/go-hclog v1.6.2 // indirect
82+
github.com/hashicorp/go-hclog v1.6.3 // indirect
8383
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
8484
github.com/hashicorp/go-metrics v0.5.4 // indirect
8585
github.com/hashicorp/go-msgpack v0.5.5 // indirect

apps/testapp/go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,8 +650,9 @@ github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyN
650650
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
651651
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
652652
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
653-
github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I=
654653
github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
654+
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
655+
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
655656
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
656657
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
657658
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=

block/internal/syncing/syncer.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,12 @@ var (
694694
// TrySyncNextBlock attempts to sync the next available block
695695
// the event is always the next block in sequence as processHeightEvent ensures it.
696696
func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
697+
return s.trySyncNextBlockWithState(ctx, event, s.getLastState())
698+
}
699+
700+
// trySyncNextBlockWithState attempts to sync the next available block using
701+
// the provided current state as the validation/apply baseline.
702+
func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DAHeightEvent, currentState types.State) error {
697703
select {
698704
case <-ctx.Done():
699705
return ctx.Err()
@@ -703,7 +709,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
703709
header := event.Header
704710
data := event.Data
705711
nextHeight := event.Header.Height()
706-
currentState := s.getLastState()
707712
headerHash := header.Hash().String()
708713

709714
s.logger.Info().Uint64("height", nextHeight).Str("source", string(event.Source)).Msg("syncing block")
@@ -1201,6 +1206,7 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
12011206
s.logger.Debug().Err(err).Msg("no state in store, using genesis defaults for recovery")
12021207
currentState = types.State{
12031208
ChainID: s.genesis.ChainID,
1209+
InitialHeight: s.genesis.InitialHeight,
12041210
LastBlockHeight: s.genesis.InitialHeight - 1,
12051211
}
12061212
}
@@ -1214,11 +1220,12 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
12141220
return nil
12151221
} else if currentState.LastBlockHeight+1 == raftState.Height { // raft is 1 block ahead
12161222
// apply block
1217-
err := s.TrySyncNextBlock(ctx, &common.DAHeightEvent{
1223+
event := &common.DAHeightEvent{
12181224
Header: &header,
12191225
Data: &data,
12201226
Source: "",
1221-
})
1227+
}
1228+
err := s.trySyncNextBlockWithState(ctx, event, currentState)
12221229
if err != nil {
12231230
return err
12241231
}

block/internal/syncing/syncer_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/evstack/ev-node/pkg/config"
2727
datypes "github.com/evstack/ev-node/pkg/da/types"
2828
"github.com/evstack/ev-node/pkg/genesis"
29+
"github.com/evstack/ev-node/pkg/raft"
2930
signerpkg "github.com/evstack/ev-node/pkg/signer"
3031
"github.com/evstack/ev-node/pkg/signer/noop"
3132
"github.com/evstack/ev-node/pkg/store"
@@ -306,6 +307,121 @@ func TestSequentialBlockSync(t *testing.T) {
306307
requireEmptyChan(t, errChan)
307308
}
308309

310+
func TestSyncer_RecoverFromRaft_BootstrapsStateWhenUninitialized(t *testing.T) {
311+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
312+
st := store.New(ds)
313+
314+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
315+
require.NoError(t, err)
316+
317+
addr, pub, signer := buildSyncTestSigner(t)
318+
gen := genesis.Genesis{
319+
ChainID: "1234",
320+
InitialHeight: 1,
321+
StartTime: time.Now().Add(-time.Second),
322+
ProposerAddress: addr,
323+
}
324+
325+
mockExec := testmocks.NewMockExecutor(t)
326+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
327+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
328+
s := NewSyncer(
329+
st,
330+
mockExec,
331+
nil,
332+
cm,
333+
common.NopMetrics(),
334+
config.DefaultConfig(),
335+
gen,
336+
mockHeaderStore,
337+
mockDataStore,
338+
zerolog.Nop(),
339+
common.DefaultBlockOptions(),
340+
make(chan error, 1),
341+
nil,
342+
)
343+
344+
// lastState intentionally not initialized to simulate recovery-before-start path.
345+
data := makeData(gen.ChainID, 1, 0)
346+
headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil)
347+
dataBz, err := data.MarshalBinary()
348+
require.NoError(t, err)
349+
350+
mockExec.EXPECT().
351+
ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything).
352+
Return([]byte("app1"), nil).
353+
Once()
354+
355+
err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
356+
Height: 1,
357+
Hash: hdr.Hash(),
358+
Header: headerBz,
359+
Data: dataBz,
360+
})
361+
require.NoError(t, err)
362+
363+
state, err := st.GetState(t.Context())
364+
require.NoError(t, err)
365+
require.Equal(t, gen.ChainID, state.ChainID)
366+
require.Equal(t, uint64(1), state.LastBlockHeight)
367+
}
368+
369+
func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing.T) {
370+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
371+
st := store.New(ds)
372+
373+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
374+
require.NoError(t, err)
375+
376+
addr, pub, signer := buildSyncTestSigner(t)
377+
gen := genesis.Genesis{
378+
ChainID: "1234",
379+
InitialHeight: 1,
380+
StartTime: time.Now().Add(-time.Second),
381+
ProposerAddress: addr,
382+
}
383+
384+
mockExec := testmocks.NewMockExecutor(t)
385+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
386+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
387+
s := NewSyncer(
388+
st,
389+
mockExec,
390+
nil,
391+
cm,
392+
common.NopMetrics(),
393+
config.DefaultConfig(),
394+
gen,
395+
mockHeaderStore,
396+
mockDataStore,
397+
zerolog.Nop(),
398+
common.DefaultBlockOptions(),
399+
make(chan error, 1),
400+
nil,
401+
)
402+
403+
// Non-empty state must remain strictly validated.
404+
s.SetLastState(types.State{
405+
ChainID: "wrong-chain",
406+
InitialHeight: 1,
407+
LastBlockHeight: 0,
408+
})
409+
410+
data := makeData(gen.ChainID, 1, 0)
411+
headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil)
412+
dataBz, err := data.MarshalBinary()
413+
require.NoError(t, err)
414+
415+
err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
416+
Height: 1,
417+
Hash: hdr.Hash(),
418+
Header: headerBz,
419+
Data: dataBz,
420+
})
421+
require.Error(t, err)
422+
require.ErrorContains(t, err, "invalid chain ID")
423+
}
424+
309425
func TestSyncer_processPendingEvents(t *testing.T) {
310426
ds := dssync.MutexWrap(datastore.NewMapDatastore())
311427
st := store.New(ds)

docs/guides/raft_production.md

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ This guide details the Raft consensus implementation in `ev-node`, used for High
2222

2323
## Configuration
2424

25-
Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (or `[rollkit.raft]`) section.
25+
Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (or `[evnode.raft]`) section.
2626

2727
### Essential Flags
2828

@@ -33,7 +33,7 @@ Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (o
3333
| `--evnode.raft.raft_addr` | `raft.raft_addr` | TCP address for Raft transport. | `0.0.0.0:5001` (Bind to private IP) |
3434
| `--evnode.raft.raft_dir` | `raft.raft_dir` | Directory for Raft data. | `/data/raft` (Must be persistent) |
3535
| `--evnode.raft.peers` | `raft.peers` | Comma-separated list of peer addresses in format `nodeID@host:port`. | `node-1@10.0.0.1:5001,node-2@10.0.0.2:5001,node-3@10.0.0.3:5001` |
36-
| `--evnode.raft.bootstrap` | `raft.bootstrap` | Bootstrap the cluster. **Required** for initial setup. | `true` (See Limitations) |
36+
| `--evnode.raft.bootstrap` | `raft.bootstrap` | Compatibility flag. Startup mode is selected automatically from persisted raft configuration state. | optional |
3737

3838
### Timeout Tuning
3939

@@ -55,11 +55,15 @@ Ideally, a failover should complete within `2 * BlockTime` to minimize user impa
5555
5656
## Production Deployment Principles
5757

58-
### 1. Static Peering & Bootstrap
59-
Current implementation requires **Bootstrap Mode** (`--evnode.raft.bootstrap=true`) for all nodes participating in the cluster initialization.
60-
* **All nodes** should list the full set of peers in `--evnode.raft.peers`.
58+
### 1. Static Peering & Automatic Startup Mode
59+
Use static peering with automatic mode selection from local raft configuration:
60+
* If local raft configuration already exists in `--evnode.raft.raft_dir`, the node starts in rejoin mode.
61+
* If no local raft configuration exists yet, the node bootstraps from configured peers.
62+
* `--evnode.raft.bootstrap` is retained for compatibility but does not control mode selection.
63+
* **All configured cluster members** should list the full set of peers in `--evnode.raft.peers`.
6164
* The `peers` list format is strict: `NodeID@Host:Port`.
62-
* **Limitation**: Dynamic addition of peers (Run-time Membership Changes) via RPC/CLI is not currently exposed. The cluster membership is static based on the initial bootstrap configuration.
65+
* **Limitation**: Dynamic addition of peers (run-time membership changes) via RPC/CLI is not currently exposed.
66+
* **Not supported**: Joining an existing cluster as a brand-new node that was not part of the initial static membership.
6367

6468
### 2. Infrastructure Requirements
6569
* **Encrypted Network (CRITICAL)**: Raft traffic is **unencrypted** (plain TCP). You **MUST** run the cluster inside a private network, VPN, or encrypted mesh (e.g., WireGuard, Tailscale). **Never expose Raft ports to the public internet**; doing so allows attackers to hijack the cluster consensus.
@@ -86,13 +90,13 @@ Monitor the following metrics (propagated via Prometheus if enabled):
8690

8791
```bash
8892
./ev-node start \
89-
--node.aggregator \
90-
--raft.enable \
91-
--raft.node_id="node-1" \
92-
--raft.raft_addr="0.0.0.0:5001" \
93-
--raft.raft_dir="/var/lib/ev-node/raft" \
94-
--raft.bootstrap=true \
95-
--raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \
96-
--p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \
93+
--evnode.node.aggregator=true \
94+
--evnode.raft.enable=true \
95+
--evnode.raft.node_id="node-1" \
96+
--evnode.raft.raft_addr="0.0.0.0:5001" \
97+
--evnode.raft.raft_dir="/var/lib/ev-node/raft" \
98+
--evnode.raft.bootstrap=true \
99+
--evnode.raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \
100+
--evnode.p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \
97101
...other flags
98102
```

docs/learn/config.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ _Constant:_ `FlagRaftDir`
13211321
### Raft Bootstrap
13221322

13231323
**Description:**
1324-
If true, bootstraps a new Raft cluster. Only set this on the very first node when initializing a new cluster.
1324+
Legacy compatibility flag. Startup mode is now auto-selected from persisted raft configuration state, so this flag is not used to choose bootstrap vs rejoin.
13251325

13261326
**YAML:**
13271327

@@ -1352,6 +1352,16 @@ raft:
13521352
_Default:_ `""` (empty)
13531353
_Constant:_ `FlagRaftPeers`
13541354

1355+
### Raft Startup Mode
1356+
1357+
Raft startup mode is selected automatically from local raft configuration state:
1358+
1359+
* If the node already has persisted raft configuration in `raft.raft_dir`, it starts in rejoin mode.
1360+
* If no raft configuration exists yet, it bootstraps a cluster from configured peers.
1361+
* `raft.bootstrap` is retained for compatibility but does not control mode selection.
1362+
1363+
`--evnode.raft.rejoin` has been removed.
1364+
13551365
### Raft Snap Count
13561366

13571367
**Description:**

node/failover.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type failoverState struct {
3333
dataSyncService *evsync.DataSyncService
3434
rpcServer *http.Server
3535
bc *block.Components
36+
raftNode *raft.Node
37+
isAggregator bool
3638

3739
// catchup fields — used when the aggregator needs to sync before producing
3840
catchupEnabled bool
@@ -172,13 +174,41 @@ func setupFailoverState(
172174
dataSyncService: dataSyncService,
173175
rpcServer: rpcServer,
174176
bc: bc,
177+
raftNode: raftNode,
178+
isAggregator: isAggregator,
175179
store: rktStore,
176180
catchupEnabled: catchupEnabled,
177181
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
178182
daBlockTime: nodeConfig.DA.BlockTime.Duration,
179183
}, nil
180184
}
181185

186+
// shouldStartSyncInPublisherMode avoids startup deadlock when a raft leader boots
187+
// with empty sync stores and no peer can serve height 1 yet.
188+
func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
189+
if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
190+
return false
191+
}
192+
193+
storeHeight, err := f.store.Height(ctx)
194+
if err != nil {
195+
f.logger.Warn().Err(err).Msg("cannot determine store height; keeping blocking sync startup")
196+
return false
197+
}
198+
headerHeight := f.headerSyncService.Store().Height()
199+
dataHeight := f.dataSyncService.Store().Height()
200+
if headerHeight > 0 || dataHeight > 0 {
201+
return false
202+
}
203+
204+
f.logger.Info().
205+
Uint64("store_height", storeHeight).
206+
Uint64("header_height", headerHeight).
207+
Uint64("data_height", dataHeight).
208+
Msg("raft-enabled aggregator with empty sync stores: starting sync services in publisher mode")
209+
return true
210+
}
211+
182212
func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
183213
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
184214
// parent context is cancelled already, so we need to create a new one
@@ -207,15 +237,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
207237
})
208238

209239
// start header and data sync services concurrently to avoid cumulative startup delay.
240+
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
210241
syncWg, syncCtx := errgroup.WithContext(ctx)
211242
syncWg.Go(func() error {
212-
if err := f.headerSyncService.Start(syncCtx); err != nil {
243+
var err error
244+
if startSyncInPublisherMode {
245+
err = f.headerSyncService.StartForPublishing(syncCtx)
246+
} else {
247+
err = f.headerSyncService.Start(syncCtx)
248+
}
249+
if err != nil {
213250
return fmt.Errorf("header sync service: %w", err)
214251
}
215252
return nil
216253
})
217254
syncWg.Go(func() error {
218-
if err := f.dataSyncService.Start(syncCtx); err != nil {
255+
var err error
256+
if startSyncInPublisherMode {
257+
err = f.dataSyncService.StartForPublishing(syncCtx)
258+
} else {
259+
err = f.dataSyncService.Start(syncCtx)
260+
}
261+
if err != nil {
219262
return fmt.Errorf("data sync service: %w", err)
220263
}
221264
return nil

pkg/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ type RaftConfig struct {
400400
NodeID string `mapstructure:"node_id" yaml:"node_id" comment:"Unique identifier for this node in the Raft cluster"`
401401
RaftAddr string `mapstructure:"raft_addr" yaml:"raft_addr" comment:"Address for Raft communication (host:port)"`
402402
RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"`
403-
Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new Raft cluster (only for the first node)"`
403+
Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new static Raft cluster during initial bring-up"`
404404
Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"`
405405
SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"`
406406
SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"`
@@ -646,7 +646,7 @@ func AddFlags(cmd *cobra.Command) {
646646
cmd.Flags().String(FlagRaftNodeID, def.Raft.NodeID, "unique identifier for this node in the Raft cluster")
647647
cmd.Flags().String(FlagRaftAddr, def.Raft.RaftAddr, "address for Raft communication (host:port)")
648648
cmd.Flags().String(FlagRaftDir, def.Raft.RaftDir, "directory for Raft logs and snapshots")
649-
cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new Raft cluster (only for the first node)")
649+
cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new static Raft cluster during initial bring-up")
650650
cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)")
651651
cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots")
652652
cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer")

0 commit comments

Comments (0)