Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ var (
ErrShardIsServicing = errors.New("shard is servicing")
ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrCannotOfflineMaster = errors.New("cannot take master node offline, failover first")
ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
)
35 changes: 29 additions & 6 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i
count := c.failureCounts[id]
c.failureMu.Unlock()

// don't add the node into the failover candidates if it's not a master node
if !node.IsMaster() {
if count >= c.options.maxFailureCount && !node.Failed() {
log := logger.Get().With(
zap.String("cluster_name", c.clusterName),
zap.String("id", node.ID()),
zap.String("addr", node.Addr()),
zap.Int64("failure_count", count))
cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
if err != nil {
log.Error("Failed to get the cluster info", zap.Error(err))
return count
}
if err := cluster.SetNodeFailedByID(node.ID(), true); err != nil {
log.Error("Failed to set slave node as failed", zap.Error(err))
return count
}
if err := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return count
}
log.Info("Marked slave node as failed due to probe failures")
}
return count
}

log := logger.Get().With(
zap.String("cluster_name", c.clusterName),
zap.String("id", node.ID()),
zap.Bool("is_master", node.IsMaster()),
zap.String("addr", node.Addr()))
if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount {
log := logger.Get().With(
zap.String("cluster_name", c.clusterName),
zap.String("id", node.ID()),
zap.Bool("is_master", node.IsMaster()),
zap.String("addr", node.Addr()))
cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
if err != nil {
log.Error("Failed to get the cluster info", zap.Error(err))
Expand Down Expand Up @@ -188,6 +208,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
version := clusterInfo.Version.Load()
for _, shard := range clusterInfo.Shards {
for _, node := range shard.Nodes {
if node.Failed() {
continue
}
go func(n store.Node) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
Expand Down
72 changes: 71 additions & 1 deletion controller/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,84 @@ func TestCluster_FailureCount(t *testing.T) {
require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
require.True(t, mockNode2.IsMaster())

// it will be always increase the failure count until the node is back again.
// Slave failure count keeps increasing; at threshold the slave is auto-marked as failed.
for i := int64(0); i < cluster.options.maxFailureCount*2; i++ {
require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode3))
}
require.True(t, mockNode3.Failed())
require.EqualValues(t, 3, clusterInfo.Version.Load())
cluster.resetFailureCount(mockNode3.ID())
require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()])
}

// TestCluster_SlaveFailureAutoOffline verifies that a slave node is
// automatically marked as failed once its probe-failure count reaches the
// configured threshold, that the cluster version bumps exactly once, and
// that masters and healthy slaves remain untouched.
func TestCluster_SlaveFailureAutoOffline(t *testing.T) {
	ctx := context.Background()
	namespace := "test-ns"
	name := "test-slave-offline"

	mockStore := NewMockClusterStore()

	master := store.NewClusterMockNode()
	master.SetRole(store.RoleMaster)
	master.Sequence = 100

	slaveA := store.NewClusterMockNode()
	slaveA.SetRole(store.RoleSlave)
	slaveA.Sequence = 90

	slaveB := store.NewClusterMockNode()
	slaveB.SetRole(store.RoleSlave)
	slaveB.Sequence = 80

	testCluster := &store.Cluster{
		Name: name,
		Shards: []*store.Shard{{
			Nodes:            []store.Node{master, slaveA, slaveB},
			SlotRanges:       []store.SlotRange{{Start: 0, Stop: 16383}},
			MigratingSlot:    &store.MigratingSlot{IsMigrating: false},
			TargetShardIndex: -1,
		}},
	}
	testCluster.Version.Store(1)
	require.NoError(t, mockStore.CreateCluster(ctx, namespace, testCluster))

	checker := &ClusterChecker{
		clusterStore: mockStore,
		namespace:    namespace,
		clusterName:  name,
		options: ClusterCheckOptions{
			pingInterval:    time.Second,
			maxFailureCount: 3,
		},
		failureCounts: make(map[string]int64),
		syncCh:        make(chan struct{}, 1),
	}

	// Below the threshold the slave must stay healthy and the stored
	// cluster must not be rewritten.
	require.False(t, slaveA.Failed())
	for n := int64(1); n < checker.options.maxFailureCount; n++ {
		checker.increaseFailureCount(0, slaveA)
	}
	require.False(t, slaveA.Failed())
	require.EqualValues(t, 1, testCluster.Version.Load())

	// Hitting the threshold marks the slave as failed and bumps the version.
	checker.increaseFailureCount(0, slaveA)
	require.True(t, slaveA.Failed())
	require.EqualValues(t, 2, testCluster.Version.Load())

	// Further failures are no-ops once the node is already failed.
	checker.increaseFailureCount(0, slaveA)
	require.True(t, slaveA.Failed())
	require.EqualValues(t, 2, testCluster.Version.Load())

	// The sibling slave is untouched.
	require.False(t, slaveB.Failed())

	// The master keeps its role and is never auto-offlined by slave logic.
	require.True(t, master.IsMaster())
	require.False(t, master.Failed())
}

