Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changes

- Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235)
- Add solo sequencer (simple in memory single sequencer without force inclusion) [#3235](https://github.com/evstack/ev-node/pull/3235)
- Improve reaper to sustain txs burst better [#3236](https://github.com/evstack/ev-node/pull/3236)

## v1.1.0
Expand Down
4 changes: 3 additions & 1 deletion apps/testapp/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM golang:1.26 AS base

#hadolint ignore=DL3018
RUN apt-get update && \

Check failure on line 4 in apps/testapp/Dockerfile

View workflow job for this annotation

GitHub Actions / lint / hadolint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install -y --no-install-recommends \
build-essential \
ca-certificates \
Expand All @@ -21,13 +21,15 @@
# Dependencies are only re-downloaded when go.mod or go.sum change.
COPY go.mod go.sum ./
COPY apps/testapp/go.mod apps/testapp/go.sum ./apps/testapp/
RUN go mod download && (cd apps/testapp && go mod download)

Check failure on line 24 in apps/testapp/Dockerfile

View workflow job for this annotation

GitHub Actions / lint / hadolint

DL3003 warning: Use WORKDIR to switch to a directory

# Copy the rest of the source and build.
COPY . .

WORKDIR /ev-node/apps/testapp
RUN go build -o /go/bin/testapp .

# 125829120 = 120 MB
RUN go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120" -o /go/bin/testapp .

## prep the final image.
#
Expand Down
2 changes: 1 addition & 1 deletion apps/testapp/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func InitCmd() *cobra.Command {
// we use load in order to parse all the flags
cfg, _ := rollconf.Load(cmd)
cfg.Node.Aggregator = aggregator
cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 10 * time.Millisecond}
cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 100 * time.Millisecond}
if err := cfg.Validate(); err != nil {
return fmt.Errorf("error validating config: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion apps/testapp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ const (

// flagKVEndpoint is the flag for the KV endpoint
flagKVEndpoint = "kv-endpoint"
// flagSoloSequencer is the flag to enable a solo sequencer
flagSoloSequencer = "solo-sequencer"
)

func init() {
config.AddGlobalFlags(RootCmd, AppName)
config.AddFlags(RunCmd)
// Add the KV endpoint flag specifically to the RunCmd

// add more flags to RunCmd
RunCmd.Flags().String(flagKVEndpoint, "", "Address and port for the KV executor HTTP server")
RunCmd.Flags().Bool(flagSoloSequencer, false, "Enable Solo sequencer (instead of based sequencer or single sequencer)")
}

// RootCmd is the root command for Evolve
Expand Down
12 changes: 11 additions & 1 deletion apps/testapp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/evstack/ev-node/pkg/p2p/key"
"github.com/evstack/ev-node/pkg/sequencers/based"
"github.com/evstack/ev-node/pkg/sequencers/single"
"github.com/evstack/ev-node/pkg/sequencers/solo"
"github.com/evstack/ev-node/pkg/store"
)

Expand Down Expand Up @@ -91,7 +92,7 @@ var RunCmd = &cobra.Command{
}

// Create sequencer based on configuration
sequencer, err := createSequencer(ctx, logger, datastore, nodeConfig, genesis, executor)
sequencer, err := createSequencer(ctx, command, logger, datastore, nodeConfig, genesis, executor)
if err != nil {
return err
}
Expand All @@ -105,12 +106,21 @@ var RunCmd = &cobra.Command{
// Otherwise, it creates a single (traditional) sequencer.
func createSequencer(
ctx context.Context,
cmd *cobra.Command,
logger zerolog.Logger,
datastore datastore.Batching,
nodeConfig config.Config,
genesis genesis.Genesis,
executor execution.Executor,
) (coresequencer.Sequencer, error) {
if enabled, _ := cmd.Flags().GetBool(flagSoloSequencer); enabled {
if nodeConfig.Node.BasedSequencer {
return nil, fmt.Errorf("solo sequencer cannot be used with based")
}

return solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor), nil
}

blobClient, err := blobrpc.NewWSClient(ctx, logger, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return nil, fmt.Errorf("failed to create blob client: %w", err)
Expand Down
20 changes: 19 additions & 1 deletion block/internal/common/consts.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
package common

const DefaultMaxBlobSize = 5 * 1024 * 1024 // 5MB fallback blob size limit
import "strconv"

// defaultMaxBlobSizeStr holds the string representation of the default blob
// size limit. Override at link time via:
//
// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120"
var defaultMaxBlobSizeStr = "5242880" // 5 MB

// DefaultMaxBlobSize is the max blob size limit used for blob submission.
var DefaultMaxBlobSize uint64

func init() {
v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64)
if err != nil || v == 0 {
DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback
return
}
DefaultMaxBlobSize = v
}
2 changes: 1 addition & 1 deletion block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
continue
}
data := blob.Data()
if len(data) == 0 || len(data) > common.DefaultMaxBlobSize {
if len(data) == 0 || uint64(len(data)) > common.DefaultMaxBlobSize {
continue
}
blobs = append(blobs, data)
Expand Down
16 changes: 7 additions & 9 deletions block/internal/submitting/batching_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type BatchingStrategy interface {
// ShouldSubmit determines if a batch should be submitted based on the strategy
// Returns true if submission should happen now
ShouldSubmit(pendingCount uint64, totalSizeBeforeSig int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool
ShouldSubmit(pendingCount uint64, totalSizeBeforeSig uint64, maxBlobSize uint64, timeSinceLastSubmit time.Duration) bool
}

// NewBatchingStrategy creates a batching strategy based on configuration
Expand All @@ -34,7 +34,7 @@ func NewBatchingStrategy(cfg config.DAConfig) (BatchingStrategy, error) {
// ImmediateStrategy submits as soon as any items are available
type ImmediateStrategy struct{}

func (s *ImmediateStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool {
func (s *ImmediateStrategy) ShouldSubmit(pendingCount uint64, totalSize uint64, maxBlobSize uint64, timeSinceLastSubmit time.Duration) bool {
return pendingCount > 0
}

Expand All @@ -57,12 +57,12 @@ func NewSizeBasedStrategy(sizeThreshold float64, minItems uint64) *SizeBasedStra
}
}

func (s *SizeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool {
func (s *SizeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize uint64, maxBlobSize uint64, timeSinceLastSubmit time.Duration) bool {
if pendingCount < s.minItems {
return false
}

threshold := int(float64(maxBlobSize) * s.sizeThreshold)
threshold := uint64(float64(maxBlobSize) * s.sizeThreshold)
return totalSize >= threshold
}

Expand All @@ -85,7 +85,7 @@ func NewTimeBasedStrategy(daBlockTime time.Duration, maxDelay time.Duration, min
}
}

func (s *TimeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool {
func (s *TimeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize uint64, maxBlobSize uint64, timeSinceLastSubmit time.Duration) bool {
if pendingCount < s.minItems {
return false
}
Expand Down Expand Up @@ -120,18 +120,16 @@ func NewAdaptiveStrategy(daBlockTime time.Duration, sizeThreshold float64, maxDe
}
}

func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool {
func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize uint64, maxBlobSize uint64, timeSinceLastSubmit time.Duration) bool {
if pendingCount < s.minItems {
return false
}

// Submit if we've reached the size threshold
threshold := int(float64(maxBlobSize) * s.sizeThreshold)
threshold := uint64(float64(maxBlobSize) * s.sizeThreshold)
if totalSize >= threshold {
return true
}

// Submit if max delay has been reached
if timeSinceLastSubmit >= s.maxDelay {
return true
}
Expand Down
36 changes: 18 additions & 18 deletions block/internal/submitting/batching_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (

func TestImmediateStrategy(t *testing.T) {
strategy := &ImmediateStrategy{}
maxBlobSize := common.DefaultMaxBlobSize

tests := []struct {
name string
pendingCount uint64
totalSize int
totalSize uint64
expected bool
}{
{
Expand All @@ -42,7 +43,7 @@ func TestImmediateStrategy(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, common.DefaultMaxBlobSize, 0)
result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, 0)
assert.Equal(t, tt.expected, result)
})
}
Expand All @@ -56,7 +57,7 @@ func TestSizeBasedStrategy(t *testing.T) {
sizeThreshold float64
minItems uint64
pendingCount uint64
totalSize int
totalSize uint64
expectedSubmit bool
}{
{
Expand All @@ -80,15 +81,15 @@ func TestSizeBasedStrategy(t *testing.T) {
sizeThreshold: 0.8,
minItems: 1,
pendingCount: 10,
totalSize: int(float64(maxBlobSize) * 0.8), // 80% of max
totalSize: uint64(float64(maxBlobSize) * 0.8), // 80% of max
expectedSubmit: true,
},
{
name: "above threshold",
sizeThreshold: 0.8,
minItems: 1,
pendingCount: 20,
totalSize: int(float64(maxBlobSize) * 0.875), // 87.5%
totalSize: uint64(float64(maxBlobSize) * 0.875), // 87.5%
expectedSubmit: true,
},
{
Expand Down Expand Up @@ -125,39 +126,39 @@ func TestTimeBasedStrategy(t *testing.T) {
name string
minItems uint64
pendingCount uint64
totalSize int
totalSize uint64
timeSinceLastSubmit time.Duration
expectedSubmit bool
}{
{
name: "below min items",
minItems: 2,
pendingCount: 1,
totalSize: int(float64(maxBlobSize) * 0.2),
totalSize: uint64(float64(maxBlobSize) * 0.2),
timeSinceLastSubmit: 10 * time.Second,
expectedSubmit: false,
},
{
name: "before max delay",
minItems: 1,
pendingCount: 5,
totalSize: int(float64(maxBlobSize) * 0.5),
totalSize: uint64(float64(maxBlobSize) * 0.5),
timeSinceLastSubmit: 3 * time.Second,
expectedSubmit: false,
},
{
name: "at max delay",
minItems: 1,
pendingCount: 3,
totalSize: int(float64(maxBlobSize) * 0.4),
totalSize: uint64(float64(maxBlobSize) * 0.4),
timeSinceLastSubmit: 6 * time.Second,
expectedSubmit: true,
},
{
name: "after max delay",
minItems: 1,
pendingCount: 2,
totalSize: int(float64(maxBlobSize) * 0.2),
totalSize: uint64(float64(maxBlobSize) * 0.2),
timeSinceLastSubmit: 10 * time.Second,
expectedSubmit: true,
},
Expand All @@ -181,7 +182,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name string
minItems uint64
pendingCount uint64
totalSize int
totalSize uint64
timeSinceLastSubmit time.Duration
expectedSubmit bool
reason string
Expand All @@ -190,7 +191,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name: "below min items",
minItems: 3,
pendingCount: 2,
totalSize: int(float64(maxBlobSize) * 0.875),
totalSize: uint64(float64(maxBlobSize) * 0.875),
timeSinceLastSubmit: 10 * time.Second,
expectedSubmit: false,
reason: "not enough items",
Expand All @@ -199,7 +200,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name: "size threshold reached",
minItems: 1,
pendingCount: 10,
totalSize: int(float64(maxBlobSize) * 0.85), // 85%
totalSize: uint64(float64(maxBlobSize) * 0.85), // 85%
timeSinceLastSubmit: 1 * time.Second,
expectedSubmit: true,
reason: "size threshold met",
Expand All @@ -208,7 +209,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name: "time threshold reached",
minItems: 1,
pendingCount: 2,
totalSize: int(float64(maxBlobSize) * 0.2), // Only 20%
totalSize: uint64(float64(maxBlobSize) * 0.2), // Only 20%
timeSinceLastSubmit: 7 * time.Second,
expectedSubmit: true,
reason: "time threshold met",
Expand All @@ -217,7 +218,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name: "neither threshold reached",
minItems: 1,
pendingCount: 5,
totalSize: int(float64(maxBlobSize) * 0.5), // 50%
totalSize: uint64(float64(maxBlobSize) * 0.5), // 50%
timeSinceLastSubmit: 3 * time.Second,
expectedSubmit: false,
reason: "waiting for threshold",
Expand All @@ -226,7 +227,7 @@ func TestAdaptiveStrategy(t *testing.T) {
name: "both thresholds reached",
minItems: 1,
pendingCount: 20,
totalSize: int(float64(maxBlobSize) * 0.875), // 87.5%
totalSize: uint64(float64(maxBlobSize) * 0.875), // 87.5%
timeSinceLastSubmit: 10 * time.Second,
expectedSubmit: true,
reason: "both thresholds met",
Expand Down Expand Up @@ -305,10 +306,9 @@ func TestNewBatchingStrategy(t *testing.T) {
}

func TestBatchingStrategiesComparison(t *testing.T) {
// This test demonstrates how different strategies behave with the same input
maxBlobSize := common.DefaultMaxBlobSize
pendingCount := uint64(10)
totalSize := maxBlobSize / 2 // 50% full
totalSize := maxBlobSize / 2
timeSinceLastSubmit := 3 * time.Second

immediate := &ImmediateStrategy{}
Expand Down
12 changes: 6 additions & 6 deletions block/internal/submitting/da_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type retryPolicy struct {
MaxAttempts int
MinBackoff time.Duration
MaxBackoff time.Duration
MaxBlobBytes int
MaxBlobBytes uint64
}

func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy {
Expand Down Expand Up @@ -581,7 +581,7 @@ func submitToDA[T any](
if err != nil {
s.logger.Error().
Str("itemType", itemType).
Int("maxBlobBytes", pol.MaxBlobBytes).
Uint64("maxBlobBytes", pol.MaxBlobBytes).
Err(err).
Msg("CRITICAL: Unrecoverable error - item exceeds maximum blob size")
return fmt.Errorf("unrecoverable error: no %s items fit within max blob size: %w", itemType, err)
Expand Down Expand Up @@ -644,7 +644,7 @@ func submitToDA[T any](
if len(items) == 1 {
s.logger.Error().
Str("itemType", itemType).
Int("maxBlobBytes", pol.MaxBlobBytes).
Uint64("maxBlobBytes", pol.MaxBlobBytes).
Msg("CRITICAL: Unrecoverable error - single item exceeds DA blob size limit")
return fmt.Errorf("unrecoverable error: %w: single %s item exceeds DA blob size limit", common.ErrOversizedItem, itemType)
}
Expand Down Expand Up @@ -690,11 +690,11 @@ func submitToDA[T any](

// limitBatchBySize returns a prefix of items whose total marshaled size does not exceed maxBytes.
// If the first item exceeds maxBytes, it returns ErrOversizedItem which is unrecoverable.
func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes int) ([]T, [][]byte, error) {
total := 0
func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes uint64) ([]T, [][]byte, error) {
total := uint64(0)
count := 0
for i := range items {
sz := len(marshaled[i])
sz := uint64(len(marshaled[i]))
if sz > maxBytes {
if i == 0 {
return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxBytes)
Expand Down
Loading
Loading