From a535434a61a751159b6bf25bd52d99d6981dcdfe Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 25 Oct 2024 15:54:07 +0200 Subject: [PATCH] fix: GZip the binaries before sending --- cmd/tcl/kubectl-testkube/devbox/command.go | 42 +++---------------- .../devbox/devutils/objectstorage.go | 39 ++++++++++++++--- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/cmd/tcl/kubectl-testkube/devbox/command.go b/cmd/tcl/kubectl-testkube/devbox/command.go index e921b01a14..bb98585158 100644 --- a/cmd/tcl/kubectl-testkube/devbox/command.go +++ b/cmd/tcl/kubectl-testkube/devbox/command.go @@ -231,12 +231,7 @@ func NewDevBoxCommand() *cobra.Command { fmt.Println("Uploading binaries...") g.Go(func() error { its := time.Now() - file, err := os.Open(agentBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/testkube-api-server", file, agentBin.Hash()) + err = objectStorage.Upload(ctx, "bin/testkube-api-server", agentBin.Path(), agentBin.Hash()) if err != nil { fmt.Printf("Agent: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) } else { @@ -246,12 +241,7 @@ func NewDevBoxCommand() *cobra.Command { }) g.Go(func() error { its := time.Now() - file, err := os.Open(toolkitBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/toolkit", file, toolkitBin.Hash()) + err = objectStorage.Upload(ctx, "bin/toolkit", toolkitBin.Path(), toolkitBin.Hash()) if err != nil { fmt.Printf("Toolkit: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) } else { @@ -261,12 +251,7 @@ func NewDevBoxCommand() *cobra.Command { }) g.Go(func() error { its := time.Now() - file, err := os.Open(initProcessBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/init", file, initProcessBin.Hash()) + err = objectStorage.Upload(ctx, "bin/init", initProcessBin.Path(), initProcessBin.Hash()) if err != nil { fmt.Printf("Init Process: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) } else { @@ -410,12 +395,7 @@ func NewDevBoxCommand() *cobra.Command { fmt.Printf("Agent: build finished in %s.\n", time.Since(its).Truncate(time.Millisecond)) its = time.Now() - file, err := os.Open(agentBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/testkube-api-server", file, agentBin.Hash()) + err = objectStorage.Upload(ctx, "bin/testkube-api-server", agentBin.Path(), agentBin.Hash()) if err != nil { fmt.Printf("Agent: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) return err @@ -453,12 +433,7 @@ func NewDevBoxCommand() *cobra.Command { fmt.Printf("Toolkit: build finished in %s.\n", time.Since(its).Truncate(time.Millisecond)) its = time.Now() - file, err := os.Open(toolkitBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/toolkit", file, toolkitBin.Hash()) + err = objectStorage.Upload(ctx, "bin/toolkit", toolkitBin.Path(), toolkitBin.Hash()) if err != nil { fmt.Printf("Toolkit: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) return err @@ -476,12 +451,7 @@ func NewDevBoxCommand() *cobra.Command { fmt.Printf("Init Process: build finished in %s.\n", time.Since(its).Truncate(time.Millisecond)) its = time.Now() - file, err := os.Open(initProcessBin.Path()) - if err != nil { - return err - } - defer file.Close() - err = objectStorage.Upload(ctx, "bin/init", file, initProcessBin.Hash()) + err = objectStorage.Upload(ctx, "bin/init", initProcessBin.Path(), initProcessBin.Hash()) if err != nil { fmt.Printf("Init Process: upload finished in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) return err diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/objectstorage.go b/cmd/tcl/kubectl-testkube/devbox/devutils/objectstorage.go index 77a1f62354..41d61bfd12 100644 --- a/cmd/tcl/kubectl-testkube/devbox/devutils/objectstorage.go +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/objectstorage.go @@ -15,6 +15,8 @@ import ( "fmt" "io" "net/http" + "os" + "path/filepath" "sync" "time" @@ -23,6 +25,7 @@ import ( minio2 "github.com/minio/minio-go/v7" + "github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts" "github.com/kubeshop/testkube/internal/common" "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/storage/minio" @@ -154,7 +157,7 @@ func (r *ObjectStorage) WaitForReady(ctx context.Context) error { } // TODO: Compress on-fly -func (r *ObjectStorage) Upload(ctx context.Context, path string, reader io.Reader, hash string) error { +func (r *ObjectStorage) Upload(ctx context.Context, path string, fsPath string, hash string) error { c, err := r.Client() if err != nil { return err @@ -162,21 +165,45 @@ func (r *ObjectStorage) Upload(ctx context.Context, path string, reader io.Reade if hash != "" && r.Is(path, hash) { return nil } - putUrl, err := c.PresignedPutObject(ctx, "devbox", path, 15*time.Minute) + //putUrl, err := c.PresignedPutObject(ctx, "devbox", path, 15*time.Minute) + putUrl, err := c.PresignHeader(ctx, "PUT", "devbox", path, 15*time.Minute, nil, http.Header{ + "X-Amz-Meta-Snowball-Auto-Extract": {"true"}, + "X-Amz-Meta-Minio-Snowball-Prefix": {filepath.Dir(path)}, + "Content-Type": {"application/gzip"}, + "Content-Encoding": {"gzip"}, + }) + if err != nil { + return err + } + + file, err := os.Open(fsPath) if err != nil { return err } + defer file.Close() + stat, err := file.Stat() + if err != nil { + return err + } + buf := new(bytes.Buffer) - //g := gzip.NewWriter(buf) - io.Copy(buf, reader) + tarStream := artifacts.NewTarStream() + go func() { + tarStream.Add(filepath.Base(path), file, stat) + tarStream.Close() + }() + io.Copy(buf, tarStream) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, putUrl.String(), buf) if err != nil { return err } req.ContentLength = int64(buf.Len()) + req.Header.Set("X-Amz-Meta-Snowball-Auto-Extract", "true") + req.Header.Set("X-Amz-Meta-Minio-Snowball-Prefix", filepath.Dir(path)) + req.Header.Set("Content-Type", "application/gzip") + req.Header.Set("Content-Encoding", "gzip") - req.Header.Set("Content-Type", "application/octet-stream") - //req.Header.Set("Content-Encoding", "gzip") tr := http.DefaultTransport.(*http.Transport).Clone() tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} client := &http.Client{Transport: tr}