func TestCluster_LoadAndProbe(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
Expand Down
37 changes: 37 additions & 0 deletions server/api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ package api
import (
"strconv"

"go.uber.org/zap"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/gin-gonic/gin"

Expand Down Expand Up @@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) {
}
helper.ResponseNoContent(c)
}

// SetStatus marks a batch of nodes as online or offline. The JSON body
// carries the node addresses and the desired state; taking a master node
// offline is rejected by the store layer (callers must fail over first).
// On success the updated topology is persisted and then pushed to the
// cluster nodes on a best-effort basis — a sync failure is only logged
// because the periodic probe loop will re-sync later.
func (handler *NodeHandler) SetStatus(c *gin.Context) {
	ns := c.Param("namespace")
	// Fix: the original discarded the type-assertion result, so a
	// mis-wired middleware would yield a nil cluster and panic below.
	cluster, ok := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
	if !ok {
		helper.ResponseError(c, consts.ErrInvalidArgument)
		return
	}

	var req struct {
		Addrs  []string `json:"addrs" binding:"required"`
		Online bool     `json:"online"`
	}
	if err := c.ShouldBindJSON(&req); err != nil {
		helper.ResponseBadRequest(c, err)
		return
	}

	// Flip the failed flag in memory; both helpers validate the whole
	// batch before mutating any node.
	var err error
	if req.Online {
		err = cluster.SetNodesOnline(req.Addrs)
	} else {
		err = cluster.SetNodesOffline(req.Addrs)
	}
	if err != nil {
		helper.ResponseError(c, err)
		return
	}

	if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
		helper.ResponseError(c, err)
		return
	}
	// Best effort: the change is already durable, so only warn on sync errors.
	if err := cluster.SyncToNodes(c); err != nil {
		logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster info to nodes after status change")
	}
	helper.ResponseOK(c, nil)
}
49 changes: 49 additions & 0 deletions server/api/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) {
runRemove(t, cluster.Shards[0].Nodes[1].ID(), http.StatusNoContent)
})
}

