Skip to content

Commit 7f732c9

Browse files
committed
refactor: improve pod log reading and status tracking
- Simplified pod status watching by replacing shared informers with a direct watch loop. - Added a fallback mechanism for retrieving TaskRun statuses when permissions to list resources are restricted. - Updated log reading logic to stop immediately when a container failure is detected. - Added comprehensive unit tests for container failure checks and retryable watch behavior. Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 100b7cd commit 7f732c9

8 files changed

Lines changed: 456 additions & 125 deletions

File tree

pkg/log/task_reader.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,13 @@ func (r *Reader) readAvailableTaskLogs(tr *v1.TaskRun) (<-chan Log, <-chan error
120120
return logC, errC, nil
121121
}
122122

123-
func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, pod *pods.Pod, follow, timestamps bool) {
123+
func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, podRef *pods.Pod, pod *corev1.Pod, follow, timestamps bool) {
124124
for _, step := range steps {
125125
if !follow && !step.hasStarted() {
126126
continue
127127
}
128128

129-
container := pod.Container(step.container)
129+
container := podRef.Container(step.container)
130130
containerLogC, containerLogErrC, err := container.LogReader(follow, timestamps).Read()
131131
if err != nil {
132132
errC <- fmt.Errorf("error in getting logs for step %s: %s", step.name, err)
@@ -153,10 +153,16 @@ func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step
153153
}
154154
}
155155

156-
if err := container.Status(); err != nil {
157-
errC <- err
158-
return
156+
err = pods.CheckFailedContainers(pod, []string{step.container})
157+
if follow {
158+
err = podRef.CheckFailedContainers([]string{step.container})
159159
}
160+
if err == nil {
161+
continue
162+
}
163+
164+
errC <- err
165+
return
160166
}
161167
}
162168

@@ -206,7 +212,7 @@ func (r *Reader) readPodLogs(podC <-chan string, podErrC <-chan error, follow, t
206212
errC <- fmt.Errorf("no steps found for task %s", r.task)
207213
continue
208214
}
209-
r.readStepsLogs(logC, errC, steps, p, follow, timestamps)
215+
r.readStepsLogs(logC, errC, steps, p, pod, follow, timestamps)
210216
}
211217
}()
212218

pkg/log/task_reader_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package log
2+
3+
import (
4+
"testing"
5+
6+
"github.com/tektoncd/cli/pkg/cli"
7+
podsfake "github.com/tektoncd/cli/pkg/pods/fake"
8+
"github.com/tektoncd/cli/pkg/test"
9+
corev1 "k8s.io/api/core/v1"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
)
12+
13+
func TestReadPodLogs_stopsAfterFailedStep(t *testing.T) {
14+
const (
15+
ns = "ns"
16+
podName = "pod"
17+
)
18+
19+
pods := []*corev1.Pod{{
20+
ObjectMeta: metav1.ObjectMeta{
21+
Name: podName,
22+
Namespace: ns,
23+
},
24+
Spec: corev1.PodSpec{
25+
Containers: []corev1.Container{
26+
{Name: "step-first"},
27+
{Name: "step-second"},
28+
},
29+
},
30+
Status: corev1.PodStatus{
31+
ContainerStatuses: []corev1.ContainerStatus{
32+
{
33+
Name: "step-first",
34+
State: corev1.ContainerState{
35+
Terminated: &corev1.ContainerStateTerminated{
36+
ExitCode: 137,
37+
Reason: "OOMKilled",
38+
},
39+
},
40+
},
41+
{
42+
Name: "step-second",
43+
State: corev1.ContainerState{
44+
Terminated: &corev1.ContainerStateTerminated{
45+
ExitCode: 0,
46+
},
47+
},
48+
},
49+
},
50+
},
51+
}}
52+
53+
cs, _ := test.SeedV1beta1TestData(t, test.Data{Pods: pods})
54+
reader := &Reader{
55+
ns: ns,
56+
clients: &cli.Clients{Kube: cs.Kube},
57+
streamer: podsfake.Streamer(podsfake.Logs(
58+
podsfake.Task(podName,
59+
podsfake.Step("step-first", "first-log"),
60+
podsfake.Step("step-second", "second-log"),
61+
),
62+
)),
63+
task: "task",
64+
}
65+
66+
podC := make(chan string, 1)
67+
podC <- podName
68+
close(podC)
69+
70+
logC, errC := reader.readPodLogs(podC, nil, false, false)
71+
72+
var logs []Log
73+
var errs []error
74+
for logC != nil || errC != nil {
75+
select {
76+
case l, ok := <-logC:
77+
if !ok {
78+
logC = nil
79+
continue
80+
}
81+
logs = append(logs, l)
82+
case err, ok := <-errC:
83+
if !ok {
84+
errC = nil
85+
continue
86+
}
87+
errs = append(errs, err)
88+
}
89+
}
90+
91+
if len(logs) != 2 {
92+
t.Fatalf("expected first step log and EOF only, got %#v", logs)
93+
}
94+
if logs[0].Step != "first" || logs[0].Log != "first-log" {
95+
t.Fatalf("unexpected first log: %#v", logs[0])
96+
}
97+
if logs[1].Step != "first" || logs[1].Log != "EOFLOG" {
98+
t.Fatalf("unexpected EOF log: %#v", logs[1])
99+
}
100+
if len(errs) != 1 {
101+
t.Fatalf("expected one error, got %#v", errs)
102+
}
103+
if errs[0] == nil || errs[0].Error() == "" {
104+
t.Fatalf("expected non-empty error, got %#v", errs[0])
105+
}
106+
}

