Skip to content

Commit

Permalink
parsing logs to extract results
Browse files Browse the repository at this point in the history
  • Loading branch information
chitrangpatel committed Dec 8, 2022
1 parent d53dc1b commit 3abbf19
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 33 deletions.
96 changes: 96 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecarlogresults

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// ErrorReasonMaxResultSizeExceeded indicates that the result exceeded its maximum allowed size
// var ErrorReasonMaxResultSizeExceeded = fmt.Errorf("%s", "MaxResultSizeExceeded")
var ErrorReasonMaxResultSizeExceeded = "MaxResultSizeExceeded"

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Value string
}

// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string) ([]v1beta1.PipelineResourceResult, error) {
sidecarLogResults := []v1beta1.PipelineResourceResult{}
p, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if p.Status.Phase == corev1.PodPending {
return sidecarLogResults, nil
}
podLogOpts := corev1.PodLogOptions{Container: container}
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
sidecarLogs, err := req.Stream(ctx)
if err != nil {
return sidecarLogResults, err
}
defer sidecarLogs.Close()
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
}

func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
scanner := bufio.NewScanner(logs)
buf := make([]byte, maxResultLimit)
scanner.Buffer(buf, maxResultLimit)
for scanner.Scan() {
result, err := parseResults(scanner.Bytes(), maxResultLimit)
if err != nil {
return nil, err
}
sidecarLogResults = append(sidecarLogResults, result)
}

if err := scanner.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return sidecarLogResults, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}
return nil, err
}
return sidecarLogResults, nil
}

func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
result := v1beta1.PipelineResourceResult{}
if len(resultBytes) > maxResultLimit {
return result, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}

var res SidecarLogResult
if err := json.Unmarshal(resultBytes, &res); err != nil {
return result, fmt.Errorf("Invalid result %w", err)
}
result = v1beta1.PipelineResourceResult{
Key: res.Name,
Value: res.Value,
ResultType: v1beta1.TaskRunResultType,
}
return result, nil
}
195 changes: 195 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package sidecarlogresults

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
)

func TestExtractResultsFromLogs(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: "bar",
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != nil {
t.Error(err)
}
want := []v1beta1.PipelineResourceResult{
{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: "bar",
ResultType: v1beta1.TaskRunResultType,
},
}
if d := cmp.Diff(want, results); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestExtractResultsFromLogs_Failure(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: strings.Repeat("v", 4098),
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err.Error() != ErrorReasonMaxResultSizeExceeded {
t.Fatal(fmt.Sprintf("Expexted error %v but got %v", ErrorReasonMaxResultSizeExceeded, err))
}
}

func TestParseResults(t *testing.T) {
results := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
}, {
Name: "result3",
Value: `["hello","world"]`,
},
}
podLogs := []string{}
for _, r := range results {
res, _ := json.Marshal(&r)
podLogs = append(podLogs, string(res))
}
want := []v1beta1.PipelineResourceResult{{
Key: "result1",
Value: "foo",
ResultType: 1,
}, {
Key: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
ResultType: 1,
}, {
Key: "result3",
Value: `["hello","world"]`,
ResultType: 1,
}}
stepResults := []v1beta1.PipelineResourceResult{}
for _, plog := range podLogs {
res, err := parseResults([]byte(plog), 4096)
if err != nil {
t.Error(err)
}
stepResults = append(stepResults, res)
}
if d := cmp.Diff(want, stepResults); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestParseResults_Failure(t *testing.T) {
result := SidecarLogResult{
Name: "result2",
Value: strings.Repeat("k", 4098),
}
res1, _ := json.Marshal("result1 v1")
res2, _ := json.Marshal(&result)
podLogs := []string{string(res1), string(res2)}
want := []string{
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
ErrorReasonMaxResultSizeExceeded,
}
got := []string{}
for _, plog := range podLogs {
_, err := parseResults([]byte(plog), 4096)
got = append(got, err.Error())
}
if d := cmp.Diff(want, got); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestGetResultsFromSidecarLogs(t *testing.T) {
for _, c := range []struct {
desc string
podPhase v1.PodPhase
wantError bool
}{{
desc: "pod pending to start",
podPhase: corev1.PodPending,
wantError: false,
}, {
desc: "pod running extract logs",
podPhase: corev1.PodRunning,
wantError: true,
}} {
t.Run(c.desc, func(t *testing.T) {
ctx := context.Background()
clientset := fakekubeclientset.NewSimpleClientset()
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
},
Status: v1.PodStatus{
Phase: c.podPhase,
},
}
pod, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
t.Errorf("Error occured while creating pod %s: %s", pod.Name, err.Error())
}

// Fake logs are not formatted properly so there will be an error
_, err = GetResultsFromSidecarLogs(ctx, clientset, "foo", "pod", "container")
if err != nil && !c.wantError {
t.Fatalf("did not expect an error but got: %v", err)
}
if c.wantError && err == nil {
t.Fatal("dxpected to get an error but did not")
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ const (
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
)

func (t TaskRunReason) String() string {
Expand Down
33 changes: 18 additions & 15 deletions pkg/entrypoint/entrypointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/termination"
Expand Down Expand Up @@ -315,8 +316,9 @@ func TestReadResultsFromDisk(t *testing.T) {
}

e := Entrypointer{
Results: resultsFilePath,
TerminationPath: terminationPath,
Results: resultsFilePath,
TerminationPath: terminationPath,
ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage,
}
if err := e.readResultsFromDisk(ctx, ""); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -544,19 +546,20 @@ func TestEntrypointerResults(t *testing.T) {
}

err := Entrypointer{
Command: append([]string{c.entrypoint}, c.args...),
WaitFiles: c.waitFiles,
PostFile: c.postFile,
Waiter: fw,
Runner: fr,
PostWriter: fpw,
Results: results,
ResultsDirectory: resultsDir,
TerminationPath: terminationPath,
Timeout: &timeout,
BreakpointOnFailure: c.breakpointOnFailure,
StepMetadataDir: c.stepDir,
SpireWorkloadAPI: signClient,
Command: append([]string{c.entrypoint}, c.args...),
WaitFiles: c.waitFiles,
PostFile: c.postFile,
Waiter: fw,
Runner: fr,
PostWriter: fpw,
Results: results,
ResultsDirectory: resultsDir,
ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage,
TerminationPath: terminationPath,
Timeout: &timeout,
BreakpointOnFailure: c.breakpointOnFailure,
StepMetadataDir: c.stepDir,
SpireWorkloadAPI: signClient,
}.Go()
if err != nil {
t.Fatalf("Entrypointer failed: %v", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pod/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"gomodules.xyz/jsonpatch/v2"
Expand Down Expand Up @@ -247,6 +248,13 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In
updated := false
if newPod.Status.Phase == corev1.PodRunning {
for _, s := range newPod.Status.ContainerStatuses {
// If the results-from is set to sidecar logs,
// a sidecar container with name `sidecar-log-results` is injected by the reconiler.
// Do not kill this sidecar. Let it exit gracefully.
resultsSidecarName := fmt.Sprintf("sidecar-%v", pipeline.ReservedResultsSidecarName)
if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == resultsSidecarName {
continue
}
// Stop any running container that isn't a step.
// An injected sidecar container might not have the
// "sidecar-" prefix, so we can't just look for that
Expand Down
Loading

0 comments on commit 3abbf19

Please sign in to comment.