43 changes: 36 additions & 7 deletions giga/deps/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"strings"
"sync"
"time"

@@ -113,8 +115,10 @@ type scheduler struct {
executeCh chan func()
validateCh chan func()
metrics *schedulerMetrics
synchronous bool // true if maxIncarnation exceeds threshold
maxIncarnation int // current highest incarnation
synchronous bool // true if maxIncarnation exceeds threshold
maxIncarnation int // current highest incarnation
conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
conflictKeyMu sync.Mutex
}

// NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
s.executeCh <- work
}

func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
var conflicts []int
conflictKeys := make([]string, 0, len(s.multiVersionStores))
uniq := make(map[int]struct{})
valid := true
for _, mv := range s.multiVersionStores {
ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
for storeKey, mv := range s.multiVersionStores {
ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
for _, c := range mvConflicts {
if _, ok := uniq[c]; !ok {
conflicts = append(conflicts, c)
uniq[c] = struct{}{}
}
}
for _, k := range mvKeys {
conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
}
// any non-ok value makes valid false
valid = valid && ok
}
sort.Ints(conflicts)
return valid, conflicts
return valid, conflicts, conflictKeys
}

func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -284,6 +292,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
tasks, tasksMap := toTasks(reqs)
s.allTasks = tasks
s.allTasksMap = tasksMap
s.conflictKeyCounts = make(map[string]int)
s.executeCh = make(chan func(), len(tasks))
s.validateCh = make(chan func(), len(tasks))
defer s.emitMetrics()
@@ -338,6 +347,21 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
}
s.metrics.maxIncarnation = s.maxIncarnation

if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
encoded := make(map[string]int, len(s.conflictKeyCounts))
for k, v := range s.conflictKeyCounts {
storeName, rawKey, _ := strings.Cut(k, "/")
var encodedKey string
if rawKey == "globalAccountNumber" {
encodedKey = storeName + "/" + rawKey
} else {
encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
}
encoded[encodedKey] = v
}
logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
}

logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)

return s.collectResponses(tasks), nil
@@ -354,9 +378,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
// since we choose to fail fast and mark the subsequent tasks as invalid as well.
// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
if valid, conflicts := s.findConflicts(task); !valid {
if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
s.invalidateTask(task)
task.AppendDependencies(conflicts)
s.conflictKeyMu.Lock()
for _, k := range conflictKeys {
s.conflictKeyCounts[k]++
}
s.conflictKeyMu.Unlock()

// if the conflicts are now validated, then rerun this task
if dependenciesValidated(s.allTasksMap, task.Dependencies) {
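For reference, the key encoding used by the new conflict log can be run in isolation. The snippet below is a hypothetical standalone helper (encodeConflictKey does not exist in this PR); it assumes keys arrive as "storeName/rawKey" strings and mirrors the hex-encoding branch above, keeping the well-known globalAccountNumber key human-readable.

package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// encodeConflictKey is an illustrative helper mirroring the logging branch above:
// keys are logged as "<store>/<hex(rawKey)>", except the well-known
// "globalAccountNumber" key, which stays readable.
func encodeConflictKey(k string) string {
	storeName, rawKey, _ := strings.Cut(k, "/")
	if rawKey == "globalAccountNumber" {
		return storeName + "/" + rawKey
	}
	return storeName + "/" + hex.EncodeToString([]byte(rawKey))
}

func main() {
	fmt.Println(encodeConflictKey("acc/globalAccountNumber")) // acc/globalAccountNumber
	fmt.Println(encodeConflictKey("bank/\x02balances"))       // bank/0262616c616e636573
}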
18 changes: 18 additions & 0 deletions sei-cosmos/baseapp/abci.go
@@ -246,6 +246,7 @@ func (app *BaseApp) SetDeliverStateToCommit() {
// height.
func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err error) {
defer telemetry.MeasureSince(time.Now(), "abci", "commit")
commitStart := time.Now()
app.commitLock.Lock()
defer app.commitLock.Unlock()

@@ -300,6 +301,18 @@ func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err e
}
app.SnapshotIfApplicable(uint64(header.Height)) //nolint:gosec // bounds checked above

commitMs := time.Since(commitStart).Milliseconds()
ppMs := app.execProcessProposalMs
fbMs := app.execFinalizeBlockMs
logger.Info("execution block time",
"height", header.Height,
"block_txs", app.execBlockTxCount,
"process_proposal_ms", ppMs,
"finalize_block_ms", fbMs,
"commit_ms", commitMs,
"total_execution_ms", ppMs+fbMs+commitMs,
)

return &abci.ResponseCommit{
RetainHeight: retainHeight,
}, nil
@@ -922,6 +935,8 @@ func splitPath(requestPath string) (path []string) {
// ABCI++
func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
defer telemetry.MeasureSince(time.Now(), "abci", "process_proposal")
ppStart := time.Now()
defer func() { app.execProcessProposalMs = time.Since(ppStart).Milliseconds() }()
if app.ChainID != req.Header.ChainID {
return nil, fmt.Errorf("unexpected ChainID, got %q, want %q", req.Header.ChainID, app.ChainID)
}
@@ -983,6 +998,9 @@ func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProces

func (app *BaseApp) FinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
defer telemetry.MeasureSince(time.Now(), "abci", "finalize_block")
fbStart := time.Now()
app.execBlockTxCount = len(req.Txs)
defer func() { app.execFinalizeBlockMs = time.Since(fbStart).Milliseconds() }()

if app.cms.TracingEnabled() {
app.cms.SetTracingContext(sdk.TraceContext(
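A minimal sketch of the timing pattern these hooks follow, using an illustrative stand-in struct rather than the real BaseApp: record a start time, capture the elapsed milliseconds in a defer, and sum the three phases at commit time.

package main

import (
	"fmt"
	"time"
)

// app is an illustrative stand-in for BaseApp's new timing fields.
type app struct {
	processProposalMs int64
	finalizeBlockMs   int64
}

func (a *app) processProposal() {
	start := time.Now()
	defer func() { a.processProposalMs = time.Since(start).Milliseconds() }()
	time.Sleep(5 * time.Millisecond) // stand-in for proposal execution
}

func (a *app) finalizeBlock() {
	start := time.Now()
	defer func() { a.finalizeBlockMs = time.Since(start).Milliseconds() }()
	time.Sleep(10 * time.Millisecond) // stand-in for block execution
}

func main() {
	a := &app{}
	a.processProposal()
	a.finalizeBlock()

	commitStart := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for commit
	commitMs := time.Since(commitStart).Milliseconds()

	fmt.Println("total_execution_ms:", a.processProposalMs+a.finalizeBlockMs+commitMs)
}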
4 changes: 4 additions & 0 deletions sei-cosmos/baseapp/baseapp.go
@@ -168,6 +168,10 @@ type BaseApp struct {
occEnabled bool

deliverTxHooks []DeliverTxHook

execProcessProposalMs int64
execFinalizeBlockMs int64
execBlockTxCount int
}

type appStore struct {
19 changes: 15 additions & 4 deletions sei-cosmos/store/multiversion/store.go
@@ -28,6 +28,7 @@ type MultiVersionStore interface {
GetIterateset(index int) Iterateset
ClearIterateset(index int)
ValidateTransactionState(index int) (bool, []int)
ValidateTransactionStateWithKeys(index int) (bool, []int, []string)
}

type WriteSet map[string][]byte
@@ -331,13 +332,14 @@ func (s *Store) checkIteratorAtIndex(index int) bool {
return valid
}

func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
func (s *Store) checkReadsetAtIndex(index int) (bool, []int, []string) {
conflictSet := make(map[int]struct{})
var conflictKeys []string
valid := true

readSetAny, found := s.txReadSets.Load(index)
if !found {
return true, []int{}
return true, []int{}, nil
}
readset := readSetAny.(ReadSet)
// iterate over readset and check if the value is the same as the latest value relative to txIndex in the multiversion store
@@ -354,6 +356,7 @@ func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
parentVal := s.parentStore.Get([]byte(key))
if !bytes.Equal(parentVal, value) {
valid = false
conflictKeys = append(conflictKeys, key)
}
} else {
// if estimate, mark as conflict index - but don't invalidate
@@ -365,10 +368,12 @@
// TODO: would we want to return early?
conflictSet[latestValue.Index()] = struct{}{}
valid = false
conflictKeys = append(conflictKeys, key)
}
} else if !bytes.Equal(latestValue.Value(), value) {
conflictSet[latestValue.Index()] = struct{}{}
valid = false
conflictKeys = append(conflictKeys, key)
}
}
}
@@ -380,7 +385,7 @@

sort.Ints(conflictIndices)

return valid, conflictIndices
return valid, conflictIndices, conflictKeys
}

// TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so?
@@ -390,11 +395,17 @@ func (s *Store) ValidateTransactionState(index int) (bool, []int) {
// TODO: can we parallelize for all iterators?
iteratorValid := s.checkIteratorAtIndex(index)

readsetValid, conflictIndices := s.checkReadsetAtIndex(index)
readsetValid, conflictIndices, _ := s.checkReadsetAtIndex(index)

return iteratorValid && readsetValid, conflictIndices
}

func (s *Store) ValidateTransactionStateWithKeys(index int) (bool, []int, []string) {
iteratorValid := s.checkIteratorAtIndex(index)
readsetValid, conflictIndices, conflictKeys := s.checkReadsetAtIndex(index)
return iteratorValid && readsetValid, conflictIndices, conflictKeys
}

func (s *Store) WriteLatestToStore() {
// sort the keys
keys := []string{}
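As a simplified, self-contained illustration of what checkReadsetAtIndex now reports (not the actual multiversion store, which also resolves values written by earlier transactions, handles estimates, and checks iterator sets): compare each key a transaction read against the latest value and collect the keys whose values changed.

package main

import (
	"bytes"
	"fmt"
)

// validateReadset is an illustrative reduction of the readset check: it returns
// whether every value read still matches the latest value, plus the conflicting
// keys. The real store additionally tracks the conflicting transaction indices.
func validateReadset(readset, latest map[string][]byte) (bool, []string) {
	valid := true
	var conflictKeys []string
	for key, readVal := range readset {
		if !bytes.Equal(latest[key], readVal) {
			valid = false
			conflictKeys = append(conflictKeys, key)
		}
	}
	return valid, conflictKeys
}

func main() {
	readset := map[string][]byte{"balance/alice": []byte("10"), "supply": []byte("100")}
	latest := map[string][]byte{"balance/alice": []byte("7"), "supply": []byte("100")}
	ok, keys := validateReadset(readset, latest)
	fmt.Println(ok, keys) // false [balance/alice]
}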
43 changes: 36 additions & 7 deletions sei-cosmos/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"strings"
"sync"
"time"

@@ -113,8 +115,10 @@ type scheduler struct {
executeCh chan func()
validateCh chan func()
metrics *schedulerMetrics
synchronous bool // true if maxIncarnation exceeds threshold
maxIncarnation int // current highest incarnation
synchronous bool // true if maxIncarnation exceeds threshold
maxIncarnation int // current highest incarnation
conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
conflictKeyMu sync.Mutex
}

// NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
s.executeCh <- work
}

func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
var conflicts []int
conflictKeys := make([]string, 0, len(s.multiVersionStores))
uniq := make(map[int]struct{})
valid := true
for _, mv := range s.multiVersionStores {
ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
for storeKey, mv := range s.multiVersionStores {
ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
for _, c := range mvConflicts {
if _, ok := uniq[c]; !ok {
conflicts = append(conflicts, c)
uniq[c] = struct{}{}
}
}
for _, k := range mvKeys {
conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
}
// any non-ok value makes valid false
valid = valid && ok
}
sort.Ints(conflicts)
return valid, conflicts
return valid, conflicts, conflictKeys
}

func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -270,6 +278,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
tasks, tasksMap := toTasks(reqs)
s.allTasks = tasks
s.allTasksMap = tasksMap
s.conflictKeyCounts = make(map[string]int)
s.executeCh = make(chan func(), len(tasks))
s.validateCh = make(chan func(), len(tasks))
defer s.emitMetrics()
@@ -324,6 +333,21 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
}
s.metrics.maxIncarnation = s.maxIncarnation

if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
encoded := make(map[string]int, len(s.conflictKeyCounts))
for k, v := range s.conflictKeyCounts {
storeName, rawKey, _ := strings.Cut(k, "/")
var encodedKey string
if rawKey == "globalAccountNumber" {
encodedKey = storeName + "/" + rawKey
} else {
encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
}
encoded[encodedKey] = v
}
logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
}

logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)

return s.collectResponses(tasks), nil
@@ -340,9 +364,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
// since we choose to fail fast and mark the subsequent tasks as invalid as well.
// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
if valid, conflicts := s.findConflicts(task); !valid {
if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
s.invalidateTask(task)
task.AppendDependencies(conflicts)
s.conflictKeyMu.Lock()
for _, k := range conflictKeys {
s.conflictKeyCounts[k]++
}
s.conflictKeyMu.Unlock()

// if the conflicts are now validated, then rerun this task
if dependenciesValidated(s.allTasksMap, task.Dependencies) {
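shouldRerun can be invoked from multiple validation workers, so conflictKeyCounts is guarded by conflictKeyMu. Below is a small standalone sketch of that mutex-guarded counter pattern, with illustrative names rather than the scheduler's own types.

package main

import (
	"fmt"
	"sync"
)

// conflictCounter is an illustrative mutex-guarded counter map, mirroring how
// conflictKeyCounts is incremented from concurrent validation goroutines.
type conflictCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func (c *conflictCounter) add(keys []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, k := range keys {
		c.counts[k]++
	}
}

func main() {
	c := &conflictCounter{counts: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.add([]string{"bank/balances", "acc/globalAccountNumber"})
		}()
	}
	wg.Wait()
	fmt.Println(c.counts) // map[acc/globalAccountNumber:4 bank/balances:4]
}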