pkg/pipelinerun/tracker.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package pipelinerun
1717
import (
1818
"context"
1919
"errors"
20+
"fmt"
2021
"sync"
2122
"time"
2223

@@ -27,6 +28,7 @@ import (
2728
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
2829
informers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions"
2930
corev1 "k8s.io/api/core/v1"
31+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3032
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3133
"k8s.io/apimachinery/pkg/fields"
3234
"k8s.io/client-go/tools/cache"
@@ -223,13 +225,21 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s
223225
return map[string]*v1.PipelineRunTaskRunStatus{}, nil
224226
}
225227

228+
taskRunsByName, err := getTaskRunsByPipelineRun(pr.Name, c, ns)
229+
if err != nil && !canFallbackToTaskRunGet(err) {
230+
return nil, err
231+
}
232+
226233
trStatuses := make(map[string]*v1.PipelineRunTaskRunStatus)
227234
for _, cr := range pr.Status.ChildReferences {
228235
//TODO: Needs to handle Run, CustomRun later
229236
if cr.Kind == "TaskRun" {
230-
tr, err := taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns)
231-
if err != nil {
232-
return nil, err
237+
tr, ok := taskRunsByName[cr.Name]
238+
if !ok {
239+
tr, err = taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns)
240+
if err != nil {
241+
return nil, err
242+
}
233243
}
234244

235245
trStatuses[cr.Name] = &v1.PipelineRunTaskRunStatus{
@@ -242,3 +252,23 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s
242252

243253
return trStatuses, nil
244254
}
255+
256+
func getTaskRunsByPipelineRun(prName string, c *cli.Clients, ns string) (map[string]*v1.TaskRun, error) {
257+
var taskRuns v1.TaskRunList
258+
if err := actions.ListV1(taskrunGroupResource, c, metav1.ListOptions{
259+
LabelSelector: fmt.Sprintf("tekton.dev/pipelineRun=%s", prName),
260+
}, ns, &taskRuns); err != nil {
261+
return nil, err
262+
}
263+
264+
taskRunsByName := make(map[string]*v1.TaskRun, len(taskRuns.Items))
265+
for i := range taskRuns.Items {
266+
taskRunsByName[taskRuns.Items[i].Name] = &taskRuns.Items[i]
267+
}
268+
269+
return taskRunsByName, nil
270+
}
271+
272+
func canFallbackToTaskRunGet(err error) bool {
273+
return apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err)
274+
}

pkg/pipelinerun/tracker_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
pipelinetest "github.com/tektoncd/pipeline/test"
3434
"github.com/tektoncd/pipeline/test/diff"
3535
corev1 "k8s.io/api/core/v1"
36+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3637
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3738
"k8s.io/apimachinery/pkg/runtime"
3839
"k8s.io/apimachinery/pkg/watch"
@@ -266,3 +267,75 @@ func TestTracker_watchErrorHandler(t *testing.T) {
266267
})
267268
}
268269
}
270+
271+
func TestGetTaskRunsWithStatus_fallsBackWhenListForbidden(t *testing.T) {
272+
const (
273+
ns = "namespace"
274+
prName = "output-pipeline-1"
275+
trName = "output-task-1"
276+
task = "output-task-1"
277+
pod = "output-task-1-pod"
278+
)
279+
280+
pr := &v1.PipelineRun{
281+
ObjectMeta: metav1.ObjectMeta{
282+
Name: prName,
283+
Namespace: ns,
284+
},
285+
Status: v1.PipelineRunStatus{
286+
PipelineRunStatusFields: v1.PipelineRunStatusFields{
287+
ChildReferences: []v1.ChildStatusReference{{
288+
Name: trName,
289+
PipelineTaskName: task,
290+
TypeMeta: runtime.TypeMeta{
291+
APIVersion: "tekton.dev/v1",
292+
Kind: "TaskRun",
293+
},
294+
}},
295+
},
296+
},
297+
}
298+
tr := &v1.TaskRun{
299+
ObjectMeta: metav1.ObjectMeta{
300+
Name: trName,
301+
Namespace: ns,
302+
},
303+
Status: v1.TaskRunStatus{
304+
TaskRunStatusFields: v1.TaskRunStatusFields{
305+
PodName: pod,
306+
},
307+
},
308+
}
309+
310+
cs, _ := test.SeedTestData(t, pipelinetest.Data{TaskRuns: []*v1.TaskRun{tr}})
311+
cs.Pipeline.Resources = cb.APIResourceList("v1", []string{"taskrun", "pipelinerun"})
312+
313+
tdc := testDynamic.Options{
314+
PrependReactors: []testDynamic.PrependOpt{{
315+
Verb: "list",
316+
Resource: "taskruns",
317+
Action: func(action k8stest.Action) (bool, runtime.Object, error) {
318+
return true, nil, apierrors.NewForbidden(taskrunGroupResource.GroupResource(), "", errors.New("forbidden"))
319+
},
320+
}},
321+
}
322+
dynamic, err := tdc.Client(cb.UnstructuredTR(tr, "v1"))
323+
if err != nil {
324+
t.Fatalf("unable to create dynamic client: %v", err)
325+
}
326+
327+
clients := &cli.Clients{
328+
Tekton: cs.Pipeline,
329+
Kube: cs.Kube,
330+
Dynamic: dynamic,
331+
}
332+
333+
trStatuses, err := GetTaskRunsWithStatus(pr, clients, ns)
334+
if err != nil {
335+
t.Fatalf("unexpected error: %v", err)
336+
}
337+
338+
if trStatuses[trName] == nil || trStatuses[trName].Status == nil || trStatuses[trName].Status.PodName != pod {
339+
t.Fatalf("unexpected taskrun statuses: %#v", trStatuses)
340+
}
341+
}

