diff --git a/consts/errors.go b/consts/errors.go
index bf7e51d2..4d3f1bc2 100644
--- a/consts/errors.go
+++ b/consts/errors.go
@@ -38,5 +38,6 @@ var (
 	ErrShardIsServicing      = errors.New("shard is servicing")
 	ErrShardSlotIsMigrating  = errors.New("shard slot is migrating")
 	ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
+	ErrCannotOfflineMaster   = errors.New("cannot take master node offline, failover first")
 	ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
 )
diff --git a/controller/cluster.go b/controller/cluster.go
index 8d4e70d3..8e0ba4d2 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -132,17 +132,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) int64 {
 	count := c.failureCounts[id]
 	c.failureMu.Unlock()

-	// don't add the node into the failover candidates if it's not a master node
 	if !node.IsMaster() {
+		if count >= c.options.maxFailureCount && !node.Failed() {
+			log := logger.Get().With(
+				zap.String("cluster_name", c.clusterName),
+				zap.String("id", node.ID()),
+				zap.String("addr", node.Addr()),
+				zap.Int64("failure_count", count))
+			cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
+			if err != nil {
+				log.Error("Failed to get the cluster info", zap.Error(err))
+				return count
+			}
+			if err := cluster.SetNodeFailedByID(node.ID(), true); err != nil {
+				log.Error("Failed to set slave node as failed", zap.Error(err))
+				return count
+			}
+			if err := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); err != nil {
+				log.Error("Failed to update the cluster", zap.Error(err))
+				return count
+			}
+			log.Info("Marked slave node as failed due to probe failures")
+		}
 		return count
 	}

-	log := logger.Get().With(
-		zap.String("cluster_name", c.clusterName),
-		zap.String("id", node.ID()),
-		zap.Bool("is_master", node.IsMaster()),
-		zap.String("addr", node.Addr()))
 	if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount {
+		log := logger.Get().With(
+			zap.String("cluster_name", c.clusterName),
+			zap.String("id", node.ID()),
+			zap.Bool("is_master", node.IsMaster()),
+			zap.String("addr", node.Addr()))
 		cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
 		if err != nil {
 			log.Error("Failed to get the cluster info", zap.Error(err))
@@ -188,6 +208,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
 	version := clusterInfo.Version.Load()
 	for _, shard := range clusterInfo.Shards {
 		for _, node := range shard.Nodes {
+			if node.Failed() {
+				continue
+			}
 			go func(n store.Node) {
 				log := logger.Get().With(
 					zap.String("namespace", c.namespace),
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index a1a720a6..70d99f2b 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -144,14 +144,84 @@ func TestCluster_FailureCount(t *testing.T) {
 	require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
 	require.True(t, mockNode2.IsMaster())

-	// it will be always increase the failure count until the node is back again.
+	// Slave failure count keeps increasing; at threshold the slave is auto-marked as failed.
 	for i := int64(0); i < cluster.options.maxFailureCount*2; i++ {
 		require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode3))
 	}
+	require.True(t, mockNode3.Failed())
+	require.EqualValues(t, 3, clusterInfo.Version.Load())
 	cluster.resetFailureCount(mockNode3.ID())
 	require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()])
 }

+func TestCluster_SlaveFailureAutoOffline(t *testing.T) {
+	ctx := context.Background()
+	ns := "test-ns"
+	clusterName := "test-slave-offline"
+
+	s := NewMockClusterStore()
+	mockMaster := store.NewClusterMockNode()
+	mockMaster.SetRole(store.RoleMaster)
+	mockMaster.Sequence = 100
+
+	mockSlave1 := store.NewClusterMockNode()
+	mockSlave1.SetRole(store.RoleSlave)
+	mockSlave1.Sequence = 90
+
+	mockSlave2 := store.NewClusterMockNode()
+	mockSlave2.SetRole(store.RoleSlave)
+	mockSlave2.Sequence = 80
+
+	clusterInfo := &store.Cluster{
+		Name: clusterName,
+		Shards: []*store.Shard{{
+			Nodes:            []store.Node{mockMaster, mockSlave1, mockSlave2},
+			SlotRanges:       []store.SlotRange{{Start: 0, Stop: 16383}},
+			MigratingSlot:    &store.MigratingSlot{IsMigrating: false},
+			TargetShardIndex: -1,
+		}},
+	}
+	clusterInfo.Version.Store(1)
+	require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))
+
+	checker := &ClusterChecker{
+		clusterStore: s,
+		namespace:    ns,
+		clusterName:  clusterName,
+		options: ClusterCheckOptions{
+			pingInterval:    time.Second,
+			maxFailureCount: 3,
+		},
+		failureCounts: make(map[string]int64),
+		syncCh:        make(chan struct{}, 1),
+	}
+
+	// Slave should not be marked as failed before reaching threshold
+	require.False(t, mockSlave1.Failed())
+	for i := int64(0); i < checker.options.maxFailureCount-1; i++ {
+		checker.increaseFailureCount(0, mockSlave1)
+	}
+	require.False(t, mockSlave1.Failed())
+	require.EqualValues(t, 1, clusterInfo.Version.Load())
+
+	// Slave should be marked as failed when reaching threshold
+	checker.increaseFailureCount(0, mockSlave1)
+	require.True(t, mockSlave1.Failed())
+	require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+	// Subsequent failures should not trigger another update (already failed)
+	checker.increaseFailureCount(0, mockSlave1)
+	require.True(t, mockSlave1.Failed())
+	require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+	// Other slaves are not affected
+	require.False(t, mockSlave2.Failed())
+
+	// Master should not be affected by slave offline logic
+	require.True(t, mockMaster.IsMaster())
+	require.False(t, mockMaster.Failed())
+}
+
 func TestCluster_LoadAndProbe(t *testing.T) {
 	ctx := context.Background()
 	ns := "test-ns"
diff --git a/server/api/node.go b/server/api/node.go
index 44071084..cf60ec47 100644
--- a/server/api/node.go
+++ b/server/api/node.go
@@ -23,7 +23,10 @@ package api
 import (
 	"strconv"

+	"go.uber.org/zap"
+
 	"github.com/apache/kvrocks-controller/consts"
+	"github.com/apache/kvrocks-controller/logger"
 	"github.com/apache/kvrocks-controller/server/helper"

 	"github.com/gin-gonic/gin"
@@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) {
 	}
 	helper.ResponseNoContent(c)
 }
