Skip to content

Commit

Permalink
feat: add kafka resource type (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
FemiNoviaLina authored Sep 9, 2024
1 parent 417f10e commit 80d5637
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 16 deletions.
2 changes: 2 additions & 0 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/goto/entropy/modules"
"github.com/goto/entropy/modules/firehose"
"github.com/goto/entropy/modules/job"
"github.com/goto/entropy/modules/kafka"
"github.com/goto/entropy/modules/kubernetes"
"github.com/goto/entropy/pkg/logger"
"github.com/goto/entropy/pkg/telemetry"
Expand Down Expand Up @@ -90,6 +91,7 @@ func setupRegistry() module.Registry {
kubernetes.Module,
firehose.Module,
job.Module,
kafka.Module,
}

registry := &modules.Registry{}
Expand Down
8 changes: 4 additions & 4 deletions internal/store/postgres/resource_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const listResourceByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
Expand All @@ -28,8 +28,8 @@ OFFSET $4
`

const listResourceWithSpecConfigsByFilterQuery = `SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.spec_configs, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
COALESCE(NULLIF(array_agg(rt.tag), '{NULL}'), '{}')::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
Expand Down
74 changes: 74 additions & 0 deletions modules/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kafka

import (
_ "embed"
"encoding/json"

"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/pkg/errors"
"github.com/goto/entropy/pkg/validator"
)

var (
//go:embed schema/config.json
configSchemaRaw []byte
validateConfig = validator.FromJSONSchema(configSchemaRaw)
)

type Config struct {
Entity string `json:"entity,omitempty"`
Environment string `json:"environment,omitempty"`
Landscape string `json:"landscape,omitempty"`
Organization string `json:"organization,omitempty"`
AdvertiseMode AdvertiseMode `json:"advertise_mode"`
Brokers []Broker `json:"brokers,omitempty"`
Type string `json:"type"`
}

type AdvertiseMode struct {
Host string `json:"host"`
Address string `json:"address"`
}

type Broker struct {
Name string `json:"name"`
Host string `json:"host"`
Address string `json:"address"`
}

func readConfig(res resource.Resource, confJSON json.RawMessage, dc driverConf) (*Config, error) {
var resCfg, cfg Config

if err := json.Unmarshal(confJSON, &cfg); err != nil {
return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal").WithCausef(err.Error())
}

if res.Spec.Configs != nil {
if err := json.Unmarshal(res.Spec.Configs, &resCfg); err != nil {
return nil, errors.ErrInvalid.WithMsgf("failed to unmarshal").WithCausef(err.Error())
}
}

if cfg.Type == "" {
if resCfg.Type != "" {
cfg.Type = resCfg.Type
} else {
cfg.Type = dc.Type
}
}

if cfg.Brokers == nil {
cfg.Brokers = resCfg.Brokers
}

newConfJSON, err := json.Marshal(cfg)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("failed to marshal").WithCausef(err.Error())
}

if err := validateConfig(newConfJSON); err != nil {
return nil, err
}

return &cfg, nil
}
94 changes: 94 additions & 0 deletions modules/kafka/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/modules"
)

var defaultDriverConf = driverConf{
Type: "source",
}

type kafkaDriver struct {
conf driverConf
}

type Output struct {
URL string `json:"url"`
}

type driverConf struct {
Type string `json:"type"`
}

func (m *kafkaDriver) Plan(ctx context.Context, res module.ExpandedResource,
act module.ActionRequest,
) (*resource.Resource, error) {
cfg, err := readConfig(res.Resource, act.Params, m.conf)
if err != nil {
return nil, err
}

res.Resource.Spec = resource.Spec{
Configs: modules.MustJSON(cfg),
Dependencies: nil,
}

res.Resource.State = resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{
URL: mapUrl(cfg),
}),
}

return &res.Resource, nil
}

func (*kafkaDriver) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) {
return &resource.State{
Status: resource.StatusCompleted,
Output: res.Resource.State.Output,
ModuleData: nil,
}, nil
}

func (m *kafkaDriver) Output(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) {
cfg, err := readConfig(res.Resource, res.Resource.Spec.Configs, m.conf)
if err != nil {
return nil, err
}

return modules.MustJSON(Output{
URL: mapUrl(cfg),
}), nil
}

func mapUrl(cfg *Config) string {
var mode, port string
if cfg.AdvertiseMode.Address != "" {
mode = "address"
port = cfg.AdvertiseMode.Address
} else {
mode = "host"
port = cfg.AdvertiseMode.Host
}

var urls []string
for _, broker := range cfg.Brokers {
var addr string
if mode == "address" {
addr = broker.Address
} else {
addr = broker.Host
}
urls = append(urls, fmt.Sprintf("%s:%s", addr, port))
}

return strings.Join(urls, ",")
}
24 changes: 24 additions & 0 deletions modules/kafka/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kafka

import (
"encoding/json"

"github.com/goto/entropy/core/module"
)

var Module = module.Descriptor{
Kind: "kafka",
Actions: []module.ActionDesc{
{
Name: module.CreateAction,
},
{
Name: module.UpdateAction,
},
},
DriverFactory: func(_ json.RawMessage) (module.Driver, error) {
return &kafkaDriver{
conf: defaultDriverConf,
}, nil
},
}
53 changes: 53 additions & 0 deletions modules/kafka/schema/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["type", "brokers"],
"properties": {
"type": {
"type": "string"
},
"advertise_mode": {
"type": "object",
"additionalProperties": true,
"properties": {
"host": {
"type": "string"
},
"address": {
"type": "string"
}
}
},
"brokers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"host": {
"type": "string"
},
"address": {
"type": "string"
}
},
"required": ["name", "host", "address"]
}
},
"entity": {
"type": "string"
},
"environment": {
"type": "string"
},
"landscape": {
"type": "string"
},
"organization": {
"type": "string"
}
}
}
2 changes: 1 addition & 1 deletion test/e2e_test/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type FirehoseTestSuite struct {
}

func (s *FirehoseTestSuite) SetupTest() {
s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true)
s.ctx, s.moduleClient, s.resourceClient, s.appConfig, s.pool, s.resource, s.kubeProvider, s.cancelModuleClient, s.cancelResourceClient, s.cancel = testbench.SetupTests(s.T(), true, true)

modules, err := s.moduleClient.ListModules(s.ctx, &entropyv1beta1.ListModulesRequest{})
s.Require().NoError(err)
Expand Down
Loading

0 comments on commit 80d5637

Please sign in to comment.