diff --git a/internal/sidecarlogresults/sidecarlogresults.go b/internal/sidecarlogresults/sidecarlogresults.go new file mode 100644 index 00000000000..23cc782f30f --- /dev/null +++ b/internal/sidecarlogresults/sidecarlogresults.go @@ -0,0 +1,96 @@ +/* +Copyright 2019 The Tekton Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sidecarlogresults + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// ErrorReasonMaxResultSizeExceeded indicates that the result exceeded its maximum allowed size +// var ErrorReasonMaxResultSizeExceeded = fmt.Errorf("%s", "MaxResultSizeExceeded") +var ErrorReasonMaxResultSizeExceeded = "MaxResultSizeExceeded" + +// SidecarLogResult holds fields for storing extracted results +type SidecarLogResult struct { + Name string + Value string +} + +// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar +func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string) ([]v1beta1.PipelineResourceResult, error) { + sidecarLogResults := []v1beta1.PipelineResourceResult{} + p, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if p.Status.Phase == corev1.PodPending { + return sidecarLogResults, nil + } + podLogOpts := corev1.PodLogOptions{Container: container} + req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts) + sidecarLogs, err := req.Stream(ctx) + if err != nil { + return sidecarLogResults, err + } + defer sidecarLogs.Close() + maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize + return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit) +} + +func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) { + scanner := bufio.NewScanner(logs) + buf := make([]byte, maxResultLimit) + scanner.Buffer(buf, maxResultLimit) + for scanner.Scan() { + result, err := parseResults(scanner.Bytes(), maxResultLimit) + if err != nil { + return nil, err + } + sidecarLogResults = append(sidecarLogResults, result) + } + + if err := scanner.Err(); err != nil { + if errors.Is(err, bufio.ErrTooLong) { + return sidecarLogResults, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded) + } + return nil, err + } + return sidecarLogResults, nil +} + +func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) { + result := v1beta1.PipelineResourceResult{} + if len(resultBytes) > maxResultLimit { + return result, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded) + } + + var res SidecarLogResult + if err := json.Unmarshal(resultBytes, &res); err != nil { + return result, fmt.Errorf("Invalid result %w", err) + } + result = v1beta1.PipelineResourceResult{ + Key: res.Name, + Value: res.Value, + ResultType: v1beta1.TaskRunResultType, + } + return result, nil +} diff --git a/internal/sidecarlogresults/sidecarlogresults_test.go b/internal/sidecarlogresults/sidecarlogresults_test.go new file mode 100644 index 00000000000..f889b6675fe --- /dev/null +++ b/internal/sidecarlogresults/sidecarlogresults_test.go @@ -0,0 +1,135 @@ +package sidecarlogresults + +import ( + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/test/diff" +) + +func TestExtractResultsFromLogs(t *testing.T) { + inputResults := []SidecarLogResult{ + { + Name: "result1", + Value: "foo", + }, { + Name: "result2", + Value: "bar", + }, + } + podLogs := "" + for _, r := range inputResults { + res, _ := json.Marshal(&r) + podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res)) + } + logs := strings.NewReader(podLogs) + + results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096) + if err != nil { + t.Error(err) + } + want := []v1beta1.PipelineResourceResult{ + { + Key: "result1", + Value: "foo", + ResultType: v1beta1.TaskRunResultType, + }, { + Key: "result2", + Value: "bar", + ResultType: v1beta1.TaskRunResultType, + }, + } + if d := cmp.Diff(want, results); d != "" { + t.Fatal(diff.PrintWantGot(d)) + } +} + +func TestExtractResultsFromLogs_Failure(t *testing.T) { + inputResults := []SidecarLogResult{ + { + Name: "result1", + Value: strings.Repeat("v", 4098), + }, + } + podLogs := "" + for _, r := range inputResults { + res, _ := json.Marshal(&r) + podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res)) + } + logs := strings.NewReader(podLogs) + + _, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096) + if err.Error() != ErrorReasonMaxResultSizeExceeded { + t.Fatal(fmt.Sprintf("Expexted error %v but got %v", ErrorReasonMaxResultSizeExceeded, err)) + } +} + +func TestParseResults(t *testing.T) { + results := []SidecarLogResult{ + { + Name: "result1", + Value: "foo", + }, { + Name: "result2", + Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`, + }, { + Name: "result3", + Value: `["hello","world"]`, + }, + } + podLogs := []string{} + for _, r := range results { + res, _ := json.Marshal(&r) + podLogs = append(podLogs, string(res)) + } + want := []v1beta1.PipelineResourceResult{{ + Key: "result1", + Value: "foo", + ResultType: 1, + }, { + Key: "result2", + Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`, + ResultType: 1, + }, { + Key: "result3", + Value: `["hello","world"]`, + ResultType: 1, + }} + stepResults := []v1beta1.PipelineResourceResult{} + for _, plog := range podLogs { + res, err := parseResults([]byte(plog), 4096) + if err != nil { + t.Error(err) + } + stepResults = append(stepResults, res) + } + if d := cmp.Diff(want, stepResults); d != "" { + t.Fatal(diff.PrintWantGot(d)) + } +} + +func TestParseResults_Failure(t *testing.T) { + result := SidecarLogResult{ + Name: "result2", + Value: strings.Repeat("k", 4098), + } + res1, _ := json.Marshal("result1 v1") + res2, _ := json.Marshal(&result) + podLogs := []string{string(res1), string(res2)} + want := []string{ + "Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult", + ErrorReasonMaxResultSizeExceeded, + } + got := []string{} + for _, plog := range podLogs { + _, err := parseResults([]byte(plog), 4096) + got = append(got, err.Error()) + } + if d := cmp.Diff(want, got); d != "" { + t.Fatal(diff.PrintWantGot(d)) + } +} diff --git a/pkg/apis/pipeline/reservedsidecar.go b/pkg/apis/pipeline/reservedsidecar.go new file mode 100644 index 00000000000..970421f21a6 --- /dev/null +++ b/pkg/apis/pipeline/reservedsidecar.go @@ -0,0 +1,18 @@ +/* +Copyright 2019 The Tekton Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipeline + +// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout +// when the results-from feature-flag is set to "sidecar-logs". +const ReservedResultsSidecarName = "tekton-log-results" diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types.go b/pkg/apis/pipeline/v1beta1/taskrun_types.go index 8c70b63f30c..c107f39761b 100644 --- a/pkg/apis/pipeline/v1beta1/taskrun_types.go +++ b/pkg/apis/pipeline/v1beta1/taskrun_types.go @@ -181,6 +181,8 @@ const ( TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed" // AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults" + // TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB + TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit" ) func (t TaskRunReason) String() string { diff --git a/pkg/entrypoint/entrypointer.go b/pkg/entrypoint/entrypointer.go index 5fc8c59f429..9dab69fbeed 100644 --- a/pkg/entrypoint/entrypointer.go +++ b/pkg/entrypoint/entrypointer.go @@ -29,6 +29,7 @@ import ( "strings" "time" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/spire" @@ -85,6 +86,8 @@ type Entrypointer struct { SpireWorkloadAPI spire.EntrypointerAPIClient // ResultsDirectory is the directory to find results, defaults to pipeline.DefaultResultPath ResultsDirectory string + // ResultExtractionMethod is the method using which the controller extracts the results from the task pod. + ResultExtractionMethod string } // Waiter encapsulates waiting for files to exist. @@ -231,7 +234,7 @@ func (e Entrypointer) readResultsFromDisk(ctx context.Context, resultDir string) } // push output to termination path - if len(output) != 0 { + if e.ResultExtractionMethod == config.ResultExtractionMethodTerminationMessage && len(output) != 0 { if err := termination.WriteMessage(e.TerminationPath, output); err != nil { return err } diff --git a/pkg/entrypoint/entrypointer_test.go b/pkg/entrypoint/entrypointer_test.go index fe69776755f..45ab39e30fb 100644 --- a/pkg/entrypoint/entrypointer_test.go +++ b/pkg/entrypoint/entrypointer_test.go @@ -31,6 +31,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/spire" "github.com/tektoncd/pipeline/pkg/termination" @@ -315,8 +316,9 @@ func TestReadResultsFromDisk(t *testing.T) { } e := Entrypointer{ - Results: resultsFilePath, - TerminationPath: terminationPath, + Results: resultsFilePath, + TerminationPath: terminationPath, + ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage, } if err := e.readResultsFromDisk(ctx, ""); err != nil { t.Fatal(err) @@ -544,19 +546,20 @@ func TestEntrypointerResults(t *testing.T) { } err := Entrypointer{ - Command: append([]string{c.entrypoint}, c.args...), - WaitFiles: c.waitFiles, - PostFile: c.postFile, - Waiter: fw, - Runner: fr, - PostWriter: fpw, - Results: results, - ResultsDirectory: resultsDir, - TerminationPath: terminationPath, - Timeout: &timeout, - BreakpointOnFailure: c.breakpointOnFailure, - StepMetadataDir: c.stepDir, - SpireWorkloadAPI: signClient, + Command: append([]string{c.entrypoint}, c.args...), + WaitFiles: c.waitFiles, + PostFile: c.postFile, + Waiter: fw, + Runner: fr, + PostWriter: fpw, + Results: results, + ResultsDirectory: resultsDir, + ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage, + TerminationPath: terminationPath, + Timeout: &timeout, + BreakpointOnFailure: c.breakpointOnFailure, + StepMetadataDir: c.stepDir, + SpireWorkloadAPI: signClient, }.Go() if err != nil { t.Fatalf("Entrypointer failed: %v", err) diff --git a/pkg/pod/entrypoint.go b/pkg/pod/entrypoint.go index 7a28eff6143..014a4c81707 100644 --- a/pkg/pod/entrypoint.go +++ b/pkg/pod/entrypoint.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "gomodules.xyz/jsonpatch/v2" @@ -247,6 +248,13 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In updated := false if newPod.Status.Phase == corev1.PodRunning { for _, s := range newPod.Status.ContainerStatuses { + // If the results-from is set to sidecar logs, + // a sidecar container with name `sidecar-log-results` is injected by the reconiler. + // Do not kill this sidecar. Let it exit gracefully. + resultsSidecarName := fmt.Sprintf("sidecar-%v", pipeline.ReservedResultsSidecarName) + if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == resultsSidecarName { + continue + } // Stop any running container that isn't a step. // An injected sidecar container might not have the // "sidecar-" prefix, so we can't just look for that diff --git a/pkg/pod/entrypoint_test.go b/pkg/pod/entrypoint_test.go index a18c8838e27..4e0845e075d 100644 --- a/pkg/pod/entrypoint_test.go +++ b/pkg/pod/entrypoint_test.go @@ -19,10 +19,13 @@ package pod import ( "context" "errors" + "fmt" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/test/diff" corev1 "k8s.io/api/core/v1" @@ -102,6 +105,74 @@ func TestOrderContainers(t *testing.T) { } } +func TestOrderContainersWithResultsSidecarLogs(t *testing.T) { + steps := []corev1.Container{{ + Image: "step-1", + Command: []string{"cmd"}, + Args: []string{"arg1", "arg2"}, + }, { + Image: "step-2", + Command: []string{"cmd1", "cmd2", "cmd3"}, // multiple cmd elements + Args: []string{"arg1", "arg2"}, + VolumeMounts: []corev1.VolumeMount{volumeMount}, // pre-existing volumeMount + }, { + Image: "step-3", + Command: []string{"cmd"}, + Args: []string{"arg1", "arg2"}, + }} + want := []corev1.Container{{ + Image: "step-1", + Command: []string{entrypointBinary}, + Args: []string{ + "-wait_file", "/tekton/downward/ready", + "-wait_file_content", + "-post_file", "/tekton/run/0/out", + "-termination_path", "/tekton/termination", + "-step_metadata_dir", "/tekton/run/0/status", + "-dont_send_results_to_termination_path", + "-entrypoint", "cmd", "--", + "arg1", "arg2", + }, + VolumeMounts: []corev1.VolumeMount{downwardMount}, + TerminationMessagePath: "/tekton/termination", + }, { + Image: "step-2", + Command: []string{entrypointBinary}, + Args: []string{ + "-wait_file", "/tekton/run/0/out", + "-post_file", "/tekton/run/1/out", + "-termination_path", "/tekton/termination", + "-step_metadata_dir", "/tekton/run/1/status", + "-dont_send_results_to_termination_path", + "-entrypoint", "cmd1", "--", + "cmd2", "cmd3", + "arg1", "arg2", + }, + VolumeMounts: []corev1.VolumeMount{volumeMount}, + TerminationMessagePath: "/tekton/termination", + }, { + Image: "step-3", + Command: []string{entrypointBinary}, + Args: []string{ + "-wait_file", "/tekton/run/1/out", + "-post_file", "/tekton/run/2/out", + "-termination_path", "/tekton/termination", + "-step_metadata_dir", "/tekton/run/2/status", + "-dont_send_results_to_termination_path", + "-entrypoint", "cmd", "--", + "arg1", "arg2", + }, + TerminationMessagePath: "/tekton/termination", + }} + got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true) + if err != nil { + t.Fatalf("orderContainers: %v", err) + } + if d := cmp.Diff(want, got); d != "" { + t.Errorf("Diff %s", diff.PrintWantGot(d)) + } +} + func TestOrderContainersWithNoWait(t *testing.T) { steps := []corev1.Container{{ Image: "step-1", @@ -651,11 +722,23 @@ func TestStopSidecars(t *testing.T) { Name: injectedSidecar.Name, Image: nopImage, } + // This is a container that is added by the controller for accessing sidecar logs. + // This should not be stopped as long as results-from is set to sidecar-logs. + resultsSidecar := corev1.Container{ + Name: fmt.Sprintf("sidecar-%v", pipeline.ReservedResultsSidecarName), + Image: "original-injected-image", + } + // This container can be stopped if the results-from is not set to sidecar-logs. + stoppedResultsSidecar := corev1.Container{ + Name: fmt.Sprintf("sidecar-%v", pipeline.ReservedResultsSidecarName), + Image: nopImage, + } for _, c := range []struct { - desc string - pod corev1.Pod - wantContainers []corev1.Container + desc string + pod corev1.Pod + resultExtractionMethod string + wantContainers []corev1.Container }{{ desc: "Running sidecars (incl injected) should be stopped", pod: corev1.Pod{ @@ -681,6 +764,57 @@ func TestStopSidecars(t *testing.T) { }, }, wantContainers: []corev1.Container{stepContainer, stoppedSidecarContainer, stoppedInjectedSidecar}, + }, { + desc: "Results Sidecar should not be stopped", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{stepContainer, sidecarContainer, resultsSidecar}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{{ + // Step state doesn't matter. + }, { + Name: sidecarContainer.Name, + // Sidecar is running. + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())}}, + }, { + Name: resultsSidecar.Name, + // Results sidecar is running. + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())}}, + }}, + }, + }, + resultExtractionMethod: "sidecar-logs", + wantContainers: []corev1.Container{stepContainer, stoppedSidecarContainer, resultsSidecar}, + }, { + desc: "Results Sidecar should be stopped result method is not sidecar logs", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{stepContainer, sidecarContainer, resultsSidecar}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{{ + // Step state doesn't matter. + }, { + Name: sidecarContainer.Name, + // Sidecar is running. + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())}}, + }, { + Name: resultsSidecar.Name, + // Results sidecar is running. + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())}}, + }}, + }, + }, + wantContainers: []corev1.Container{stepContainer, stoppedSidecarContainer, stoppedResultsSidecar}, }, { desc: "Pending Pod should not be updated", pod: corev1.Pod{ @@ -722,6 +856,13 @@ func TestStopSidecars(t *testing.T) { }} { t.Run(c.desc, func(t *testing.T) { ctx := context.Background() + if c.resultExtractionMethod != "" { + ctx = config.ToContext(ctx, &config.Config{ + FeatureFlags: &config.FeatureFlags{ + ResultExtractionMethod: c.resultExtractionMethod, + }, + }) + } ctx, cancel := context.WithCancel(ctx) defer cancel() kubeclient := fakek8s.NewSimpleClientset(&c.pod) diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go index 30bf9d8fa1c..c48243e917b 100644 --- a/pkg/pod/pod.go +++ b/pkg/pod/pod.go @@ -23,6 +23,7 @@ import ( "math" "path/filepath" "strconv" + "strings" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" @@ -119,6 +120,7 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec implicitEnvVars := []corev1.EnvVar{} featureFlags := config.FromContextOrDefaults(ctx).FeatureFlags alphaAPIEnabled := featureFlags.EnableAPIFields == config.AlphaAPIFields + sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs // Add our implicit volumes first, so they can be overridden by the user if they prefer. volumes = append(volumes, implicitVolumes...) @@ -127,10 +129,12 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec // Create Volumes and VolumeMounts for any credentials found in annotated // Secrets, along with any arguments needed by Step entrypoints to process // those secrets. + commonExtraEntrypointArgs := []string{} credEntrypointArgs, credVolumes, credVolumeMounts, err := credsInit(ctx, taskRun.Spec.ServiceAccountName, taskRun.Namespace, b.KubeClient) if err != nil { return nil, err } + commonExtraEntrypointArgs = append(commonExtraEntrypointArgs, credEntrypointArgs...) volumes = append(volumes, credVolumes...) volumeMounts = append(volumeMounts, credVolumeMounts...) @@ -147,6 +151,12 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec if alphaAPIEnabled && taskRun.Spec.ComputeResources != nil { tasklevel.ApplyTaskLevelComputeResources(steps, taskRun.Spec.ComputeResources) } + if sidecarLogsResultsEnabled && taskSpec.Results != nil { + // create a results sidecar + resultsSidecar := createResultsSidecar(taskSpec, b.Images.SidecarLogResultsImage) + taskSpec.Sidecars = append(taskSpec.Sidecars, resultsSidecar) + commonExtraEntrypointArgs = append(commonExtraEntrypointArgs, "-result_extraction_method", config.ResultExtractionMethodSidecarLogs) + } sidecars, err := v1beta1.MergeSidecarsWithOverrides(taskSpec.Sidecars, taskRun.Spec.SidecarOverrides) if err != nil { return nil, err @@ -192,9 +202,9 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec readyImmediately := isPodReadyImmediately(*featureFlags, taskSpec.Sidecars) if alphaAPIEnabled { - stepContainers, err = orderContainers(credEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately) } else { - stepContainers, err = orderContainers(credEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately) } if err != nil { return nil, err @@ -259,6 +269,28 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec stepContainers[i].VolumeMounts = vms } + if sidecarLogsResultsEnabled && taskSpec.Results != nil { + // Mount implicit volumes onto sidecarContainers + // so that they can access /tekton/results and /tekton/run. + for i, s := range sidecarContainers { + for j := 0; j < len(stepContainers); j++ { + s.VolumeMounts = append(s.VolumeMounts, runMount(j, true)) + } + requestedVolumeMounts := map[string]bool{} + for _, vm := range s.VolumeMounts { + requestedVolumeMounts[filepath.Clean(vm.MountPath)] = true + } + var toAdd []corev1.VolumeMount + for _, imp := range volumeMounts { + if !requestedVolumeMounts[filepath.Clean(imp.MountPath)] { + toAdd = append(toAdd, imp) + } + } + vms := append(s.VolumeMounts, toAdd...) //nolint + sidecarContainers[i].VolumeMounts = vms + } + } + // This loop: // - sets container name to add "step-" prefix or "step-unnamed-#" if not specified. // TODO(#1605): Remove this loop and make each transformation in @@ -435,3 +467,18 @@ func entrypointInitContainer(image string, steps []v1beta1.Step) corev1.Containe } return prepareInitContainer } + +// createResultsSidecar creates a sidecar that will run the sidecarlogresults binary. +func createResultsSidecar(taskSpec v1beta1.TaskSpec, image string) v1beta1.Sidecar { + names := make([]string, 0, len(taskSpec.Results)) + for _, r := range taskSpec.Results { + names = append(names, r.Name) + } + resultsStr := strings.Join(names, ",") + command := []string{"/ko-app/sidecarlogresults", "-results-dir", pipeline.DefaultResultPath, "-result-names", resultsStr} + return v1beta1.Sidecar{ + Name: pipeline.ReservedResultsSidecarName, + Image: image, + Command: command, + } +} diff --git a/pkg/pod/pod_test.go b/pkg/pod/pod_test.go index 7e5a7817d74..2d909afb790 100644 --- a/pkg/pod/pod_test.go +++ b/pkg/pod/pod_test.go @@ -38,6 +38,7 @@ import ( "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/names" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakek8s "k8s.io/client-go/kubernetes/fake" @@ -1786,6 +1787,76 @@ _EOF_ }}, ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, }, + }, { + desc: "sidecar logs enabled", + featureFlags: map[string]string{"results-from": "sidecar-logs"}, + ts: v1beta1.TaskSpec{ + Results: []v1beta1.TaskResult{{ + Name: "foo", + Type: v1beta1.ResultsTypeString, + }}, + Steps: []v1beta1.Step{{ + Name: "name", + Image: "image", + Command: []string{"cmd"}, // avoid entrypoint lookup. + }}, + }, + want: &corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + InitContainers: []corev1.Container{ + entrypointInitContainer(images.EntrypointImage, []v1beta1.Step{{Name: "name"}}), + }, + Containers: []corev1.Container{{ + Name: "step-name", + Image: "image", + Command: []string{"/tekton/bin/entrypoint"}, + Args: []string{ + "-wait_file", + "/tekton/downward/ready", + "-wait_file_content", + "-post_file", + "/tekton/run/0/out", + "-termination_path", + "/tekton/termination", + "-step_metadata_dir", + "/tekton/run/0/status", + "-result_extraction_method", + "sidecar-logs", + "-results", + "foo", + "-entrypoint", + "cmd", + "--", + }, + VolumeMounts: append([]corev1.VolumeMount{binROMount, runMount(0, false), downwardMount, { + Name: "tekton-creds-init-home-0", + MountPath: "/tekton/creds", + }}, implicitVolumeMounts...), + TerminationMessagePath: "/tekton/termination", + }, { + Name: fmt.Sprintf("sidecar-%s", pipeline.ReservedResultsSidecarName), + Image: "", + Command: []string{ + "/ko-app/sidecarlogresults", + "-results-dir", + "/tekton/results", + "-result-names", + "foo", + }, + Resources: corev1.ResourceRequirements{ + Requests: nil, + }, + VolumeMounts: append([]v1.VolumeMount{ + {Name: "tekton-internal-bin", ReadOnly: true, MountPath: "/tekton/bin"}, + {Name: "tekton-internal-run-0", ReadOnly: true, MountPath: "/tekton/run/0"}, + }, implicitVolumeMounts...), + }}, + Volumes: append(implicitVolumes, binVolume, runVolume(0), downwardVolume, corev1.Volume{ + Name: "tekton-creds-init-home-0", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{Medium: corev1.StorageMediumMemory}}, + }), + ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, + }, }} { t.Run(c.desc, func(t *testing.T) { names.TestingSeed() diff --git a/pkg/pod/status.go b/pkg/pod/status.go index 0a0894981eb..8d33676a05f 100644 --- a/pkg/pod/status.go +++ b/pkg/pod/status.go @@ -17,6 +17,7 @@ limitations under the License. package pod import ( + "context" "encoding/json" "fmt" "strconv" @@ -24,11 +25,15 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/internal/sidecarlogresults" + "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/termination" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "knative.dev/pkg/apis" ) @@ -104,7 +109,7 @@ func SidecarsReady(podStatus corev1.PodStatus) bool { } // MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status. -func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod) (v1beta1.TaskRunStatus, error) { +func MakeTaskRunStatus(ctx context.Context, logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, kubeclient kubernetes.Interface) (v1beta1.TaskRunStatus, error) { trs := &tr.Status if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown { // If the taskRunStatus doesn't exist yet, it's because we just started running @@ -136,7 +141,7 @@ func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev } var merr *multierror.Error - if err := setTaskRunStatusBasedOnStepStatus(logger, stepStatuses, &tr); err != nil { + if err := setTaskRunStatusBasedOnStepStatus(ctx, logger, stepStatuses, &tr, kubeclient); err != nil { merr = multierror.Append(merr, err) } @@ -147,15 +152,26 @@ func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev return *trs, merr.ErrorOrNil() } -func setTaskRunStatusBasedOnStepStatus(logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1beta1.TaskRun) *multierror.Error { +func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1beta1.TaskRun, kubeclient kubernetes.Interface) *multierror.Error { trs := &tr.Status var merr *multierror.Error + sidecarLogResults := []v1beta1.PipelineResourceResult{} + sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs + if sidecarLogsResultsEnabled && tr.Status.TaskSpec.Results != nil { + resultSidecarName := fmt.Sprintf("sidecar-%s", pipeline.ReservedResultsSidecarName) + slr, err := sidecarlogresults.GetResultsFromSidecarLogs(ctx, kubeclient, tr.Namespace, tr.Status.PodName, resultSidecarName) + if err != nil { + merr = multierror.Append(merr, err) + } + sidecarLogResults = slr + } for _, s := range stepStatuses { if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 { msg := s.State.Terminated.Message results, err := termination.ParseMessage(logger, msg) + results = append(results, sidecarLogResults...) if err != nil { logger.Errorf("termination message could not be parsed as JSON: %v", err) merr = multierror.Append(merr, err) @@ -175,12 +191,14 @@ func setTaskRunStatusBasedOnStepStatus(logger *zap.SugaredLogger, stepStatuses [ trs.TaskRunResults = append(trs.TaskRunResults, taskResults...) trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...) } - msg, err = createMessageFromResults(filteredResults) - if err != nil { - logger.Errorf("%v", err) - merr = multierror.Append(merr, err) - } else { - s.State.Terminated.Message = msg + if len(sidecarLogResults) == 0 { + msg, err = createMessageFromResults(filteredResults) + if err != nil { + logger.Errorf("%v", err) + merr = multierror.Append(merr, err) + } else { + s.State.Terminated.Message = msg + } } if time != nil { s.State.Terminated.StartedAt = *time diff --git a/pkg/pod/status_test.go b/pkg/pod/status_test.go index c44f2b679ac..362662442e9 100644 --- a/pkg/pod/status_test.go +++ b/pkg/pod/status_test.go @@ -17,6 +17,7 @@ limitations under the License. package pod import ( + "context" "strings" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/tektoncd/pipeline/test/diff" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakek8s "k8s.io/client-go/kubernetes/fake" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/logging" @@ -80,7 +82,8 @@ func TestSetTaskRunStatusBasedOnStepStatus(t *testing.T) { } logger, _ := logging.NewLogger("", "status") - merr := setTaskRunStatusBasedOnStepStatus(logger, c.ContainerStatuses, &tr) + kubeclient := fakek8s.NewSimpleClientset() + merr := setTaskRunStatusBasedOnStepStatus(context.Background(), logger, c.ContainerStatuses, &tr, kubeclient) if merr != nil { t.Errorf("setTaskRunStatusBasedOnStepStatus: %s", merr) } @@ -1061,7 +1064,8 @@ func TestMakeTaskRunStatus(t *testing.T) { }, } logger, _ := logging.NewLogger("", "status") - got, err := MakeTaskRunStatus(logger, tr, &c.pod) + kubeclient := fakek8s.NewSimpleClientset() + got, err := MakeTaskRunStatus(context.Background(), logger, tr, &c.pod, kubeclient) if err != nil { t.Errorf("MakeTaskRunResult: %s", err) } @@ -1275,7 +1279,8 @@ func TestMakeTaskRunStatusAlpha(t *testing.T) { }, } logger, _ := logging.NewLogger("", "status") - got, err := MakeTaskRunStatus(logger, tr, &c.pod) + kubeclient := fakek8s.NewSimpleClientset() + got, err := MakeTaskRunStatus(context.Background(), logger, tr, &c.pod, kubeclient) if err != nil { t.Errorf("MakeTaskRunResult: %s", err) } @@ -1396,7 +1401,8 @@ func TestMakeRunStatusJSONError(t *testing.T) { } logger, _ := logging.NewLogger("", "status") - gotTr, err := MakeTaskRunStatus(logger, tr, pod) + kubeclient := fakek8s.NewSimpleClientset() + gotTr, err := MakeTaskRunStatus(context.Background(), logger, tr, pod, kubeclient) if err == nil { t.Error("Expected error, got nil") } diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index b5a7f6e8646..fecb92187d6 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -25,6 +25,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/internal/sidecarlogresults" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" @@ -186,6 +187,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg // updates regardless of whether the reconciliation errored out. if err = c.reconcile(ctx, tr, rtr); err != nil { logger.Errorf("Reconcile: %v", err.Error()) + if errors.Is(err, fmt.Errorf("%s", sidecarlogresults.ErrorReasonMaxResultSizeExceeded)) { + message := fmt.Sprintf("TaskRun %q failed to finish because atleast one of its results exceeded the max allowed limit.", tr.Name) + err := c.failTaskRun(ctx, tr, v1beta1.TaskRunReasonResultLargerThanAllowedLimit, message) + return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err) + } } // Emit events (only when ConditionSucceeded was changed) @@ -252,7 +258,7 @@ func (c *Reconciler) stopSidecars(ctx context.Context, tr *v1beta1.TaskRun) erro } // do not continue if the TaskSpec had no sidecars - if tr.Status.TaskSpec != nil && len(tr.Status.TaskSpec.Sidecars) == 0 { + if tr.Status.TaskSpec != nil && len(tr.Status.TaskSpec.Sidecars) == 0 && len(tr.Status.Sidecars) == 0 { return nil } @@ -536,7 +542,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re } // Convert the Pod's status to the equivalent TaskRun Status. - tr.Status, err = podconvert.MakeTaskRunStatus(logger, *tr, pod) + tr.Status, err = podconvert.MakeTaskRunStatus(ctx, logger, *tr, pod, c.KubeClientSet) if err != nil { return err }