// TestNodeSetStatus exercises the node status endpoint: slaves may be
// taken offline and brought back online, while offlining a master is
// rejected with a 400.
func TestNodeSetStatus(t *testing.T) {
	ns := "test-ns"
	cluster, err := store.NewCluster("test-cluster", []string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2)
	require.NoError(t, err)

	handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())}
	require.NoError(t, handler.s.CreateCluster(context.Background(), ns, cluster))

	firstShard := cluster.Shards[0]
	masterAddr := firstShard.Nodes[0].Addr()
	slaveAddr := firstShard.Nodes[1].Addr()

	sendStatusRequest := func(t *testing.T, addrs []string, online bool, wantCode int) {
		payload := struct {
			Addrs  []string `json:"addrs"`
			Online bool     `json:"online"`
		}{Addrs: addrs, Online: online}

		encoded, err := json.Marshal(payload)
		require.NoError(t, err)

		recorder := httptest.NewRecorder()
		ctx := GetTestContext(recorder)
		ctx.Set(consts.ContextKeyStore, handler.s)
		ctx.Request.Body = io.NopCloser(bytes.NewBuffer(encoded))
		ctx.Params = []gin.Param{
			{Key: "namespace", Value: ns},
			{Key: "cluster", Value: cluster.Name},
		}

		middleware.RequiredCluster(ctx)
		require.Equal(t, http.StatusOK, recorder.Code)
		handler.SetStatus(ctx)
		require.Equal(t, wantCode, recorder.Code)
	}

	t.Run("offline slave", func(t *testing.T) {
		sendStatusRequest(t, []string{slaveAddr}, false, http.StatusOK)
	})

	t.Run("online slave", func(t *testing.T) {
		sendStatusRequest(t, []string{slaveAddr}, true, http.StatusOK)
	})

	t.Run("offline master rejected", func(t *testing.T) {
		sendStatusRequest(t, []string{masterAddr}, false, http.StatusBadRequest)
	})
}
2 changes: 2 additions & 0 deletions server/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) {
code = http.StatusForbidden
} else if errors.Is(err, consts.ErrInvalidArgument) {
code = http.StatusBadRequest
} else if errors.Is(err, consts.ErrCannotOfflineMaster) {
code = http.StatusBadRequest
}
c.JSON(code, Response{
Error: &Error{Message: err.Error()},
Expand Down
1 change: 1 addition & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (srv *Server) initHandlers() {
clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
clusters.POST("/:cluster/nodes/status", middleware.RequiredCluster, handler.Node.SetStatus)
}

shards := clusters.Group("/:cluster/shards")
Expand Down
73 changes: 69 additions & 4 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,62 @@ func (cluster *Cluster) SyncToNodes(ctx context.Context) error {
return nil
}

// findNodeByAddr scans every shard for a node whose address equals addr
// and returns it, or nil when no such node exists in the cluster.
func (cluster *Cluster) findNodeByAddr(addr string) Node {
	for _, shard := range cluster.Shards {
		for _, candidate := range shard.Nodes {
			if candidate.Addr() != addr {
				continue
			}
			return candidate
		}
	}
	return nil
}

// SetNodeFailedByID sets the failed flag of the node identified by nodeID.
// It returns ErrNotFound (wrapped with the node ID) when no node in any
// shard matches.
func (cluster *Cluster) SetNodeFailedByID(nodeID string, failed bool) error {
	for _, shard := range cluster.Shards {
		for _, n := range shard.Nodes {
			if n.ID() != nodeID {
				continue
			}
			n.SetFailed(failed)
			return nil
		}
	}
	return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound)
}

// SetNodesOffline marks every addressed node as failed. The whole batch is
// validated first so the operation is all-or-nothing: an unknown address
// returns ErrNotFound, and a master address returns ErrCannotOfflineMaster
// (masters must be failed over before going offline); in either case no
// node is modified.
func (cluster *Cluster) SetNodesOffline(addrs []string) error {
	targets := make([]Node, 0, len(addrs))
	for _, addr := range addrs {
		target := cluster.findNodeByAddr(addr)
		switch {
		case target == nil:
			return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound)
		case target.IsMaster():
			return fmt.Errorf("node %s: %w", addr, consts.ErrCannotOfflineMaster)
		}
		targets = append(targets, target)
	}
	for _, target := range targets {
		target.SetFailed(true)
	}
	return nil
}

// SetNodesOnline clears the failed flag of every addressed node. The batch
// is resolved before any mutation, so an unknown address returns
// ErrNotFound without changing any node. Masters are accepted here (only
// offlining a master is restricted).
func (cluster *Cluster) SetNodesOnline(addrs []string) error {
	targets := make([]Node, 0, len(addrs))
	for _, addr := range addrs {
		target := cluster.findNodeByAddr(addr)
		if target == nil {
			return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound)
		}
		targets = append(targets, target)
	}
	for _, target := range targets {
		target.SetFailed(false)
	}
	return nil
}

func (cluster *Cluster) GetNodes() []Node {
nodes := make([]Node, 0)
for i := 0; i < len(cluster.Shards); i++ {
Expand Down Expand Up @@ -273,10 +329,19 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
addr: strings.Split(fields[1], "@")[0],
}

if strings.Contains(fields[2], ",") {
node.role = strings.Split(fields[2], ",")[1]
} else {
node.role = fields[2]
// Parse comma-separated flags (e.g. "slave,fail", "myself,master")
// to extract role and failed state.
roleFlags := strings.Split(fields[2], ",")
for _, flag := range roleFlags {
switch flag {
case RoleMaster:
node.role = RoleMaster
case RoleSlave:
node.role = RoleSlave
case "fail":
node.failed = true
}
// ignore: myself, pfail, handshake, noaddr, nofailover, noflags
}

var err error
Expand Down
Loading
Loading