Skip to content

Commit 6f8e093

Browse files
authored
refactor: reaper to drain mempool (#3236)
* refactor: reaper to drain mempool * feedback * fix partial drain * cleanup old readme * Prevent multiple Start() calls across components * fix unwanted log * lock * updates * remove redundant * changelog * feedback * feedback * feedback * add bench
1 parent 3abebca commit 6f8e093

File tree

12 files changed

+754
-372
lines changed

12 files changed

+754
-372
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
### Changes
13+
14+
- Improve reaper to sustain txs burst better [#3236](https://github.com/evstack/ev-node/pull/3236)
15+
1216
## v1.1.0
1317

1418
No changes from v1.1.0-rc.2.

block/components.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,9 @@ func newAggregatorComponents(
278278
sequencer,
279279
genesis,
280280
logger,
281-
executor,
282281
cacheManager,
283282
config.Node.ScrapeInterval.Duration,
283+
executor.NotifyNewTransactions,
284284
)
285285
if err != nil {
286286
return nil, fmt.Errorf("failed to create reaper: %w", err)

block/internal/cache/manager.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type CacheManager interface {
5353
// Transaction operations
5454
IsTxSeen(hash string) bool
5555
SetTxSeen(hash string)
56+
SetTxsSeen(hashes []string)
5657
CleanupOldTxs(olderThan time.Duration) int
5758

5859
// Pending events syncing coordination
@@ -210,6 +211,14 @@ func (m *implementation) SetTxSeen(hash string) {
210211
m.txTimestamps.Store(hash, time.Now())
211212
}
212213

214+
func (m *implementation) SetTxsSeen(hashes []string) {
215+
now := time.Now()
216+
for _, hash := range hashes {
217+
m.txCache.setSeen(hash, 0)
218+
m.txTimestamps.Store(hash, now)
219+
}
220+
}
221+
213222
// CleanupOldTxs removes transaction hashes older than olderThan and returns
214223
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
215224
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
@@ -237,13 +246,6 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
237246
return true
238247
})
239248

240-
if removed > 0 {
241-
m.logger.Debug().
242-
Int("removed", removed).
243-
Dur("older_than", olderThan).
244-
Msg("cleaned up old transaction hashes from cache")
245-
}
246-
247249
return removed
248250
}
249251

block/internal/executing/executor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,20 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) {
162162
}
163163

164164
// Start begins the execution component
165-
func (e *Executor) Start(ctx context.Context) error {
165+
func (e *Executor) Start(ctx context.Context) (err error) {
166+
if e.cancel != nil {
167+
return errors.New("executor already started")
168+
}
166169
e.ctx, e.cancel = context.WithCancel(ctx)
170+
defer func() { // if error during init cancel context
171+
if err != nil {
172+
e.cancel()
173+
e.ctx, e.cancel = nil, nil
174+
}
175+
}()
167176

168177
// Initialize state
169-
if err := e.initializeState(); err != nil {
178+
if err = e.initializeState(); err != nil {
170179
return fmt.Errorf("failed to initialize state: %w", err)
171180
}
172181

block/internal/pruner/pruner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ func New(
5252

5353
// Start begins the pruning loop.
5454
func (p *Pruner) Start(ctx context.Context) error {
55+
if p.cancel != nil {
56+
return errors.New("pruner already started")
57+
}
5558
if !p.cfg.IsPruningEnabled() {
5659
p.logger.Info().Msg("pruning is disabled, not starting pruner")
5760
return nil

0 commit comments

Comments
 (0)