Skip to content

Commit

Permalink
feat: support templating for http sinks (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
haveiss authored Nov 20, 2023
1 parent 03d709b commit 16f8852
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 10 deletions.
43 changes: 41 additions & 2 deletions plugins/sinks/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,58 @@ sinks:
config:
method: POST
success_code: 200
url: https://compass.com/v1beta1/asset
url: https://compass.requestcatcher.com/v1beta2/asset/{{ .Type }}/{{ .Urn }}
headers:
Header-1: value11,value12
script:
engine: tengo
source: |
payload := {
details: {
some_key: asset.urn,
another_key: asset.name
}
}
sink(payload)
```
## Config Defination
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
|`url` | `string` | `http://compass.production.com/v1beta1/asset` | URL to the http server, contains all the info needed to make the request, like port and route | *required*|
|`url` | `string` | `https://compass.requestcatcher.com/v1beta2/asset/{{ .Type }}/{{ .Urn }}` | URL to the http server, contains all the info needed to make the request, like port and route, support go [text/template](https://pkg.go.dev/text/template) (see the properties in [v1beta2.Asset](https://github.com/goto/meteor/blob/main/models/gotocompany/assets/v1beta2/asset.pb.go#L25-L68)) | *required*|
| `method` | `string` | `POST` | the method string of by which the request is to be made, e.g. POST/PATCH/GET | *required* |
| `success_code` | `integer` | `200` | to identify the expected success code the http server returns, defult is `200` | *optional* |
| `headers` | `map` | `"Content-Type": "application/json"` | to add any header/headers that may be required for making the request | *optional* |
| `script` | `Object` | see [Script](#Script) | Script for building custom payload | *optional |

## Script

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
| `engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | *required* |
| `source` | `string` | see [Usage](#Usage). | [Tengo][tengo] script used to map the request into custom payload to be sent to url. | *required* |

### Script Globals

- [`asset`](#recipe_scope)
- [`sink(Payload)`](#sinkpayload)
- [`exit`](#exit)

#### `asset`

The asset record emitted by the extractor and processors is made available in the script
environment as `asset`. The field names will be as
per the [`Asset` proto definition](https://github.com/goto/proton/blob/5b5dc72/gotocompany/assets/v1beta2/asset.proto#L14).

#### `sink(Payload)`

Send http request to url with payload.


#### `exit()`

Terminates the script execution.

## Contributing

Expand Down
43 changes: 43 additions & 0 deletions plugins/sinks/http/fixtures/response_with_script.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
version: 1
interactions:
- request:
body: '{"details":{"another_key":"index1","some_key":"elasticsearch.index1"}}'
form: {}
headers:
Accept:
- application/json
Content-Type:
- application/json
url: http://127.0.0.1:54943/table/elasticsearch.index1
method: POST
response:
body: ""
headers:
Content-Length:
- "0"
Date:
- Mon, 11 Jul 2022 09:32:21 GMT
status: 200 OK
code: 200
duration: 274.68µs
- request:
body: '{"details":{"another_key":"index2","some_key":"elasticsearch.index2"}}'
form: {}
headers:
Accept:
- application/json
Content-Type:
- application/json
url: http://127.0.0.1:54943/table/elasticsearch.index2
method: POST
response:
body: ""
headers:
Content-Length:
- "0"
Date:
- Mon, 11 Jul 2022 09:32:21 GMT
status: 200 OK
code: 200
duration: 173.792µs
48 changes: 40 additions & 8 deletions plugins/sinks/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
_ "embed"
"encoding/json"
"fmt"
"html/template"
"io"
"net/http"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/goto/meteor/metrics/otelhttpclient"
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
Expand All @@ -26,6 +28,10 @@ type Config struct {
Headers map[string]string `mapstructure:"headers"`
Method string `mapstructure:"method" validate:"required"`
SuccessCode int `mapstructure:"success_code" default:"200"`
Script *struct {
Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
Source string `mapstructure:"source" validate:"required"`
} `mapstructure:"script"`
}

var info = plugins.Info{
Expand All @@ -34,11 +40,21 @@ var info = plugins.Info{
Tags: []string{"http", "sink"},
SampleConfig: heredoc.Doc(`
# The url (hostname and route) of the http service
url: https://compass.com/route
url: https://compass.requestcatcher.com/{{ .Type }}/{{ .Urn }}
method: "PUT"
# Additional HTTP headers, multiple headers value are separated by a comma
headers:
X-Other-Header: value1, value2
script:
engine: tengo
source: |
payload := {
details: {
some_key: asset.urn,
another_key: asset.name
}
}
sink(payload)
`),
}

Expand Down Expand Up @@ -75,12 +91,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
for _, record := range batch {
metadata := record.Data()
s.logger.Info("sinking record to http", "record", metadata.Urn)
payload, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("build http payload: %w", err)
}

if err = s.send(ctx, payload); err != nil {
if err := s.send(ctx, metadata); err != nil {
return fmt.Errorf("send data: %w", err)
}

Expand All @@ -92,9 +104,29 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {

func (*Sink) Close() error { return nil }

func (s *Sink) send(ctx context.Context, payloadBytes []byte) error {
func (s *Sink) send(ctx context.Context, asset *v1beta2.Asset) error {
t := template.Must(template.New("url").Parse(s.config.URL))
var buf bytes.Buffer
if err := t.Execute(&buf, asset); err != nil {
return fmt.Errorf("build http url: %w", err)
}
url := buf.String()

if s.config.Script != nil {
return s.executeScript(ctx, url, asset)
}

payload, err := json.Marshal(asset)
if err != nil {
return fmt.Errorf("build http payload: %w", err)
}

// send request
req, err := http.NewRequestWithContext(ctx, s.config.Method, s.config.URL, bytes.NewBuffer(payloadBytes))
return s.makeRequest(ctx, url, payload)
}

func (s *Sink) makeRequest(ctx context.Context, url string, payload []byte) error {
req, err := http.NewRequestWithContext(ctx, s.config.Method, url, bytes.NewBuffer(payload))
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions plugins/sinks/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,46 @@ func TestSink(t *testing.T) {
err = httpSink.Sink(context.TODO(), getExpectedVal(t))
assert.NoError(t, err)
})

t.Run("should return no error when using templates", func(t *testing.T) {
r, err := recorder.New("fixtures/response_with_script")
if err != nil {
t.Fatal(err)
}
defer func() {
err := r.Stop()
if err != nil {
t.Fatal(err)
}
}()
httpSink := h.New(&http.Client{Transport: r}, testutils.Logger)
config := map[string]interface{}{
"success_code": success_code,
"url": "http://127.0.0.1:54943/{{ .Type }}/{{ .Urn }}",
"method": "POST",
"headers": map[string]string{
"Content-Type": "application/json",
"Accept": "application/json",
},
"script": map[string]interface{}{
"engine": "tengo",
"source": `
payload := {
details: {
some_key: asset.urn,
another_key: asset.name
}
}
sink(payload)
`,
},
}
err = httpSink.Init(context.TODO(), plugins.Config{RawConfig: config})
assert.NoError(t, err)
defer httpSink.Close()
err = httpSink.Sink(context.TODO(), getExpectedVal(t))
assert.NoError(t, err)
})
}

func getExpectedVal(t *testing.T) []models.Record {
Expand Down
78 changes: 78 additions & 0 deletions plugins/sinks/http/tengo_script.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package http

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/d5/tengo/v2"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins/internal/tengoutil"
"github.com/goto/meteor/plugins/internal/tengoutil/structmap"
)

var errUserExit = errors.New("user exit")

func (s *Sink) executeScript(ctx context.Context, url string, asset *v1beta2.Asset) error {
scriptCfg := s.config.Script
script, err := tengoutil.NewSecureScript(
([]byte)(scriptCfg.Source), s.scriptGlobals(ctx, url),
)
if err != nil {
return err
}

c, err := script.Compile()
if err != nil {
return fmt.Errorf("compile: %w", err)
}

assetMap, err := structmap.AsMap(asset)
if err != nil {
return fmt.Errorf("convert asset to map: %w", err)
}

if err := c.Set("asset", assetMap); err != nil {
return fmt.Errorf("set asset into vm: %w", err)
}

if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) {
return fmt.Errorf("run: %w", err)
}

return nil
}

func (s *Sink) scriptGlobals(ctx context.Context, url string) map[string]interface{} {
return map[string]interface{}{
"asset": map[string]interface{}{},
"sink": &tengo.UserFunction{
Name: "sink",
Value: s.executeRequestWrapper(ctx, url),
},
"exit": &tengo.UserFunction{
Name: "exit",
Value: func(...tengo.Object) (tengo.Object, error) {
return nil, errUserExit
},
},
}
}

func (s *Sink) executeRequestWrapper(ctx context.Context, url string) tengo.CallableFunc {
return func(args ...tengo.Object) (tengo.Object, error) {
if len(args) != 1 {
return nil, fmt.Errorf("execute request: invalid number of arguments of sink function, expected 1, got %d", len(args))
}
payloadObj, ok := args[0].(*tengo.Map)
if !ok {
return nil, fmt.Errorf("execute request: invalid type of argument of sink function, expected map, got %T", args[0])
}
payload, err := json.Marshal(tengo.ToInterface(payloadObj))
if err != nil {
return nil, fmt.Errorf("execute request: marshal payload: %w", err)
}
return nil, s.makeRequest(ctx, url, payload)
}
}

0 comments on commit 16f8852

Please sign in to comment.