From c9c9467db3f98658a064c52624ac1d3c04704c94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 5 Mar 2026 14:41:39 +0100 Subject: [PATCH 1/2] OCTRL-1081 kubectl task to create rudimentary bridge to kubernetes user info setup properly... kubectl passes arguments properly to the kubernetes first attempt at fairmq support --- README.md | 2 + common/controlmode/controlmode.go | 10 + core/task/scheduler.go | 3 +- core/task/task.go | 11 +- core/task/taskclass/class.go | 19 +- docs/kubernetes_ecs.md | 70 +++++ executor/executable/kubectltask.go | 395 ++++++++++++++++++++++++ executor/executable/kubectltask_test.go | 80 +++++ executor/executable/task.go | 24 +- 9 files changed, 599 insertions(+), 15 deletions(-) create mode 100644 docs/kubernetes_ecs.md create mode 100644 executor/executable/kubectltask.go create mode 100644 executor/executable/kubectltask_test.go diff --git a/README.md b/README.md index 4889d30cb..5e905d1d2 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,8 @@ There are two ways of interacting with AliECS: * [Sampling reservoir](/docs/metrics.md#sampling-reservoir) * [OCC API debugging with grpcc](/docs/using_grpcc_occ.md#occ-api-debugging-with-grpcc) * [Running tasks inside docker](/docs/running_docker.md#running-a-task-inside-a-docker-container) + * Kubernetes + * [ECS bridge to Kubernetes](/docs/kubernetes_ecs.md) * Resources * T. Mrnjavac et. 
al, [AliECS: A New Experiment Control System for the ALICE Experiment](https://doi.org/10.1051/epjconf/202429502027), CHEP23 diff --git a/common/controlmode/controlmode.go b/common/controlmode/controlmode.go index 6d6dc8fdb..5cab19729 100644 --- a/common/controlmode/controlmode.go +++ b/common/controlmode/controlmode.go @@ -39,6 +39,8 @@ const ( FAIRMQ BASIC HOOK + KUBECTL_DIRECT + KUBECTL_FAIRMQ ) func (cm ControlMode) String() string { @@ -51,6 +53,10 @@ func (cm ControlMode) String() string { return "basic" case HOOK: return "hook" + case KUBECTL_DIRECT: + return "kubectl_direct" + case KUBECTL_FAIRMQ: + return "kubectl_fairmq" } return "direct" } @@ -71,6 +77,10 @@ func (cm *ControlMode) UnmarshalText(b []byte) error { *cm = BASIC case "hook": *cm = HOOK + case "kubectl_direct": + *cm = KUBECTL_DIRECT + case "kubectl_fairmq": + *cm = KUBECTL_FAIRMQ default: *cm = DIRECT } diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 5e2d60fe4..737cad73e 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -1432,7 +1432,8 @@ func makeTaskForMesosResources( cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%d", "OCC_CONTROL_PORT", controlPort)) } - if cmd.ControlMode == controlmode.FAIRMQ { + if cmd.ControlMode == controlmode.FAIRMQ || + cmd.ControlMode == controlmode.KUBECTL_FAIRMQ { cmd.Arguments = append(cmd.Arguments, "--control-port", strconv.FormatUint(controlPort, 10)) } diff --git a/core/task/task.go b/core/task/task.go index ba3c459e5..bee19a1b5 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -286,7 +286,9 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { if class.Control.Mode == controlmode.BASIC || class.Control.Mode == controlmode.HOOK || class.Control.Mode == controlmode.DIRECT || - class.Control.Mode == controlmode.FAIRMQ { + class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { var varStack map[string]string // 
First we get the full varStack from the parent role, and @@ -393,7 +395,8 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) { } } - if class.Control.Mode == controlmode.FAIRMQ { + if class.Control.Mode == controlmode.FAIRMQ || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { // FIXME read this from configuration // if the task class doesn't provide an id, we generate one ourselves if !utils.StringSliceContains(cmd.Arguments, "--id") { @@ -635,7 +638,9 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand // For FAIRMQ tasks, we append FairMQ channel configuration if class.Control.Mode == controlmode.FAIRMQ || - class.Control.Mode == controlmode.DIRECT { + class.Control.Mode == controlmode.DIRECT || + class.Control.Mode == controlmode.KUBECTL_DIRECT || + class.Control.Mode == controlmode.KUBECTL_FAIRMQ { for _, inbCh := range channel.MergeInbound(parent.CollectInboundChannels(), class.Bind) { // We get the FairMQ-formatted propertyMap from the inbound channel spec var chanProps controlcommands.PropertyMap diff --git a/core/task/taskclass/class.go b/core/task/taskclass/class.go index 902741c6c..4a4e6329d 100644 --- a/core/task/taskclass/class.go +++ b/core/task/taskclass/class.go @@ -123,7 +123,6 @@ func (c *Class) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { } } return - } func (c *Class) MarshalYAML() (interface{}, error) { @@ -154,13 +153,17 @@ func (c *Class) MarshalYAML() (interface{}, error) { Command: c.Command, } - if c.Control.Mode == controlmode.FAIRMQ { - aux.Control.Mode = "fairmq" - } else if c.Control.Mode == controlmode.BASIC { - aux.Control.Mode = "basic" - } else { - aux.Control.Mode = "direct" - } + // if c.Control.Mode == controlmode.FAIRMQ { + // aux.Control.Mode = "fairmq" + // } else if c.Control.Mode == controlmode.BASIC { + // aux.Control.Mode = "basic" + // } else if c.Control.Mode == controlmode.KUBECTL { + // aux.Control.Mode = "kubectl" + // } else { + // aux.Control.Mode = 
"direct" + // } + + aux.Control.Mode = c.Control.Mode.String() return aux, nil } diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md new file mode 100644 index 000000000..ce698bdcf --- /dev/null +++ b/docs/kubernetes_ecs.md @@ -0,0 +1,70 @@ +# ECS with Kubernetes + +> ⚠️ **Warning** +> All Kubernetes work done is in a stage of prototype. + +## Kubernetes Cluster + +While prototyping we used many Kubernetes clusters, namely [`kind`](https://kind.sigs.k8s.io/), [`minikube`](https://minikube.sigs.k8s.io/docs/) and [`k3s`](https://k3s.io/) +in both local and remote cluster deployment. We used Openstack for remote deployment. +Follow the guides at the individual distributions in order to create the desired cluster setup. +For now we chose `k3s` for most of the activities performed because it is lightweight +and easily installed distribution which is also [`CNCF`](https://www.cncf.io/training/certification/) certified. + +All settings of `k3s` were used as default except one: locked-in-memory size. Use `ulimit -l` to learn +what is the limit for the current user and `LimitMEMLOCK` inside the k3s systemd service config +to set it for correct value. Right now the `flp` user has unlimited size (`LimitMEMLOCK=infinity`). +This config is necessary because even if you are running PODs with the privileged security context +under user flp, Kubernetes still sets limits according to its internal settings and doesn't +respect linux settings. + +Another setup we expect at this moment to be present at the target nodes +is ability to run PODs with privileged permissions and also under user `flp`. +This means that the machine has to have `flp` user setup the same way as +if you would do the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). 
+ +## Running tasks (`KubectlTask`) + +ECS is set up to run tasks through Mesos on all required hosts baremetal with active +task management (see [`ControllableTask`](/executor/executable/controllabletask.go)) +and OCC gRPC communication. When running docker task through ECS we could easily +wrap command to be run into the docker container with proper settings +([see](/docs/running_docker.md)). This is however not possible for Kubernetes +workloads as the PODs are "hidden" inside the cluster. So we plan +to deploy our own Task Controller which will connect to and guide +OCC state machine of required tasks. Thus we need to create custom +POC way to communicate with Kubernetes cluster from Mesos executor. + +The reason why we don't call the Kubernetes cluster directly from ECS core +is that ECS does a lot of heavy lifting while deploying workloads, +monitoring workloads and generating a lot of configuration which +is not trivial to replicate manually. However, if we create some class +that would be able to deploy one task into Kubernetes and monitor its +state we could replicate the `ControllableTask` workflow and leave ECS +mostly intact for now, save a lot of work and focus on prototyping the +Kubernetes operator pattern. + +Thus [`KubectlTask`](/executor/executable/kubectltask.go) was created. This class +is written as a wrapper around the `kubectl` utility to manage a Kubernetes cluster. +It is based on the following `kubectl` commands: + +* `apply` => `kubectl apply -f manifest.yaml` - deploys the resource described inside the given manifest +* `delete` => `kubectl delete -f manifest.yaml` - deletes the resource from the cluster +* `patch` => `kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]'` - changes the state of the resource inside the cluster +* `get` => `kubectl get -f manifest.yaml -o jsonpath='{.spec.state}'` - queries an exact field of the resource (`state` in the example) inside the cluster. 
+ +These four commands allow us to deploy and monitor the status of the deployed +resource without the necessity to interact with it directly. However `KubectlTask` +expects that the resource is the CRD [Task](/control-operator/api/v1alpha1/task_types.go). + +In order to activate `KubectlTask` you need to change the yaml template +inside the `ControlWorkflows` directory. Namely: + +* add the path to the kubectl manifest as the first argument in the `.command.arguments` field +* change `.control.mode` to either `kubectl_direct` or `kubectl_fairmq` +You can find a working template inside `control-operator/ecs-manifests/control-workflows/*_kube.yaml` + +Working kubectl manifests are to be found in `control-operator/ecs-manifests/kubernetes-manifests`. +You can see `*test.yaml` for concrete manifests deployable by `kubectl apply`, the rest +are the templates with variables to be filled in, in a `${var}` format. `KubectlTask` +fills these variables from env vars. diff --git a/executor/executable/kubectltask.go b/executor/executable/kubectltask.go new file mode 100644 index 000000000..4a7112595 --- /dev/null +++ b/executor/executable/kubectltask.go @@ -0,0 +1,395 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "os/user" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/AliceO2Group/Control/core/controlcommands" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + "github.com/sirupsen/logrus" +) + +const ( + KUBECTL string = "kubectl" + + APPLY string = "apply" + DELETE string = "delete" + PATCH string = "patch" + GET string = "get" + TASK string = "task" + CREATE string = "create" + + // TRANSITION_TIMEOUT = 10 * time.Second // inside controllable task +) + +type KubectlTask struct { + taskBase + rpc *executorcmd.RpcClient + configYaml string + running bool + latestStatus atomic.Value +} + +func GetUserInfo(username string) (uid, gid int64, supplemental []int64, err error) { + u, err := user.Lookup(username) + if err != nil { + return 0, 0, nil, err + } + + // Convert UID + uidInt, _ := strconv.ParseInt(u.Uid, 10, 64) + + // Convert Primary GID + gidInt, _ := strconv.ParseInt(u.Gid, 10, 64) + + // Get Supplemental Groups (e.g., wheel, pda) + groupStrings, _ := u.GroupIds() + var supplementalInts []int64 + for _, g := range groupStrings { + gInt, _ := strconv.ParseInt(g, 10, 64) + // Avoid adding the primary GID to the supplemental list + if gInt != gidInt { + supplementalInts = append(supplementalInts, gInt) + } + } + + return uidInt, gidInt, supplementalInts, nil +} + +func (task *KubectlTask) Launch() error { + if len(task.Tci.Arguments) == 0 { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }). + Error("no arguments in kubectl task. 
We need to have at least manifest location as the last argument") + return errors.New("no arguments for kubectl task. Location for kubernetes manifest needed") + } + + task.configYaml = task.Tci.Arguments[0] + + // Read the template file + content, err := os.ReadFile(task.configYaml) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "file": task.configYaml, + }).WithError(err).Error("failed to read kubectl config file") + return err + } + + // Set the AliECS environment variables in the local process + // so os.ExpandEnv can find them + for _, envVar := range task.Tci.Env { + parts := strings.SplitN(envVar, "=", 2) + if len(parts) == 2 { + os.Setenv(parts[0], parts[1]) + } + } + + // Set arguments into the KUBE_ARGUMENTS os env leaving the kubemanifest file + arguments := task.Tci.Arguments[1:] + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "args": arguments, + }).Info("setting arguments as a KUBE_ARGUMENTS env var") + + os.Setenv("KUBE_ARGUMENTS", strings.Join(arguments, " ")) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": *task.Tci.Value, + }).Info("setting command as a KUBE_COMMAND env var") + os.Setenv("KUBE_COMMAND", *task.Tci.Value) + + if uid, gid, supplementalIds, err := GetUserInfo("flp"); err == nil { + os.Setenv("FLP_UID", strconv.FormatInt(uid, 10)) + os.Setenv("FLP_GID", strconv.FormatInt(gid, 10)) + + var strIds []string + for _, id := range supplementalIds { + strIds = append(strIds, strconv.FormatInt(id, 10)) + } + supplementalString := "[" + strings.Join(strIds, ", ") + "]" + + os.Setenv("FLP_SUPPLEMENTAL_GORUPS", supplementalString) + } else { + log.Error("we cannot run kubectl task as flp user because we didn't find user details") + } + + // Replace ${VAR} placeholders with actual values + expandedYaml := os.ExpandEnv(string(content)) + + ctx, cancel := 
context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Apply via Stdin (-) + command := exec.CommandContext(ctx, KUBECTL, APPLY, "-f", "-") + command.Stdin = strings.NewReader(expandedYaml) + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl apply via Stdin") + + var stdoutBuf bytes.Buffer + var stderrBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = io.MultiWriter(os.Stderr, &stderrBuf) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Errorf("kubectl apply failed stderr: %s , stdin: %s", stderrBuf.String(), stdoutBuf.String()) + return err + } + + task.latestStatus.Store("") + task.running = true + go task.eventLoop() + return nil +} + +func (task *KubectlTask) Kill() error { + task.running = false + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + command := exec.CommandContext(ctx, KUBECTL, DELETE, "-f", task.configYaml) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("kubectl delete failed") + return err + } + + task.sendStatus(task.knownEnvironmentId, mesos.TASK_FINISHED, "") + + return nil +} + +func (task *KubectlTask) Transition(transition *executorcmd.ExecutorCommand_Transition) *controlcommands.MesosCommandResponse_Transition { + // kubectl patch -f exampletask.yaml --type='json' -p='[{"op": "replace", "path": "/spec/state", "value": "running"}]' + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Extract the transition arguments (the 'Mix') to pipe them to Kubernetes + log.WithFields(logrus.Fields{ + "controlmode": 
task.Tci.ControlMode, + "name": task.ti.Name, + "args": transition.Arguments, + }).Info("Patching transition arguments to Kubernetes") + + argsJSON, err := json.Marshal(transition.Arguments) + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + }).WithError(err).Error("failed to marshal transition arguments") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + transitionJSON := fmt.Sprintf(`[ + {"op": "add", "path": "/spec/state", "value": "%s"}, + {"op": "add", "path": "/spec/arguments", "value": %s} + ]`, strings.ToLower(transition.Destination), string(argsJSON)) + + command := exec.CommandContext(ctx, KUBECTL, PATCH, "-f", task.configYaml, "--type=json", "-p", transitionJSON) + + command.Stdout = os.Stdout + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("Starting kubectl patch") + + statusBeforeTransition := task.latestStatus.Load().(string) + + err = command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl patch failed") + return transition.PrepareResponse(err, transition.Source, task.ti.TaskID.Value) + } + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Info("kubectl patch suceeded, waiting for actual status change") + actualStatus := "" + timeout := time.After(TRANSITION_TIMEOUT) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + +loop: + for { + select { + case <-ticker.C: + actualStatus = task.latestStatus.Load().(string) + if actualStatus != statusBeforeTransition { + break loop + } + case <-timeout: + return transition.PrepareResponse(errors.New("timeout waiting for status change"), statusBeforeTransition, task.ti.TaskID.Value) + } + } + 
log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Infof("status changed from %s to %s", statusBeforeTransition, actualStatus) + + // TODO: I am not sure what PID should I put here + return transition.PrepareResponse(nil, actualStatus, task.ti.TaskID.Value) +} + +func (task *KubectlTask) getTaskStatus() (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + // command := exec.CommandContext(ctx, KUBECTL, GET, TASK, task.ti.Name, "-o", "jsonpath={.status.state}") + command := exec.CommandContext(ctx, KUBECTL, GET, "-f", task.configYaml, "-o", "jsonpath={.status.state}") + + var stdoutBuf bytes.Buffer + + command.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + command.Stderr = os.Stderr + + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).Debug("Starting kubectl get task") + + err := command.Run() + if err != nil { + log.WithFields(logrus.Fields{ + "controlmode": task.Tci.ControlMode, + "name": task.ti.Name, + "command": command, + }).WithError(err).Error("kubectl get task failed") + return "", err + } + + // no newlines + return strings.TrimSpace(stdoutBuf.String()), nil +} + +func (task *KubectlTask) eventLoop() { + errorCount := 0 + maxErrors := 5 + for task.running { + time.Sleep(5 * time.Second) + status, err := task.getTaskStatus() + if err != nil { + errorCount += 1 + if errorCount < maxErrors { + log.WithError(err).Warnf("failed to get Task Status, retrying %d/%d", errorCount, maxErrors) + continue + } + log.WithError(err).Errorf("failed to get Task Status, sending TASK_FAILED and breaking from the eventLoop") + task.sendStatus(task.knownEnvironmentId, mesos.TASK_FAILED, "couldn't get task status via kubectl") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + break + } + + status = strings.ToUpper(status) + + if 
task.latestStatus.Load().(string) == status { + continue + } + task.latestStatus.Store(status) + + var state mesos.TaskState + switch status { + case "CONFIGURED", "RUNNING", "STANDBY": + state = mesos.TASK_RUNNING + + case "ERROR": + state = mesos.TASK_FAILED + log.WithError(err).Error("Received error from kubectl task, terminating everything and sending update") + task.running = false + // TODO: remove when debugging done + // _ = task.Kill() + // + + default: + log.Errorf("received different status than expected: %s", status) + continue + } + + log.Debugf("sending new status from kubectl task %s", status) + task.sendStatus(task.knownEnvironmentId, state, "") + + } +} + +func (task *KubectlTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { + cmd = new(executorcmd.ExecutorCommand_Transition) + if task.rpc != nil { + cmd.Transitioner = task.rpc.Transitioner + } + err = json.Unmarshal(data, cmd) + if err != nil { + cmd = nil + } + return +} diff --git a/executor/executable/kubectltask_test.go b/executor/executable/kubectltask_test.go new file mode 100644 index 000000000..6dbb6d300 --- /dev/null +++ b/executor/executable/kubectltask_test.go @@ -0,0 +1,80 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2018-2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package executable + +import ( + "testing" + "time" + + "github.com/AliceO2Group/Control/common" + "github.com/AliceO2Group/Control/executor/executorcmd" + mesos "github.com/mesos/mesos-go/api/v1/lib" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("kubectl task test", func() { + task := KubectlTask{} + task.Tci = &common.TaskCommandInfo{} + task.Tci.Arguments = []string{"exampletask.yaml"} + task.configYaml = "exampletask.yaml" + task.ti = &mesos.TaskInfo{Name: "exampletask"} + When("starting and stoping the task", func() { + It("should start and stop accordingly", func() { + err := task.Launch() + Expect(err).NotTo(HaveOccurred()) + + // just so we can see monitoring tools show something + time.Sleep(2 * time.Second) + + err = task.Kill() + Expect(err).NotTo(HaveOccurred()) + }) + }) + + When("transitioning to running", func() { + It("should transition", func() { + task.configYaml = "exampletask.yaml" + transitionMsg := &executorcmd.ExecutorCommand_Transition{} + transitionMsg.Destination = "running" + result := task.Transition(transitionMsg) + Expect(result.ErrorString).To(BeEmpty()) + }) + }) + + // this test expects there is a task with a status on cluster. 
+ When("geting current status", func() { + It("should get proper status", func() { + status, err := task.getTaskStatus() + Expect(err).NotTo(HaveOccurred()) + Expect(status).To(Equal("standby")) + }) + }) +}) + +func TestKubectlTask(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "KubectlTask suite") +} diff --git a/executor/executable/task.go b/executor/executable/task.go index e23b56394..abb92b14e 100644 --- a/executor/executable/task.go +++ b/executor/executable/task.go @@ -58,9 +58,11 @@ const ( var log = logger.New(logrus.StandardLogger(), "executor") -type SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) -type SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) -type SendMessageFunc func(message []byte) +type ( + SendStatusFunc func(envId uid.ID, state mesos.TaskState, message string) + SendDeviceEventFunc func(envId uid.ID, event event.DeviceEvent) + SendMessageFunc func(message []byte) +) type Task interface { Launch() error @@ -90,6 +92,7 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE log.WithField("json", string(tciData[:])). 
Trace("received TaskCommandInfo") + log.WithField("findme", "here").Info(string(tciData)) if err := json.Unmarshal(tciData, &commandInfo); tciData != nil && err == nil { log.WithFields(logrus.Fields{ "shell": *commandInfo.Shell, @@ -177,6 +180,21 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE }, rpc: nil, } + case controlmode.KUBECTL_DIRECT: + fallthrough + case controlmode.KUBECTL_FAIRMQ: + newTask = &KubectlTask{ + taskBase: taskBase{ + ti: &taskInfo, + Tci: &commandInfo, + sendStatus: sendStatusFunc, + sendDeviceEvent: sendDeviceEventFunc, + sendMessage: sendMessageFunc, + knownEnvironmentId: envId, + knownDetector: detector, + }, + rpc: nil, + } } return newTask From cc7842c263f33a7c4e7b019e8d7d2f9bb76aabdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Fri, 10 Apr 2026 12:41:17 +0200 Subject: [PATCH 2/2] fixed documentation --- docs/kubernetes_ecs.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/kubernetes_ecs.md b/docs/kubernetes_ecs.md index ce698bdcf..893c4cff4 100644 --- a/docs/kubernetes_ecs.md +++ b/docs/kubernetes_ecs.md @@ -8,18 +8,18 @@ While prototyping we used many Kubernetes clusters, namely [`kind`](https://kind.sigs.k8s.io/), [`minikube`](https://minikube.sigs.k8s.io/docs/) and [`k3s`](https://k3s.io/) in both local and remote cluster deployment. We used Openstack for remote deployment. Follow the guides at the individual distributions in order to create the desired cluster setup. -For now we chose `k3s` for most of the activities performed because it is lightweight +k3s is recommended to run this prototype, as it is lightweight and easily installed distribution which is also [`CNCF`](https://www.cncf.io/training/certification/) certified. All settings of `k3s` were used as default except one: locked-in-memory size. 
Use `ulimit -l` to learn what is the limit for the current user and `LimitMEMLOCK` inside the k3s systemd service config to set it for correct value. Right now the `flp` user has unlimited size (`LimitMEMLOCK=infinity`). -This config is necessary because even if you are running PODs with the privileged security context +This config is necessary because even if you are running Pods with the privileged security context under user flp, Kubernetes still sets limits according to its internal settings and doesn't respect linux settings. Another setup we expect at this moment to be present at the target nodes -is ability to run PODs with privileged permissions and also under user `flp`. +is the ability to run Pods with privileged permissions and also under user `flp`. This means that the machine has to have `flp` user setup the same way as if you would do the installation with [`o2-flp-setup`](https://alice-flp.docs.cern.ch/Operations/Experts/system-configuration/utils/o2-flp-setup/). @@ -30,7 +30,7 @@ task management (see [`ControllableTask`](/executor/executable/controllabletask. and OCC gRPC communication. When running docker task through ECS we could easily wrap command to be run into the docker container with proper settings ([see](/docs/running_docker.md)). This is however not possible for Kubernetes -workloads as the PODs are "hidden" inside the cluster. So we plan +workloads as the Pods are "hidden" inside the cluster. So we plan to deploy our own Task Controller which will connect to and guide OCC state machine of required tasks. Thus we need to create custom POC way to communicate with Kubernetes cluster from Mesos executor.