pkg/pods/container.go

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,43 +30,7 @@ type Container struct {
3030
}
3131

3232
func (c *Container) Status() error {
33-
pod, err := c.pod.Get()
34-
if err != nil {
35-
return err
36-
}
37-
38-
container := c.name
39-
for _, cs := range pod.Status.ContainerStatuses {
40-
if cs.Name != container {
41-
continue
42-
}
43-
44-
if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 {
45-
msg := ""
46-
47-
if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" {
48-
msg = msg + " : " + cs.State.Terminated.Reason
49-
}
50-
51-
if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" {
52-
msg = msg + " : " + cs.State.Terminated.Message
53-
}
54-
55-
return fmt.Errorf("container %s has failed %s", container, msg)
56-
}
57-
}
58-
59-
for _, cs := range pod.Status.InitContainerStatuses {
60-
if cs.Name != container {
61-
continue
62-
}
63-
64-
if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 {
65-
return fmt.Errorf("container %s has failed: %s", container, cs.State.Terminated.Reason)
66-
}
67-
}
68-
69-
return nil
33+
return c.pod.CheckFailedContainers([]string{c.name})
7034
}
7135

7236
// Log represents one log message from a pod
@@ -128,3 +92,51 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {
12892

12993
return logC, errC, nil
13094
}
95+
96+
func (p *Pod) CheckFailedContainers(containerNames []string) error {
97+
pod, err := p.Get()
98+
if err != nil {
99+
return err
100+
}
101+
102+
return CheckFailedContainers(pod, containerNames)
103+
}
104+
105+
func CheckFailedContainers(pod *corev1.Pod, containerNames []string) error {
106+
containerSet := map[string]struct{}{}
107+
for _, containerName := range containerNames {
108+
containerSet[containerName] = struct{}{}
109+
}
110+
111+
for _, cs := range pod.Status.ContainerStatuses {
112+
if _, ok := containerSet[cs.Name]; !ok {
113+
continue
114+
}
115+
116+
if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
117+
msg := ""
118+
119+
if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" {
120+
msg += " : " + cs.State.Terminated.Reason
121+
}
122+
123+
if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" {
124+
msg += " : " + cs.State.Terminated.Message
125+
}
126+
127+
return fmt.Errorf("container %s has failed %s", cs.Name, msg)
128+
}
129+
}
130+
131+
for _, cs := range pod.Status.InitContainerStatuses {
132+
if _, ok := containerSet[cs.Name]; !ok {
133+
continue
134+
}
135+
136+
if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
137+
return fmt.Errorf("container %s has failed: %s", cs.Name, cs.State.Terminated.Reason)
138+
}
139+
}
140+
141+
return nil
142+
}

0 commit comments

Comments
 (0)