From 19ea929c1f6c7e072d258811e54cd5ecd8506ae6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 13 Apr 2026 13:54:35 +0200 Subject: [PATCH] feat(da): support fiber (not via c-node) --- block/internal/da/fiber_client.go | 446 +++++++++++++++++++++ block/internal/da/fiber_client_test.go | 518 +++++++++++++++++++++++++ block/public.go | 22 ++ pkg/config/config.go | 54 +++ pkg/config/config_test.go | 14 +- pkg/config/defaults.go | 7 + 6 files changed, 1060 insertions(+), 1 deletion(-) create mode 100644 block/internal/da/fiber_client.go create mode 100644 block/internal/da/fiber_client_test.go diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go new file mode 100644 index 0000000000..a64047eb44 --- /dev/null +++ b/block/internal/da/fiber_client.go @@ -0,0 +1,446 @@ +package da + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/common" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// FiberUploadResult contains the result of a Fiber upload operation. +type FiberUploadResult struct { + // BlobID is the Fiber blob identifier (typically 33 bytes: 1 version + 32 commitment). + BlobID []byte + // Height is the validator set height at which the blob was uploaded. + Height uint64 + // Promise is the serialized signed payment promise from validators, + // which serves as proof of data availability. + Promise []byte +} + +// FiberClient defines the interface for a Fiber protocol client backend. +// Implementations wrap the celestia-app fibre.Client or equivalent. +type FiberClient interface { + // Upload uploads data to the Fiber network under the given namespace. + // The namespace must be a valid share.Namespace (29 bytes). + Upload(ctx context.Context, namespace, data []byte) (FiberUploadResult, error) + + // Download downloads and reconstructs data from the Fiber network. + // blobID is the Fiber blob identifier returned by Upload. + // height is the validator set height (0 to use the current head). + Download(ctx context.Context, blobID []byte, height uint64) ([]byte, error) + + // GetLatestHeight returns the latest block height from the Fiber network. + GetLatestHeight(ctx context.Context) (uint64, error) +} + +// FiberConfig holds configuration for the Fiber DA client. +type FiberConfig struct { + // Client is the Fiber protocol client backend. + Client FiberClient + // Logger is the structured logger. + Logger zerolog.Logger + // DefaultTimeout is the default timeout for operations. + DefaultTimeout time.Duration + // Namespace is the header namespace string. + Namespace string + // DataNamespace is the data namespace string. + DataNamespace string + // ForcedInclusionNamespace is the forced inclusion namespace string. + ForcedInclusionNamespace string +} + +// fiberDAClient adapts a FiberClient to the ev-node FullClient interface. +// It bridges the Fiber push/pull model to the block-based DA interface by +// maintaining a local index of submitted blobs for height-based retrieval. +type fiberDAClient struct { + fiber FiberClient + logger zerolog.Logger + defaultTimeout time.Duration + namespaceBz []byte + dataNamespaceBz []byte + forcedNamespaceBz []byte + hasForcedNamespace bool + + mu sync.RWMutex + index map[uint64][]fiberIndexedBlob +} + +type fiberIndexedBlob struct { + id datypes.ID + data []byte + promise []byte + blobID []byte +} + +var _ FullClient = (*fiberDAClient)(nil) + +// NewFiberClient creates a new Fiber DA client adapter. +// Returns nil if the Fiber client backend is not provided. +func NewFiberClient(cfg FiberConfig) FullClient { + if cfg.Client == nil { + return nil + } + if cfg.DefaultTimeout == 0 { + cfg.DefaultTimeout = 60 * time.Second + } + + hasForced := cfg.ForcedInclusionNamespace != "" + var forcedBz []byte + if hasForced { + forcedBz = datypes.NamespaceFromString(cfg.ForcedInclusionNamespace).Bytes() + } + + return &fiberDAClient{ + fiber: cfg.Client, + logger: cfg.Logger.With().Str("component", "fiber_da_client").Logger(), + defaultTimeout: cfg.DefaultTimeout, + namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(), + dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), + forcedNamespaceBz: forcedBz, + hasForcedNamespace: hasForced, + index: make(map[uint64][]fiberIndexedBlob), + } +} + +// makeFiberID constructs an ev-node DA ID from a Fiber height and blob ID. +// Format: 8 bytes LE height + blobID bytes (compatible with datypes.SplitID). +func makeFiberID(height uint64, blobID []byte) datypes.ID { + id := make([]byte, 8+len(blobID)) + binary.LittleEndian.PutUint64(id, height) + copy(id[8:], blobID) + return id +} + +// splitFiberID extracts the Fiber height and blob ID from an ev-node DA ID. +func splitFiberID(id datypes.ID) (uint64, []byte) { + if len(id) <= 8 { + return 0, nil + } + return binary.LittleEndian.Uint64(id[:8]), id[8:] +} + +// Submit uploads each data blob to the Fiber network. +// All blobs are uploaded individually, then indexed under a single canonical height +// (the height of the last upload) to satisfy the ev-node DA contract that all +// submitted blobs appear at the same height. +func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, namespace []byte, _ []byte) datypes.ResultSubmit { + var blobSize uint64 + for _, b := range data { + blobSize += uint64(len(b)) + } + + type uploadResult struct { + blobID []byte + height uint64 + promise []byte + data []byte + } + + uploaded := make([]uploadResult, 0, len(data)) + + for i, raw := range data { + if uint64(len(raw)) > common.DefaultMaxBlobSize { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusTooBig, + Message: fmt.Sprintf("blob %d exceeds max size (%d > %d)", i, len(raw), common.DefaultMaxBlobSize), + }, + } + } + + uploadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + result, err := c.fiber.Upload(uploadCtx, namespace, raw) + cancel() + if err != nil { + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Int("blob_index", i).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob %d: %v", i, err), + SubmittedCount: uint64(len(uploaded)), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + uploaded = append(uploaded, uploadResult{ + blobID: result.BlobID, + height: result.Height, + promise: result.Promise, + data: raw, + }) + } + + if len(uploaded) == 0 { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } + + // Use the height of the last upload as the canonical submit height. + // Re-index all blobs under this single height so that Retrieve(height) + // returns all blobs from the same submit call. + submitHeight := uploaded[len(uploaded)-1].height + ids := make([]datypes.ID, len(uploaded)) + + c.mu.Lock() + for i, u := range uploaded { + id := makeFiberID(submitHeight, u.blobID) + ids[i] = id + c.index[submitHeight] = append(c.index[submitHeight], fiberIndexedBlob{ + id: id, + data: u.data, + promise: u.promise, + blobID: u.blobID, + }) + } + c.mu.Unlock() + + c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", submitHeight).Msg("fiber DA submission successful") + + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: submitHeight, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } +} + +// Retrieve retrieves blobs from the Fiber network at the specified height and namespace. +// It first checks the local submission index, then falls back to downloading via blob IDs. +func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, true) +} + +// RetrieveBlobs retrieves blobs without blocking on timestamp resolution. +func (c *fiberDAClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, false) +} + +func (c *fiberDAClient) retrieve(_ context.Context, height uint64, _ []byte, _ bool) datypes.ResultRetrieve { + c.mu.RLock() + blobs, ok := c.index[height] + c.mu.RUnlock() + + if !ok || len(blobs) == 0 { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusNotFound, + Message: "no blobs found at height in fiber index", + Height: height, + Timestamp: time.Now(), + }, + } + } + + ids := make([]datypes.ID, len(blobs)) + data := make([]datypes.Blob, len(blobs)) + for i, b := range blobs { + ids[i] = b.id + data[i] = b.data + } + + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Height: height, + IDs: ids, + Timestamp: time.Now(), + }, + Data: data, + } +} + +// Get downloads specific blobs by their IDs from the Fiber network. +// Each ID is decoded to extract the Fiber blob ID and height, +// then downloaded via the Fiber client. +func (c *fiberDAClient) Get(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + if len(ids) == 0 { + return nil, nil + } + + res := make([]datypes.Blob, 0, len(ids)) + for _, id := range ids { + height, blobID := splitFiberID(id) + if blobID == nil { + return nil, fmt.Errorf("invalid fiber blob id: %x", id) + } + + downloadCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + data, err := c.fiber.Download(downloadCtx, blobID, height) + cancel() + if err != nil { + return nil, fmt.Errorf("fiber download failed for blob %x: %w", blobID, err) + } + res = append(res, data) + } + + return res, nil +} + +// Subscribe returns a channel that emits SubscriptionEvents for new DA heights. +// Since the Fiber protocol doesn't have a native subscription mechanism, this +// implementation polls GetLatestHeight and emits events for heights present in +// the local submission index. Only heights indexed by this client are emitted. +func (c *fiberDAClient) Subscribe(ctx context.Context, _ []byte, _ bool) (<-chan datypes.SubscriptionEvent, error) { + out := make(chan datypes.SubscriptionEvent, 16) + + go func() { + defer close(out) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + var lastHeight uint64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + height, err := c.fiber.GetLatestHeight(heightCtx) + cancel() + if err != nil { + c.logger.Error().Err(err).Msg("failed to get latest fiber height during subscribe") + continue + } + if height <= lastHeight { + continue + } + + for h := lastHeight + 1; h <= height; h++ { + c.mu.RLock() + blobs, ok := c.index[h] + c.mu.RUnlock() + if !ok || len(blobs) == 0 { + continue + } + + blobData := make([][]byte, len(blobs)) + for i, b := range blobs { + blobData[i] = b.data + } + + select { + case out <- datypes.SubscriptionEvent{ + Height: h, + Timestamp: time.Now(), + Blobs: blobData, + }: + case <-ctx.Done(): + return + } + } + lastHeight = height + } + } + }() + + return out, nil +} + +// GetLatestDAHeight returns the latest block height from the Fiber network. +func (c *fiberDAClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + heightCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + height, err := c.fiber.GetLatestHeight(heightCtx) + if err != nil { + return 0, fmt.Errorf("failed to get latest fiber height: %w", err) + } + return height, nil +} + +// GetProofs returns the serialized payment promises as proofs for the given IDs. +// In the Fiber protocol, the signed payment promise from validators serves as +// proof of data availability. +func (c *fiberDAClient) GetProofs(_ context.Context, ids []datypes.ID, _ []byte) ([]datypes.Proof, error) { + if len(ids) == 0 { + return []datypes.Proof{}, nil + } + + proofs := make([]datypes.Proof, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + proofs[i] = b.promise + break + } + } + } + + return proofs, nil +} + +// Validate verifies that the proofs (payment promises) correspond to the given IDs. +// It checks that each proof was stored for the matching blob during submission. +func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []datypes.Proof, _ []byte) ([]bool, error) { + if len(ids) != len(proofs) { + return nil, errors.New("number of IDs and proofs must match") + } + if len(ids) == 0 { + return []bool{}, nil + } + + results := make([]bool, len(ids)) + for i, id := range ids { + height, _ := splitFiberID(id) + + c.mu.RLock() + blobs := c.index[height] + c.mu.RUnlock() + + for _, b := range blobs { + if string(b.id) == string(id) { + // A non-empty promise proof that matches the stored promise is valid. + results[i] = len(proofs[i]) > 0 && string(proofs[i]) == string(b.promise) + break + } + } + } + + return results, nil +} + +// GetHeaderNamespace returns the header namespace bytes. +func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } + +// GetDataNamespace returns the data namespace bytes. +func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } + +// GetForcedInclusionNamespace returns the forced inclusion namespace bytes. +func (c *fiberDAClient) GetForcedInclusionNamespace() []byte { return c.forcedNamespaceBz } + +// HasForcedInclusionNamespace reports whether forced inclusion namespace is configured. +func (c *fiberDAClient) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } diff --git a/block/internal/da/fiber_client_test.go b/block/internal/da/fiber_client_test.go new file mode 100644 index 0000000000..d119b8a931 --- /dev/null +++ b/block/internal/da/fiber_client_test.go @@ -0,0 +1,518 @@ +package da + +import ( + "context" + "crypto/sha256" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// mockFiberClient is a test mock for the FiberClient interface. +type mockFiberClient struct { + mu sync.Mutex + uploads map[string][]byte // blobID hex -> data + height uint64 + uploadErr error + callCount atomic.Uint64 +} + +func newMockFiberClient() *mockFiberClient { + return &mockFiberClient{ + uploads: make(map[string][]byte), + height: 100, + } +} + +func (m *mockFiberClient) Upload(_ context.Context, namespace, data []byte) (FiberUploadResult, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.uploadErr != nil { + return FiberUploadResult{}, m.uploadErr + } + + m.height++ + callIdx := m.callCount.Add(1) + + // Generate a unique blob ID for each upload + blobID := make([]byte, 33) + blobID[0] = 0 // version 0 + hash := sha256.Sum256(append([]byte{byte(callIdx)}, data...)) + copy(blobID[1:], hash[:]) + + m.uploads[string(blobID)] = data + + return FiberUploadResult{ + BlobID: blobID, + Height: m.height, + Promise: []byte("signed-promise-" + string(blobID)), + }, nil +} + +func (m *mockFiberClient) Download(_ context.Context, blobID []byte, _ uint64) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + + data, ok := m.uploads[string(blobID)] + if !ok { + return nil, errors.New("blob not found") + } + return data, nil +} + +func (m *mockFiberClient) GetLatestHeight(_ context.Context) (uint64, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.height, nil +} + +func makeTestFiberClient(t *testing.T) (*mockFiberClient, FullClient) { + t.Helper() + mock := newMockFiberClient() + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + require.NotNil(t, cl) + return mock, cl +} + +func TestFiberClient_NewClient_Nil(t *testing.T) { + cl := NewFiberClient(FiberConfig{Client: nil}) + assert.Nil(t, cl) +} + +func TestFiberClient_Submit_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("hello"), []byte("world")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 2) + require.Equal(t, uint64(2), res.SubmittedCount) + require.Greater(t, res.Height, uint64(0)) + require.Equal(t, uint64(10), res.BlobSize) +} + +func TestFiberClient_Submit_SingleBlob(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("single")}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 1) + require.Equal(t, uint64(6), res.BlobSize) +} + +func TestFiberClient_Submit_EmptyData(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{}, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Empty(t, res.IDs) + require.Equal(t, uint64(0), res.SubmittedCount) +} + +func TestFiberClient_Submit_UploadError(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = errors.New("upload failed") + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusError, res.Code) + require.Contains(t, res.Message, "fiber upload failed") +} + +func TestFiberClient_Submit_CanceledContext(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = context.Canceled + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextCanceled, res.Code) +} + +func TestFiberClient_Submit_DeadlineExceeded(t *testing.T) { + mock := newMockFiberClient() + mock.uploadErr = context.DeadlineExceeded + cl := NewFiberClient(FiberConfig{ + Client: mock, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-ns", + }) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + res := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + + require.Equal(t, datypes.StatusContextDeadline, res.Code) +} + +func TestFiberClient_Submit_BlobTooLarge(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + largeBlob := make([]byte, 6*1024*1024) // 6MB > 5MB default max + res := cl.Submit(context.Background(), [][]byte{largeBlob}, 0, ns, nil) + + require.Equal(t, datypes.StatusTooBig, res.Code) +} + +func TestFiberClient_Retrieve_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("hello")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.Retrieve(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 1) + require.Equal(t, []byte("hello"), retrieveRes.Data[0]) + require.Equal(t, submitRes.IDs, retrieveRes.IDs) +} + +func TestFiberClient_RetrieveBlobs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("blob1"), []byte("blob2")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + retrieveRes := cl.RetrieveBlobs(context.Background(), submitRes.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 2) + require.Equal(t, []byte("blob1"), retrieveRes.Data[0]) + require.Equal(t, []byte("blob2"), retrieveRes.Data[1]) +} + +func TestFiberClient_Retrieve_NotFound(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + retrieveRes := cl.Retrieve(context.Background(), 9999, ns) + require.Equal(t, datypes.StatusNotFound, retrieveRes.Code) +} + +func TestFiberClient_Get_Success(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("getme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, []byte("getme"), blobs[0]) + + _ = mock // mock stores the data for download +} + +func TestFiberClient_Get_EmptyIDs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + blobs, err := cl.Get(context.Background(), nil, nil) + require.NoError(t, err) + require.Nil(t, blobs) +} + +func TestFiberClient_Get_InvalidID(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Get(context.Background(), []datypes.ID{[]byte{0x01}}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid fiber blob id") +} + +func TestFiberClient_Get_DownloadError(t *testing.T) { + _, cl := makeTestFiberClient(t) + + // Construct a valid-looking ID but with a blob ID that doesn't exist + fakeBlobID := make([]byte, 33) + id := makeFiberID(1, fakeBlobID) + + _, err := cl.Get(context.Background(), []datypes.ID{id}, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "fiber download failed") +} + +func TestFiberClient_GetLatestDAHeight(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + height, err := cl.GetLatestDAHeight(context.Background()) + require.NoError(t, err) + require.Equal(t, mock.height, height) +} + +func TestFiberClient_GetProofs_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("prooftest")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, proofs, 1) + require.NotEmpty(t, proofs[0]) // Should contain the promise +} + +func TestFiberClient_GetProofs_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + proofs, err := cl.GetProofs(context.Background(), nil, nil) + require.NoError(t, err) + require.Empty(t, proofs) +} + +func TestFiberClient_Validate_Success(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validateme")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + + results, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.True(t, results[0]) +} + +func TestFiberClient_Validate_MismatchedLengths(t *testing.T) { + _, cl := makeTestFiberClient(t) + + _, err := cl.Validate(context.Background(), make([]datypes.ID, 3), make([]datypes.Proof, 2), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "must match") +} + +func TestFiberClient_Validate_Empty(t *testing.T) { + _, cl := makeTestFiberClient(t) + + results, err := cl.Validate(context.Background(), nil, nil, nil) + require.NoError(t, err) + require.Empty(t, results) +} + +func TestFiberClient_Validate_WrongProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("validatewrong")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // Use a wrong proof + fakeProofs := []datypes.Proof{[]byte("wrong-proof")} + results, err := cl.Validate(context.Background(), submitRes.IDs, fakeProofs, ns) + require.NoError(t, err) + require.Len(t, results, 1) + require.False(t, results[0]) // Wrong proof should fail validation +} + +func TestFiberClient_Validate_EmptyProof(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + emptyProofs := []datypes.Proof{[]byte{}} + results, err := cl.Validate(context.Background(), submitRes.IDs, emptyProofs, ns) + require.NoError(t, err) + require.False(t, results[0]) +} + +func TestFiberClient_Namespaces(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + ForcedInclusionNamespace: "forced-ns", + }) + require.NotNil(t, cl) + + require.Equal(t, datypes.NamespaceFromString("header-ns").Bytes(), cl.GetHeaderNamespace()) + require.Equal(t, datypes.NamespaceFromString("data-ns").Bytes(), cl.GetDataNamespace()) + require.Equal(t, datypes.NamespaceFromString("forced-ns").Bytes(), cl.GetForcedInclusionNamespace()) + require.True(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_NoForcedNamespace(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "header-ns", + DataNamespace: "data-ns", + }) + require.NotNil(t, cl) + + require.Nil(t, cl.GetForcedInclusionNamespace()) + require.False(t, cl.HasForcedInclusionNamespace()) +} + +func TestFiberClient_Subscribe(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := cl.Subscribe(ctx, nil, false) + require.NoError(t, err) + require.NotNil(t, ch) + + // Submit a blob so the index has something + ns := datypes.NamespaceFromString("test-ns").Bytes() + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("sub-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // The subscribe goroutine polls and should emit the event for the submitted height. + // Since the mock height starts at 100 and upload increments to 101, + // the subscribe loop should eventually pick it up. + select { + case ev := <-ch: + require.Equal(t, submitRes.Height, ev.Height) + require.Len(t, ev.Blobs, 1) + require.Equal(t, []byte("sub-data"), ev.Blobs[0]) + case <-time.After(5 * time.Second): + t.Fatal("subscribe did not emit event within timeout") + } +} + +func TestFiberClient_Submit_MultipleBlobs(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := [][]byte{[]byte("first"), []byte("second"), []byte("third")} + res := cl.Submit(context.Background(), data, 0, ns, nil) + + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Len(t, res.IDs, 3) + require.Equal(t, uint64(3), res.SubmittedCount) + + // Verify all blobs can be retrieved + retrieveRes := cl.Retrieve(context.Background(), res.Height, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Len(t, retrieveRes.Data, 3) + require.Equal(t, []byte("first"), retrieveRes.Data[0]) + require.Equal(t, []byte("second"), retrieveRes.Data[1]) + require.Equal(t, []byte("third"), retrieveRes.Data[2]) +} + +func TestFiberClient_SubmitAndDownload(t *testing.T) { + mock, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + data := []byte("download-test") + submitRes := cl.Submit(context.Background(), [][]byte{data}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + + // The mock stores the data, so Get should be able to download it + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Len(t, blobs, 1) + require.Equal(t, data, blobs[0]) + + _ = mock +} + +func TestMakeFiberID_RoundTrip(t *testing.T) { + blobID := make([]byte, 33) + blobID[0] = 1 + for i := 1; i < 33; i++ { + blobID[i] = byte(i) + } + + id := makeFiberID(42, blobID) + height, extractedBlobID := splitFiberID(id) + + require.Equal(t, uint64(42), height) + require.Equal(t, blobID, extractedBlobID) +} + +func TestSplitFiberID_Invalid(t *testing.T) { + height, blobID := splitFiberID([]byte{0x01, 0x02}) + require.Equal(t, uint64(0), height) + require.Nil(t, blobID) +} + +func TestFiberClient_DefaultTimeout(t *testing.T) { + cl := NewFiberClient(FiberConfig{ + Client: newMockFiberClient(), + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + require.NotNil(t, cl) + fc := cl.(*fiberDAClient) + require.Equal(t, 60*time.Second, fc.defaultTimeout) +} + +func TestFiberClient_FullSubmitRetrieveCycle(t *testing.T) { + _, cl := makeTestFiberClient(t) + + ns := datypes.NamespaceFromString("test-ns").Bytes() + + // Submit + submitRes := cl.Submit(context.Background(), [][]byte{[]byte("cycle-data")}, 0, ns, nil) + require.Equal(t, datypes.StatusSuccess, submitRes.Code) + require.Len(t, submitRes.IDs, 1) + submittedHeight := submitRes.Height + + // Retrieve + retrieveRes := cl.Retrieve(context.Background(), submittedHeight, ns) + require.Equal(t, datypes.StatusSuccess, retrieveRes.Code) + require.Equal(t, []byte("cycle-data"), retrieveRes.Data[0]) + + // Get + blobs, err := cl.Get(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.Equal(t, []byte("cycle-data"), blobs[0]) + + // GetProofs + Validate + proofs, err := cl.GetProofs(context.Background(), submitRes.IDs, ns) + require.NoError(t, err) + require.NotEmpty(t, proofs[0]) + + valid, err := cl.Validate(context.Background(), submitRes.IDs, proofs, ns) + require.NoError(t, err) + require.True(t, valid[0]) +} diff --git a/block/public.go b/block/public.go index cc7691c299..e8ae4d14c5 100644 --- a/block/public.go +++ b/block/public.go @@ -63,6 +63,28 @@ func NewDAClient( return base } +// NewFiberDAClient creates a new DA client backed by the Fiber protocol. +// The fiberClient parameter must implement the da.FiberClient interface. +// The returned client implements both DAClient and DAVerifier interfaces. +func NewFiberDAClient( + fiberClient da.FiberClient, + config config.Config, + logger zerolog.Logger, +) FullDAClient { + base := da.NewFiberClient(da.FiberConfig{ + Client: fiberClient, + Logger: logger, + DefaultTimeout: config.DA.RequestTimeout.Duration, + Namespace: config.DA.GetNamespace(), + DataNamespace: config.DA.GetDataNamespace(), + ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), + }) + if config.Instrumentation.IsTracingEnabled() { + return da.WithTracingClient(base) + } + return base +} + // Exported errors used by the sequencers var ( // ErrForceInclusionNotConfigured is returned when force inclusion is not configured. diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..02c1338279 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -92,6 +92,21 @@ const ( // FlagDAStartHeight is a flag for forcing the DA retrieval height to start from a specific height FlagDAStartHeight = FlagPrefixEvnode + "da.start_height" + // Fiber DA configuration flags + + // FlagDAFiberEnabled enables the Fiber DA client instead of the default JSON-RPC blob client + FlagDAFiberEnabled = FlagPrefixEvnode + "da.fiber.enabled" + // FlagDAFiberStateAddress is the gRPC address of the celestia-app node for Fiber state queries + FlagDAFiberStateAddress = FlagPrefixEvnode + "da.fiber.state_address" + // FlagDAFiberKeyringPath is the path to the keyring directory for Fiber payment promise signing + FlagDAFiberKeyringPath = FlagPrefixEvnode + "da.fiber.keyring_path" + // FlagDAFiberKeyName is the key name in the keyring to use for signing payment promises + FlagDAFiberKeyName = FlagPrefixEvnode + "da.fiber.key_name" + // FlagDAFiberUploadConcurrency limits concurrent Fiber uploads to validators + FlagDAFiberUploadConcurrency = FlagPrefixEvnode + "da.fiber.upload_concurrency" + // FlagDAFiberDownloadConcurrency limits concurrent Fiber downloads from validators + FlagDAFiberDownloadConcurrency = FlagPrefixEvnode + "da.fiber.download_concurrency" + // P2P configuration flags // FlagP2PListenAddress is a flag for specifying the P2P listen address @@ -258,6 +273,37 @@ type DAConfig struct { BatchSizeThreshold float64 `mapstructure:"batch_size_threshold" yaml:"batch_size_threshold" comment:"Minimum blob size threshold (as fraction of max blob size, 0.0-1.0) before submitting. Only applies to 'size' and 'adaptive' strategies. Example: 0.8 means wait until batch is 80% full. Default: 0.8."` BatchMaxDelay DurationWrapper `mapstructure:"batch_max_delay" yaml:"batch_max_delay" comment:"Maximum time to wait before submitting a batch regardless of size. Applies to 'time' and 'adaptive' strategies. Lower values reduce latency but may increase costs. Examples: \"6s\", \"12s\", \"30s\". Default: DA BlockTime."` BatchMinItems uint64 `mapstructure:"batch_min_items" yaml:"batch_min_items" comment:"Minimum number of items (headers or data) to accumulate before considering submission. Helps avoid submitting single items when more are expected soon. Default: 1."` + + // Fiber DA client configuration + Fiber FiberDAConfig `mapstructure:"fiber" yaml:"fiber"` +} + +// FiberDAConfig contains configuration for the Fiber DA client. +// When Enabled is true, the Fiber client is used instead of the default +// JSON-RPC blob client for DA operations. +type FiberDAConfig struct { + // Enabled switches the DA backend from the default JSON-RPC blob client + // to the Fiber protocol client. + Enabled bool `mapstructure:"enabled" yaml:"enabled" comment:"Enable the Fiber DA client for direct validator communication instead of the default JSON-RPC blob client"` + // StateAddress is the gRPC address of the celestia-app node used for + // state queries (validator set, chain ID, promise verification). + StateAddress string `mapstructure:"state_address" yaml:"state_address" comment:"gRPC address of the celestia-app node for Fiber state queries (host:port)"` + // KeyringPath is the directory path containing the keyring for signing + // Fiber payment promises. + KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing"` + // KeyName is the name of the key in the keyring to use for signing. + KeyName string `mapstructure:"key_name" yaml:"key_name" comment:"Name of the key in the keyring to use for signing Fiber payment promises"` + // UploadConcurrency limits the number of concurrent upload connections + // to validators. + UploadConcurrency int `mapstructure:"upload_concurrency" yaml:"upload_concurrency" comment:"Maximum number of concurrent upload connections to Fiber validators"` + // DownloadConcurrency limits the number of concurrent download connections + // from validators. + DownloadConcurrency int `mapstructure:"download_concurrency" yaml:"download_concurrency" comment:"Maximum number of concurrent download connections from Fiber validators"` +} + +// IsFiberEnabled returns true if the Fiber DA client is configured and enabled. +func (d *DAConfig) IsFiberEnabled() bool { + return d.Fiber.Enabled } // GetNamespace returns the namespace for header submissions. @@ -602,6 +648,14 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for disabled)") cmd.Flags().MarkHidden(FlagDAStartHeight) + // Fiber DA configuration flags + cmd.Flags().Bool(FlagDAFiberEnabled, def.DA.Fiber.Enabled, "enable the Fiber DA client for direct validator communication") + cmd.Flags().String(FlagDAFiberStateAddress, def.DA.Fiber.StateAddress, "gRPC address of the celestia-app node for Fiber state queries (host:port)") + cmd.Flags().String(FlagDAFiberKeyringPath, def.DA.Fiber.KeyringPath, "path to the keyring directory for Fiber payment promise signing") + cmd.Flags().String(FlagDAFiberKeyName, def.DA.Fiber.KeyName, "name of the key in the keyring for signing Fiber payment promises") + cmd.Flags().Int(FlagDAFiberUploadConcurrency, def.DA.Fiber.UploadConcurrency, "maximum concurrent uploads to Fiber validators") + cmd.Flags().Int(FlagDAFiberDownloadConcurrency, def.DA.Fiber.DownloadConcurrency, "maximum concurrent downloads from Fiber validators") + // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") cmd.Flags().String(FlagP2PPeers, def.P2P.Peers, "Comma separated list of seed nodes to connect to") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..9ad6447306 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -78,6 +78,18 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) assertFlagValue(t, flags, FlagDARequestTimeout, DefaultConfig().DA.RequestTimeout.Duration) + assertFlagValue(t, flags, FlagDABatchingStrategy, DefaultConfig().DA.BatchingStrategy) + assertFlagValue(t, flags, FlagDABatchSizeThreshold, DefaultConfig().DA.BatchSizeThreshold) + assertFlagValue(t, flags, FlagDABatchMaxDelay, DefaultConfig().DA.BatchMaxDelay.Duration) + assertFlagValue(t, flags, FlagDABatchMinItems, DefaultConfig().DA.BatchMinItems) + + // DA Fiber flags + assertFlagValue(t, flags, FlagDAFiberEnabled, DefaultConfig().DA.Fiber.Enabled) + assertFlagValue(t, flags, FlagDAFiberStateAddress, DefaultConfig().DA.Fiber.StateAddress) + assertFlagValue(t, flags, FlagDAFiberKeyringPath, DefaultConfig().DA.Fiber.KeyringPath) + assertFlagValue(t, flags, FlagDAFiberKeyName, DefaultConfig().DA.Fiber.KeyName) + assertFlagValue(t, flags, FlagDAFiberUploadConcurrency, DefaultConfig().DA.Fiber.UploadConcurrency) + assertFlagValue(t, flags, FlagDAFiberDownloadConcurrency, DefaultConfig().DA.Fiber.DownloadConcurrency) // P2P flags assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress) @@ -140,7 +152,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 84 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..2dd22b3e8a 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -83,6 +83,13 @@ func DefaultConfig() Config { BatchSizeThreshold: 0.8, BatchMaxDelay: DurationWrapper{0}, // 0 means use DA BlockTime BatchMinItems: 1, + Fiber: FiberDAConfig{ + Enabled: false, + StateAddress: "127.0.0.1:9090", + KeyName: "default-fibre", + UploadConcurrency: 100, + DownloadConcurrency: 34, + }, }, Instrumentation: DefaultInstrumentationConfig(), Log: LogConfig{