Skip to content

Commit

Permalink
injecitng sidecar and parsing logs to extract results
Browse files Browse the repository at this point in the history
  • Loading branch information
chitrangpatel committed Dec 6, 2022
1 parent d53dc1b commit 6dcc538
Show file tree
Hide file tree
Showing 13 changed files with 590 additions and 36 deletions.
96 changes: 96 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecarlogresults

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// ErrorReasonMaxResultSizeExceeded indicates that the result exceeded its maximum allowed size
// var ErrorReasonMaxResultSizeExceeded = fmt.Errorf("%s", "MaxResultSizeExceeded")
var ErrorReasonMaxResultSizeExceeded = "MaxResultSizeExceeded"

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Value string
}

// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string) ([]v1beta1.PipelineResourceResult, error) {
sidecarLogResults := []v1beta1.PipelineResourceResult{}
p, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if p.Status.Phase == corev1.PodPending {
return sidecarLogResults, nil
}
podLogOpts := corev1.PodLogOptions{Container: container}
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
sidecarLogs, err := req.Stream(ctx)
if err != nil {
return sidecarLogResults, err
}
defer sidecarLogs.Close()
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
}

func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
scanner := bufio.NewScanner(logs)
buf := make([]byte, maxResultLimit)
scanner.Buffer(buf, maxResultLimit)
for scanner.Scan() {
result, err := parseResults(scanner.Bytes(), maxResultLimit)
if err != nil {
return nil, err
}
sidecarLogResults = append(sidecarLogResults, result)
}

if err := scanner.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return sidecarLogResults, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}
return nil, err
}
return sidecarLogResults, nil
}

func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
result := v1beta1.PipelineResourceResult{}
if len(resultBytes) > maxResultLimit {
return result, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}

var res SidecarLogResult
if err := json.Unmarshal(resultBytes, &res); err != nil {
return result, fmt.Errorf("Invalid result %w", err)
}
result = v1beta1.PipelineResourceResult{
Key: res.Name,
Value: res.Value,
ResultType: v1beta1.TaskRunResultType,
}
return result, nil
}
135 changes: 135 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package sidecarlogresults

import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
)

func TestExtractResultsFromLogs(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: "bar",
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != nil {
t.Error(err)
}
want := []v1beta1.PipelineResourceResult{
{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: "bar",
ResultType: v1beta1.TaskRunResultType,
},
}
if d := cmp.Diff(want, results); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestExtractResultsFromLogs_Failure(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: strings.Repeat("v", 4098),
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err.Error() != ErrorReasonMaxResultSizeExceeded {
t.Fatal(fmt.Sprintf("Expexted error %v but got %v", ErrorReasonMaxResultSizeExceeded, err))
}
}

func TestParseResults(t *testing.T) {
results := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
}, {
Name: "result3",
Value: `["hello","world"]`,
},
}
podLogs := []string{}
for _, r := range results {
res, _ := json.Marshal(&r)
podLogs = append(podLogs, string(res))
}
want := []v1beta1.PipelineResourceResult{{
Key: "result1",
Value: "foo",
ResultType: 1,
}, {
Key: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
ResultType: 1,
}, {
Key: "result3",
Value: `["hello","world"]`,
ResultType: 1,
}}
stepResults := []v1beta1.PipelineResourceResult{}
for _, plog := range podLogs {
res, err := parseResults([]byte(plog), 4096)
if err != nil {
t.Error(err)
}
stepResults = append(stepResults, res)
}
if d := cmp.Diff(want, stepResults); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestParseResults_Failure(t *testing.T) {
result := SidecarLogResult{
Name: "result2",
Value: strings.Repeat("k", 4098),
}
res1, _ := json.Marshal("result1 v1")
res2, _ := json.Marshal(&result)
podLogs := []string{string(res1), string(res2)}
want := []string{
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
ErrorReasonMaxResultSizeExceeded,
}
got := []string{}
for _, plog := range podLogs {
_, err := parseResults([]byte(plog), 4096)
got = append(got, err.Error())
}
if d := cmp.Diff(want, got); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}
18 changes: 18 additions & 0 deletions pkg/apis/pipeline/reservedsidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipeline

// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout
// when the results-from feature-flag is set to "sidecar-logs".
const ReservedResultsSidecarName = "tekton-log-results"
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ const (
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
)

func (t TaskRunReason) String() string {
Expand Down
5 changes: 4 additions & 1 deletion pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/spire"
Expand Down Expand Up @@ -85,6 +86,8 @@ type Entrypointer struct {
SpireWorkloadAPI spire.EntrypointerAPIClient
// ResultsDirectory is the directory to find results, defaults to pipeline.DefaultResultPath
ResultsDirectory string
// ResultExtractionMethod is the method using which the controller extracts the results from the task pod.
ResultExtractionMethod string
}

// Waiter encapsulates waiting for files to exist.
Expand Down Expand Up @@ -231,7 +234,7 @@ func (e Entrypointer) readResultsFromDisk(ctx context.Context, resultDir string)
}

// push output to termination path
if len(output) != 0 {
if e.ResultExtractionMethod == config.ResultExtractionMethodTerminationMessage && len(output) != 0 {
if err := termination.WriteMessage(e.TerminationPath, output); err != nil {
return err
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/entrypoint/entrypointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/termination"
Expand Down Expand Up @@ -315,8 +316,9 @@ func TestReadResultsFromDisk(t *testing.T) {
}

e := Entrypointer{
Results: resultsFilePath,
TerminationPath: terminationPath,
Results: resultsFilePath,
TerminationPath: terminationPath,
ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage,
}
if err := e.readResultsFromDisk(ctx, ""); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -544,19 +546,20 @@ func TestEntrypointerResults(t *testing.T) {
}

err := Entrypointer{
Command: append([]string{c.entrypoint}, c.args...),
WaitFiles: c.waitFiles,
PostFile: c.postFile,
Waiter: fw,
Runner: fr,
PostWriter: fpw,
Results: results,
ResultsDirectory: resultsDir,
TerminationPath: terminationPath,
Timeout: &timeout,
BreakpointOnFailure: c.breakpointOnFailure,
StepMetadataDir: c.stepDir,
SpireWorkloadAPI: signClient,
Command: append([]string{c.entrypoint}, c.args...),
WaitFiles: c.waitFiles,
PostFile: c.postFile,
Waiter: fw,
Runner: fr,
PostWriter: fpw,
Results: results,
ResultsDirectory: resultsDir,
ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage,
TerminationPath: terminationPath,
Timeout: &timeout,
BreakpointOnFailure: c.breakpointOnFailure,
StepMetadataDir: c.stepDir,
SpireWorkloadAPI: signClient,
}.Go()
if err != nil {
t.Fatalf("Entrypointer failed: %v", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pod/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"gomodules.xyz/jsonpatch/v2"
Expand Down Expand Up @@ -247,6 +248,13 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In
updated := false
if newPod.Status.Phase == corev1.PodRunning {
for _, s := range newPod.Status.ContainerStatuses {
// If the results-from is set to sidecar logs,
// a sidecar container with name `sidecar-log-results` is injected by the reconiler.
// Do not kill this sidecar. Let it exit gracefully.
resultsSidecarName := fmt.Sprintf("sidecar-%v", pipeline.ReservedResultsSidecarName)
if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == resultsSidecarName {
continue
}
// Stop any running container that isn't a step.
// An injected sidecar container might not have the
// "sidecar-" prefix, so we can't just look for that
Expand Down
Loading

0 comments on commit 6dcc538

Please sign in to comment.