+
+func (handler *NodeHandler) SetStatus(c *gin.Context) {
+	ns := c.Param("namespace")
+	cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+
+	var req struct {
+		Addrs  []string `json:"addrs" binding:"required"`
+		Online bool     `json:"online"`
+	}
+	if err := c.ShouldBindJSON(&req); err != nil {
+		helper.ResponseBadRequest(c, err)
+		return
+	}
+
+	var err error
+	if req.Online {
+		err = cluster.SetNodesOnline(req.Addrs)
+	} else {
+		err = cluster.SetNodesOffline(req.Addrs)
+	}
+	if err != nil {
+		helper.ResponseError(c, err)
+		return
+	}
+
+	if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
+		helper.ResponseError(c, err)
+		return
+	}
+	if err := cluster.SyncToNodes(c); err != nil {
+		logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster info to nodes after status change")
+	}
+	helper.ResponseOK(c, nil)
+}
diff --git a/server/api/node_test.go b/server/api/node_test.go
index 86291856..a93e0ca4 100644
--- a/server/api/node_test.go
+++ b/server/api/node_test.go
@@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) {
 		runRemove(t, cluster.Shards[0].Nodes[1].ID(), http.StatusNoContent)
 	})
 }
+
+func TestNodeSetStatus(t *testing.T) {
+	ns := "test-ns"
+	cluster, err := store.NewCluster("test-cluster", []string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2)
+	require.NoError(t, err)
+
+	handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())}
+	require.NoError(t, handler.s.CreateCluster(context.Background(), ns, cluster))
+
+	slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+	masterAddr := cluster.Shards[0].Nodes[0].Addr()
+
+	runSetStatus := func(t *testing.T, addrs []string, online bool, expectedCode int) {
+		var req struct {
+			Addrs  []string `json:"addrs"`
+			Online bool     `json:"online"`
+		}
+		req.Addrs = addrs
+		req.Online = online
+
+		recorder := httptest.NewRecorder()
+		ctx := GetTestContext(recorder)
+		body, err := json.Marshal(req)
+		require.NoError(t, err)
+
+		ctx.Set(consts.ContextKeyStore, handler.s)
+		ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
+		ctx.Params = []gin.Param{
+			{Key: "namespace", Value: ns},
+			{Key: "cluster", Value: cluster.Name},
+		}
+		middleware.RequiredCluster(ctx)
+		require.Equal(t, http.StatusOK, recorder.Code)
+		handler.SetStatus(ctx)
+		require.Equal(t, expectedCode, recorder.Code)
+	}
+
+	t.Run("offline slave", func(t *testing.T) {
+		runSetStatus(t, []string{slaveAddr}, false, http.StatusOK)
+	})
+
+	t.Run("online slave", func(t *testing.T) {
+		runSetStatus(t, []string{slaveAddr}, true, http.StatusOK)
+	})
+
+	t.Run("offline master rejected", func(t *testing.T) {
+		runSetStatus(t, []string{masterAddr}, false, http.StatusBadRequest)
+	})
+}
diff --git a/server/helper/helper.go b/server/helper/helper.go
index 896a2a47..88550c05 100644
--- a/server/helper/helper.go
+++ b/server/helper/helper.go
@@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) {
 		code = http.StatusForbidden
 	} else if errors.Is(err, consts.ErrInvalidArgument) {
 		code = http.StatusBadRequest
+	} else if errors.Is(err, consts.ErrCannotOfflineMaster) {
+		code = http.StatusBadRequest
 	}
 	c.JSON(code, Response{
 		Error: &Error{Message: err.Error()},
diff --git a/server/route.go b/server/route.go
index 109f8dd4..b5eb94a2 100644
--- a/server/route.go
+++ b/server/route.go
@@ -70,6 +70,7 @@ func (srv *Server) initHandlers() {
 		clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
 		clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
 		clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
+		clusters.POST("/:cluster/nodes/status", middleware.RequiredCluster, handler.Node.SetStatus)
 	}

 	shards := clusters.Group("/:cluster/shards")
diff --git a/store/cluster.go b/store/cluster.go
index e33e41a1..68b2ba00 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -157,6 +157,62 @@ func (cluster *Cluster) SyncToNodes(ctx context.Context) error {
 	return nil
 }

+func (cluster *Cluster) findNodeByAddr(addr string) Node {
+	for _, shard := range cluster.Shards {
+		for _, node := range shard.Nodes {
+			if node.Addr() == addr {
+				return node
+			}
+		}
+	}
+	return nil
+}
+
+func (cluster *Cluster) SetNodeFailedByID(nodeID string, failed bool) error {
+	for _, shard := range cluster.Shards {
+		for _, node := range shard.Nodes {
+			if node.ID() == nodeID {
+				node.SetFailed(failed)
+				return nil
+			}
+		}
+	}
+	return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound)
+}
+
+func (cluster *Cluster) SetNodesOffline(addrs []string) error {
+	nodes := make([]Node, 0, len(addrs))
+	for _, addr := range addrs {
+		node := cluster.findNodeByAddr(addr)
+		if node == nil {
+			return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound)
+		}
+		if node.IsMaster() {
+			return fmt.Errorf("node %s: %w", addr, consts.ErrCannotOfflineMaster)
+		}
+		nodes = append(nodes, node)
+	}
+	for _, node := range nodes {
+		node.SetFailed(true)
+	}
+	return nil
+}
+
+func (cluster *Cluster) SetNodesOnline(addrs []string) error {
+	nodes := make([]Node, 0, len(addrs))
+	for _, addr := range addrs {
+		node := cluster.findNodeByAddr(addr)
+		if node == nil {
+			return fmt.Errorf("node %s: %w", addr, consts.ErrNotFound)
+		}
+		nodes = append(nodes, node)
+	}
+	for _, node := range nodes {
+		node.SetFailed(false)
+	}
+	return nil
+}
+
 func (cluster *Cluster) GetNodes() []Node {
 	nodes := make([]Node, 0)
 	for i := 0; i < len(cluster.Shards); i++ {
@@ -273,10 +329,19 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
 			addr: strings.Split(fields[1], "@")[0],
 		}

-		if strings.Contains(fields[2], ",") {
-			node.role = strings.Split(fields[2], ",")[1]
-		} else {
-			node.role = fields[2]
+		// Parse comma-separated flags (e.g. "slave,fail", "myself,master")
+		// to extract role and failed state.
+		roleFlags := strings.Split(fields[2], ",")
+		for _, flag := range roleFlags {
+			switch flag {
+			case RoleMaster:
+				node.role = RoleMaster
+			case RoleSlave:
+				node.role = RoleSlave
+			case "fail":
+				node.failed = true
+			}
+			// ignore: myself, pfail, handshake, noaddr, nofailover, noflags
 		}

 		var err error
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 8fcd1063..7dbbcd67 100755
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -59,9 +59,11 @@ type Node interface {
 	Password() string
 	Addr() string
 	IsMaster() bool
+	Failed() bool

 	SetRole(string)
 	SetPassword(string)
+	SetFailed(bool)

 	Reset(ctx context.Context) error
 	GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)
@@ -82,6 +84,7 @@ type ClusterNode struct {
 	role      string
 	password  string
 	createdAt int64
+	failed    bool
 }

 type ClusterInfo struct {
@@ -121,6 +124,14 @@ func (n *ClusterNode) SetRole(role string) {
 	n.role = role
 }

+func (n *ClusterNode) Failed() bool {
+	return n.failed
+}
+
+func (n *ClusterNode) SetFailed(failed bool) {
+	n.failed = failed
+}
+
 func (n *ClusterNode) Addr() string {
 	return n.addr
 }
@@ -272,6 +283,7 @@ func (n *ClusterNode) MarshalJSON() ([]byte, error) {
 		"role":       n.role,
 		"password":   n.password,
 		"created_at": n.createdAt,
+		"failed":     n.failed,
 	})
 }

@@ -282,6 +294,7 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
 		Role      string `json:"role"`
 		Password  string `json:"password"`
 		CreatedAt int64  `json:"created_at"`
+		Failed    bool   `json:"failed"`
 	}
 	if err := json.Unmarshal(bytes, &data); err != nil {
 		return err
@@ -292,5 +305,6 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
 	n.role = data.Role
 	n.password = data.Password
 	n.createdAt = data.CreatedAt
+	n.failed = data.Failed
 	return nil
 }
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index 1181283a..910943b5 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -293,7 +293,11 @@ func (shard *Shard) ToSlotsString() (string, error) {
 				}
 			}
 		} else {
-			builder.WriteString(RoleSlave)
+			if node.Failed() {
+				builder.WriteString(RoleSlave + ",fail")
+			} else {
+				builder.WriteString(RoleSlave)
+			}
 			builder.WriteByte(' ')
 			builder.WriteString(shard.Nodes[masterNodeIndex].ID())
 		}
diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go
index 1406f351..f47e79ac 100644
--- a/store/cluster_shard_test.go
+++ b/store/cluster_shard_test.go
@@ -78,3 +78,39 @@ func TestShard_IsServicing(t *testing.T) {
 	shard.SlotRanges = []SlotRange{{Start: -1, Stop: -1}}
 	require.False(t, shard.IsServicing())
 }
+
+func TestToSlotsString_WithFailedSlave(t *testing.T) {
+	shard := NewShard()
+	shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+	master := NewClusterNode("127.0.0.1:6379", "")
+	master.SetRole(RoleMaster)
+
+	slave := NewClusterNode("127.0.0.1:6380", "")
+	slave.SetRole(RoleSlave)
+	slave.SetFailed(true)
+
+	shard.Nodes = []Node{master, slave}
+
+	result, err := shard.ToSlotsString()
+	require.NoError(t, err)
+	require.Contains(t, result, "slave,fail "+master.ID())
+}
+
+func TestToSlotsString_WithOnlineSlave(t *testing.T) {
+	shard := NewShard()
+	shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+	master := NewClusterNode("127.0.0.1:6379", "")
+	master.SetRole(RoleMaster)
+
+	slave := NewClusterNode("127.0.0.1:6380", "")
+	slave.SetRole(RoleSlave)
+
+	shard.Nodes = []Node{master, slave}
+
+	result, err := shard.ToSlotsString()
+	require.NoError(t, err)
+	require.Contains(t, result, "slave "+master.ID())
+	require.NotContains(t, result, "slave,fail")
+}
diff --git a/store/cluster_test.go b/store/cluster_test.go
index 975f03f6..7c5d6ad9 100644
--- a/store/cluster_test.go
+++ b/store/cluster_test.go
@@ -106,3 +106,90 @@ func TestCluster_PromoteNewMaster(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, node2.ID(), newMasterID)
 }
