Skip to content

Commit

Permalink
TEP-0127: Larger results using sidecar logs - parsing logs to extract…
Browse files Browse the repository at this point in the history
… results

This PR addresses a part of TEP-0127 - parsing logs to extract results into the task run CRD.

The taskrun reconciler fetches the logs of the injected sidecar container sidecar-tekton-log-results. The logs are already structured and can be un-marshalled via json. The reconciler parses these logs and adds the results to the task run CRD. If the size of the result is greater than the max-result-size configured by the user then the TaskRun fails with reason TaskRunResultLargerThanAllowedLimit.

**Note**: the e2e test only covers the success case of using sidecar
logs to extract results. This is because the failure cases are already
tested via unit tests.

Before approving this PR, review #5834
  • Loading branch information
chitrangpatel committed Dec 14, 2022
1 parent f9021b1 commit d556af3
Show file tree
Hide file tree
Showing 13 changed files with 1,048 additions and 19 deletions.
66 changes: 66 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ limitations under the License.
package sidecarlogresults

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)

// ErrSizeExceeded indicates that the result exceeded its maximum allowed size
var ErrSizeExceeded = errors.New("results size exceeds configured limit")

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Expand Down Expand Up @@ -125,3 +135,59 @@ func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames [
}
return nil
}

// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string, podPhase corev1.PodPhase) ([]v1beta1.PipelineResourceResult, error) {
sidecarLogResults := []v1beta1.PipelineResourceResult{}
if podPhase == corev1.PodPending {
return sidecarLogResults, nil
}
podLogOpts := corev1.PodLogOptions{Container: container}
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
sidecarLogs, err := req.Stream(ctx)
if err != nil {
return sidecarLogResults, err
}
defer sidecarLogs.Close()
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
}

func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
scanner := bufio.NewScanner(logs)
buf := make([]byte, maxResultLimit)
scanner.Buffer(buf, maxResultLimit)
for scanner.Scan() {
result, err := parseResults(scanner.Bytes(), maxResultLimit)
if err != nil {
return nil, err
}
sidecarLogResults = append(sidecarLogResults, result)
}

if scanner.Err() != nil {
if errors.Is(scanner.Err(), bufio.ErrTooLong) {
return sidecarLogResults, ErrSizeExceeded
}
return nil, scanner.Err()
}
return sidecarLogResults, nil
}

func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
result := v1beta1.PipelineResourceResult{}
if len(resultBytes) > maxResultLimit {
return result, ErrSizeExceeded
}

var res SidecarLogResult
if err := json.Unmarshal(resultBytes, &res); err != nil {
return result, fmt.Errorf("Invalid result %w", err)
}
result = v1beta1.PipelineResourceResult{
Key: res.Name,
Value: res.Value,
ResultType: v1beta1.TaskRunResultType,
}
return result, nil
}
186 changes: 186 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package sidecarlogresults

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
)

func TestLookForResults_FanOutAndWait(t *testing.T) {
Expand Down Expand Up @@ -121,6 +129,184 @@ func TestLookForResults(t *testing.T) {
}
}

func TestExtractResultsFromLogs(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: "bar",
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != nil {
t.Error(err)
}
want := []v1beta1.PipelineResourceResult{
{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: "bar",
ResultType: v1beta1.TaskRunResultType,
},
}
if d := cmp.Diff(want, results); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestExtractResultsFromLogs_Failure(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: strings.Repeat("v", 4098),
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != ErrSizeExceeded {
t.Fatalf("Expected error %v but got %v", ErrSizeExceeded, err)
}
}

func TestParseResults(t *testing.T) {
results := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
}, {
Name: "result3",
Value: `["hello","world"]`,
},
}
podLogs := []string{}
for _, r := range results {
res, _ := json.Marshal(&r)
podLogs = append(podLogs, string(res))
}
want := []v1beta1.PipelineResourceResult{{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result3",
Value: `["hello","world"]`,
ResultType: v1beta1.TaskRunResultType,
}}
stepResults := []v1beta1.PipelineResourceResult{}
for _, plog := range podLogs {
res, err := parseResults([]byte(plog), 4096)
if err != nil {
t.Error(err)
}
stepResults = append(stepResults, res)
}
if d := cmp.Diff(want, stepResults); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestParseResults_Failure(t *testing.T) {
result := SidecarLogResult{
Name: "result2",
Value: strings.Repeat("k", 4098),
}
res1, _ := json.Marshal("result1 v1")
res2, _ := json.Marshal(&result)
podLogs := []string{string(res1), string(res2)}
want := []string{
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
ErrSizeExceeded.Error(),
}
got := []string{}
for _, plog := range podLogs {
_, err := parseResults([]byte(plog), 4096)
got = append(got, err.Error())
}
if d := cmp.Diff(want, got); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestGetResultsFromSidecarLogs(t *testing.T) {
for _, c := range []struct {
desc string
podPhase v1.PodPhase
wantError bool
}{{
desc: "pod pending to start",
podPhase: corev1.PodPending,
wantError: false,
}, {
desc: "pod running extract logs",
podPhase: corev1.PodRunning,
wantError: true,
}} {
t.Run(c.desc, func(t *testing.T) {
ctx := context.Background()
clientset := fakekubeclientset.NewSimpleClientset()
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
},
Status: v1.PodStatus{
Phase: c.podPhase,
},
}
pod, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
t.Errorf("Error occurred while creating pod %s: %s", pod.Name, err.Error())
}

// Fake logs are not formatted properly so there will be an error
_, err = GetResultsFromSidecarLogs(ctx, clientset, "foo", "pod", "container", pod.Status.Phase)
if err != nil && !c.wantError {
t.Fatalf("did not expect an error but got: %v", err)
}
if c.wantError && err == nil {
t.Fatal("expected to get an error but did not")
}
})
}
}

func createResult(t *testing.T, dir string, resultName string, resultValue string) {
t.Helper()
resultFile := filepath.Join(dir, resultName)
Expand Down
4 changes: 4 additions & 0 deletions internal/sidecarlogsvalidation/sidecarlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ const (
// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout
// when the results-from feature-flag is set to "sidecar-logs".
ReservedResultsSidecarName = "tekton-log-results"

// ReservedResultsSidecarContainerName is the name of the results sidecar container that is injected
// by the reconciler.
ReservedResultsSidecarContainerName = "sidecar-tekton-log-results"
)
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ const (
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
)

func (t TaskRunReason) String() string {
Expand Down
8 changes: 8 additions & 0 deletions pkg/pod/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"strconv"
"strings"

"github.com/tektoncd/pipeline/internal/sidecarlogsvalidation"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"gomodules.xyz/jsonpatch/v2"
Expand Down Expand Up @@ -252,6 +254,12 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In
updated := false
if newPod.Status.Phase == corev1.PodRunning {
for _, s := range newPod.Status.ContainerStatuses {
// If the results-from is set to sidecar logs,
// a sidecar container with name `sidecar-log-results` is injected by the reconiler.
// Do not kill this sidecar. Let it exit gracefully.
if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == sidecarlogsvalidation.ReservedResultsSidecarContainerName {
continue
}
// Stop any running container that isn't a step.
// An injected sidecar container might not have the
// "sidecar-" prefix, so we can't just look for that
Expand Down
Loading

0 comments on commit d556af3

Please sign in to comment.