+
+func TestCluster_SetNodeFailedByID(t *testing.T) {
+	cluster, err := NewCluster("test", []string{"node1", "node2", "node3"}, 3)
+	require.NoError(t, err)
+	require.Len(t, cluster.Shards, 1)
+
+	slaveNode := cluster.Shards[0].Nodes[1]
+	require.False(t, slaveNode.Failed())
+
+	// Set failed by ID
+	err = cluster.SetNodeFailedByID(slaveNode.ID(), true)
+	require.NoError(t, err)
+	require.True(t, slaveNode.Failed())
+
+	// Set back to not-failed
+	err = cluster.SetNodeFailedByID(slaveNode.ID(), false)
+	require.NoError(t, err)
+	require.False(t, slaveNode.Failed())
+
+	// Non-existent node ID
+	err = cluster.SetNodeFailedByID("nonexistent-id", true)
+	require.ErrorIs(t, err, consts.ErrNotFound)
+}
+
+func TestCluster_SetNodesOffline(t *testing.T) {
+	cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+	require.NoError(t, err)
+	require.Len(t, cluster.Shards, 1)
+
+	masterAddr := cluster.Shards[0].Nodes[0].Addr()
+	slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+	// Cannot offline master
+	err = cluster.SetNodesOffline([]string{masterAddr})
+	require.ErrorIs(t, err, consts.ErrCannotOfflineMaster)
+
+	// Can offline slave
+	err = cluster.SetNodesOffline([]string{slaveAddr})
+	require.NoError(t, err)
+	require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+	// Addr not found
+	err = cluster.SetNodesOffline([]string{"nonexistent:1234"})
+	require.ErrorIs(t, err, consts.ErrNotFound)
+
+	// Atomic: if any addr is invalid, none are applied
+	cluster.Shards[0].Nodes[1].SetFailed(false)
+	err = cluster.SetNodesOffline([]string{slaveAddr, "nonexistent:1234"})
+	require.ErrorIs(t, err, consts.ErrNotFound)
+	require.False(t, cluster.Shards[0].Nodes[1].Failed()) // not modified
+}
+
+func TestCluster_SetNodesOnline(t *testing.T) {
+	cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+	require.NoError(t, err)
+
+	slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+	// First offline
+	err = cluster.SetNodesOffline([]string{slaveAddr})
+	require.NoError(t, err)
+	require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+	// Then online
+	err = cluster.SetNodesOnline([]string{slaveAddr})
+	require.NoError(t, err)
+	require.False(t, cluster.Shards[0].Nodes[1].Failed())
+}
+
+func TestParseCluster_WithFailFlag(t *testing.T) {
+	// Build a cluster nodes string with a failed slave
+	clusterStr := "cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 127.0.0.1:30001 master - 0 0 1 connected 0-5460\n" +
+		"e44242e22c74bbe4deab41c6a9dfb68e099f2f08 127.0.0.1:30004 slave,fail cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 0 0 1 connected"
+
+	cluster, err := ParseCluster(clusterStr)
+	require.NoError(t, err)
+	require.Len(t, cluster.Shards, 1)
+	require.Len(t, cluster.Shards[0].Nodes, 2)
+
+	master := cluster.Shards[0].Nodes[0]
+	require.True(t, master.IsMaster())
+	require.False(t, master.Failed())
+
+	slave := cluster.Shards[0].Nodes[1]
+	require.False(t, slave.IsMaster())
+	require.True(t, slave.Failed())
+}