From bb7824e29e4e0c1921e40b7d3d93ca1cd742c3ed Mon Sep 17 00:00:00 2001 From: Will Baker Date: Tue, 1 Oct 2024 12:58:50 -0400 Subject: [PATCH] materialize-rockset: remove connector Rockset is no longer a commercial service, so this materialization is no longer functional. --- .github/workflows/ci.yaml | 3 - go.mod | 3 +- go.sum | 10 - ...stRocksetDriverSpec-TestEndpointSpecSchema | 32 -- ...stRocksetDriverSpec-TestResourceSpecSchema | 55 ---- materialize-rockset/CHANGELOG.md | 12 - materialize-rockset/Dockerfile | 45 --- materialize-rockset/README.md | 33 --- materialize-rockset/VERSION | 1 - .../cmd/connector/connector.go | 20 -- materialize-rockset/driver.go | 275 ------------------ materialize-rockset/driver_test.go | 243 ---------------- materialize-rockset/rockset.go | 95 ------ materialize-rockset/transactor.go | 211 -------------- 14 files changed, 1 insertion(+), 1037 deletions(-) delete mode 100644 materialize-rockset/.snapshots/TestRocksetDriverSpec-TestEndpointSpecSchema delete mode 100644 materialize-rockset/.snapshots/TestRocksetDriverSpec-TestResourceSpecSchema delete mode 100644 materialize-rockset/CHANGELOG.md delete mode 100644 materialize-rockset/Dockerfile delete mode 100644 materialize-rockset/README.md delete mode 100644 materialize-rockset/VERSION delete mode 100644 materialize-rockset/cmd/connector/connector.go delete mode 100644 materialize-rockset/driver.go delete mode 100644 materialize-rockset/driver_test.go delete mode 100644 materialize-rockset/rockset.go delete mode 100644 materialize-rockset/transactor.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c9d102fc4f..09394af58a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -110,7 +110,6 @@ jobs: - materialize-pinecone - materialize-postgres - materialize-redshift - - materialize-rockset - materialize-s3-csv - materialize-s3-iceberg - materialize-s3-parquet @@ -200,8 +199,6 @@ jobs: load: true build-args: BASE_IMAGE=ghcr.io/estuary/base-image:${{ steps.prep.outputs.tag }} tags: ghcr.io/estuary/${{ matrix.connector }}:local - secrets: | - "rockset_api_key=${{ secrets.ROCKSET_API_KEY }}" - name: Build ${{ matrix.connector }} Python Docker Image uses: docker/build-push-action@v2 diff --git a/go.mod b/go.mod index 8b23c0a311..f5dde0cb55 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,6 @@ require ( github.com/minio/highwayhash v1.0.2 github.com/mitchellh/mapstructure v1.5.0 github.com/pkg/sftp v1.13.6 - github.com/rockset/rockset-go-client v0.15.4 github.com/segmentio/encoding v0.4.0 github.com/senseyeio/duration v0.0.0-20180430131211-7c2a214ada46 github.com/sijms/go-ora/v2 v2.8.19 @@ -68,6 +67,7 @@ require ( golang.org/x/crypto v0.24.0 golang.org/x/oauth2 v0.21.0 golang.org/x/sync v0.7.0 + golang.org/x/text v0.16.0 golang.org/x/time v0.5.0 google.golang.org/api v0.184.0 google.golang.org/genproto v0.0.0-20240610135401-a8a62080eff3 @@ -227,7 +227,6 @@ require ( golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.22.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index b6aa92dfbe..b099503e65 100644 --- a/go.sum +++ b/go.sum @@ -282,8 +282,6 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= -github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= -github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= github.com/coreos/go-oidc/v3 v3.10.0 h1:tDnXHnLyiTVyT/2zLDGj09pFPkhND8Gl8lnTRhoEaJU= @@ -351,8 +349,6 @@ github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= -github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= -github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= @@ -402,8 +398,6 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg= -github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= @@ -756,8 +750,6 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 h1:4+LEVOB87y175cLJC/mbsgKmoDOjrBldtXvioEy96WY= github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3/go.mod h1:vl5+MqJ1nBINuSsUI2mGgH79UweUT/B5Fy8857PqyyI= -github.com/rockset/rockset-go-client v0.15.4 h1:Pw8Vj26ynVrnAWeeEiyqD2OBIQyl9UYSGnI3dOeajIo= -github.com/rockset/rockset-go-client v0.15.4/go.mod h1:U1lMhi5CHJ+Z6lzdeDYCUy19MKdoErFinRCCQK4u+IQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -768,8 +760,6 @@ github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6us github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= -github.com/seborama/govcr v4.5.0+incompatible h1:XvdHtXi0d4cUAn+0aWolvwfS3nmhNC8Z+yMQwn/M64I= -github.com/seborama/govcr v4.5.0+incompatible/go.mod h1:EgcISudCCYDLzbiAImJ8i7kk4+wTA44Kp+j4S0LhASI= github.com/secure-systems-lab/go-securesystemslib v0.8.0 h1:mr5An6X45Kb2nddcFlbmfHkLguCE9laoZCUzEEpIZXA= github.com/secure-systems-lab/go-securesystemslib v0.8.0/go.mod h1:UH2VZVuJfCYR8WgMlCU1uFsOUU+KeyrTWcSS73NBOzU= github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc= diff --git a/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestEndpointSpecSchema b/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestEndpointSpecSchema deleted file mode 100644 index bcc77eb061..0000000000 --- a/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestEndpointSpecSchema +++ /dev/null @@ -1,32 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft/2020-12/schema", - "$id": "https://github.com/estuary/connectors/materialize-rockset/config", - "properties": { - "region_base_url": { - "type": "string", - "enum": [ - "api.usw2a1.rockset.com", - "api.use1a1.rockset.com", - "api.euc1a1.rockset.com" - ], - "title": "Region Base URL", - "description": "The base URL to connect to your Rockset deployment. Example: api.usw2a1.rockset.com (do not include the protocol).", - "multiline": true, - "order": 0 - }, - "api_key": { - "type": "string", - "title": "Rockset API Key", - "description": "The key used to authenticate to the Rockset API. Must have role of admin or member.", - "multiline": true, - "order": 1, - "secret": true - } - }, - "type": "object", - "required": [ - "region_base_url", - "api_key" - ], - "title": "Rockset Endpoint" -} diff --git a/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestResourceSpecSchema b/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestResourceSpecSchema deleted file mode 100644 index 83e8d7af7b..0000000000 --- a/materialize-rockset/.snapshots/TestRocksetDriverSpec-TestResourceSpecSchema +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft/2020-12/schema", - "$id": "https://github.com/estuary/connectors/materialize-rockset/resource", - "properties": { - "workspace": { - "type": "string", - "title": "Workspace", - "description": "The name of the Rockset workspace (will be created if it does not exist)" - }, - "collection": { - "type": "string", - "title": "Rockset Collection", - "description": "The name of the Rockset collection (will be created if it does not exist)", - "x-collection-name": true - }, - "advancedCollectionSettings": { - "properties": { - "retention_secs": { - "type": "integer", - "title": "Retention Period", - "description": "Number of seconds after which data is purged based on event time" - }, - "clustering_key": { - "items": { - "properties": { - "field_name": { - "type": "string", - "title": "Field Name", - "description": "The name of a field" - } - }, - "additionalProperties": false, - "type": "object", - "required": [ - "field_name" - ] - }, - "type": "array", - "title": "Clustering Key", - "description": "List of clustering fields" - } - }, - "additionalProperties": false, - "type": "object", - "title": "Advanced Collection Settings", - "advanced": true - } - }, - "type": "object", - "required": [ - "workspace", - "collection" - ], - "title": "Rockset Collection" -} diff --git a/materialize-rockset/CHANGELOG.md b/materialize-rockset/CHANGELOG.md deleted file mode 100644 index 43b6bfef0f..0000000000 --- a/materialize-rockset/CHANGELOG.md +++ /dev/null @@ -1,12 +0,0 @@ -# materialize-rockset - -## v2, 2022-12-06 -- Add required field `region_base_url` to configuration for setting the base URL. Previously this - would only use `api.rs2.usw2.rockset.com`. -- Remove advanced fields `event_time_info` & `insert_only`. These have been deprecated by Rockset - and are no longer used. -- Mark `workspace` and `collection` as required in the connector spec as these fields have always - been required for a spec to validate. - -## v1, 2022-07-27 -- Beginning of changelog. diff --git a/materialize-rockset/Dockerfile b/materialize-rockset/Dockerfile deleted file mode 100644 index bbd1835e4d..0000000000 --- a/materialize-rockset/Dockerfile +++ /dev/null @@ -1,45 +0,0 @@ -ARG BASE_IMAGE=ghcr.io/estuary/base-image:v1 - -# Build Stage -################################################################################ -FROM golang:1.22-bullseye as builder - -WORKDIR /builder - -# Download & compile dependencies early. Doing this separately allows for layer -# caching opportunities when no dependencies are updated. -COPY go.* ./ -RUN go mod download - -COPY go ./go -COPY materialize-boilerplate ./materialize-boilerplate -COPY materialize-rockset ./materialize-rockset - -# Test and build the connector. These hit the Rockset API, so it needs a real key. You can pass -# these with Buildkit's `--secret` argument to `docker build`. -# To do this locally, set a ROCKSET_API_KEY env var, then add the following to your normal `docker build` command: -# `--secret # id=rockset_api_key,env=ROCKSET_API_KEY` -# See https://andrei-calazans.com/posts/2021-06-23/passing-secrets-github-actions-docker for more -# info about build-time secrets. -RUN --mount=type=secret,id=rockset_api_key \ - export ROCKSET_API_KEY=$(cat /run/secrets/rockset_api_key) && \ - go test -tags nozstd -v ./materialize-rockset/ - -RUN go build -tags nozstd -v -o ./connector ./materialize-rockset/cmd/connector/ - -# Runtime Stage -################################################################################ -FROM ${BASE_IMAGE} - -WORKDIR /connector -ENV PATH="/connector:$PATH" - -# Bring in the compiled connector artifact from the builder. -COPY --from=builder /builder/connector ./materialize-rockset - -# Avoid running the connector as root. -USER nonroot:nonroot - -LABEL FLOW_RUNTIME_PROTOCOL=materialize - -ENTRYPOINT ["/connector/materialize-rockset"] diff --git a/materialize-rockset/README.md b/materialize-rockset/README.md deleted file mode 100644 index c2aecf7e86..0000000000 --- a/materialize-rockset/README.md +++ /dev/null @@ -1,33 +0,0 @@ -# Materialize Rockset - -This is a Flow connector that materializes delta updates of each document into a Rockset collection. - -## Getting Started - -Connector images are available at `ghcr.io/estuary/materialize-rockset`. - -The connector configuration must specify a [Rockset API key](https://rockset.com/docs/iam/#api-keys), which can be created in the [Rockset console](https://console.rockset.com/apikeys). You must also specify a [Region base URL](https://rockset.com/docs/rest-api/#introduction) where you desired deployment is active. - -For each Flow collection you'd like to materialize, add a binding with the names of the target Rockset workspace and collection. Both the workspace and collection will be created automatically by the connector if they don't already exist. - -**Example flow.yaml:** - -```yaml -materializations: - example/toRockset: - endpoint: - connector: - image: ghcr.io/estuary/materialize-rockset:dev - config: - region_base_url: - api_key: - bindings: - - resource: - workspace: - collection: - source: example/flow/collection -``` - -## Potential improvements - -Rockset supports a [`field_mapping_query`](https://rockset.com/docs/rest-api/#createcollection) when creating a collection. This can allow for specifying things like `_event_time` mappings and custom field mappings. This is not currently supported by the Flow materialization, but could be a potential enhancement in the future. \ No newline at end of file diff --git a/materialize-rockset/VERSION b/materialize-rockset/VERSION deleted file mode 100644 index 8c1384d825..0000000000 --- a/materialize-rockset/VERSION +++ /dev/null @@ -1 +0,0 @@ -v2 diff --git a/materialize-rockset/cmd/connector/connector.go b/materialize-rockset/cmd/connector/connector.go deleted file mode 100644 index b3d7142172..0000000000 --- a/materialize-rockset/cmd/connector/connector.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "math/rand" - "os" - "time" - - boilerplate "github.com/estuary/connectors/materialize-boilerplate" - connector "github.com/estuary/connectors/materialize-rockset" - log "github.com/sirupsen/logrus" -) - -func main() { - rand.Seed(time.Now().UnixNano()) - log.SetFormatter(&log.JSONFormatter{}) - log.SetOutput(os.Stderr) - log.SetLevel(log.DebugLevel) - - boilerplate.RunMain(connector.NewRocksetDriver()) -} diff --git a/materialize-rockset/driver.go b/materialize-rockset/driver.go deleted file mode 100644 index 5263465c42..0000000000 --- a/materialize-rockset/driver.go +++ /dev/null @@ -1,275 +0,0 @@ -package materialize_rockset - -import ( - "context" - "encoding/json" - "fmt" - "regexp" - "strings" - - // importing tzdata is required so that time.LoadLocation can be used to validate timezones - // without requiring timezone packages to be installed on the system. - _ "time/tzdata" - - m "github.com/estuary/connectors/go/protocols/materialize" - schemagen "github.com/estuary/connectors/go/schema-gen" - pf "github.com/estuary/flow/go/protocols/flow" - pm "github.com/estuary/flow/go/protocols/materialize" - rockset "github.com/rockset/rockset-go-client" - "github.com/rockset/rockset-go-client/option" - log "github.com/sirupsen/logrus" -) - -type config struct { - RegionBaseUrl string `json:"region_base_url" jsonschema:"title=Region Base URL,description=The base URL to connect to your Rockset deployment. Example: api.usw2a1.rockset.com (do not include the protocol).,enum=api.usw2a1.rockset.com,enum=api.use1a1.rockset.com,enum=api.euc1a1.rockset.com" jsonschema_extras:"multiline=true,order=0"` - ApiKey string `json:"api_key" jsonschema:"title=Rockset API Key,description=The key used to authenticate to the Rockset API. Must have role of admin or member." jsonschema_extras:"secret=true,multiline=true,order=1"` -} - -func (c *config) Validate() error { - var requiredProperties = [][]string{ - {"api_key", c.ApiKey}, - {"region_base_url", c.RegionBaseUrl}, - } - for _, req := range requiredProperties { - if req[1] == "" { - return fmt.Errorf("missing '%s'", req[0]) - } - } - return nil -} - -func (c *config) client() (*rockset.RockClient, error) { - return rockset.NewClient(rockset.WithAPIKey(c.ApiKey), rockset.WithAPIServer(c.RegionBaseUrl)) -} - -// fieldPartition was copied from rtypes.FieldPartition and modified both to customize the json -// schema and to remove unnecessary fields. Turns out that all the fields except for `FieldName` are -// seemingly unnecessary, beucause the only supported `type` is `AUTO`, and the -// [docs](https://rockset.com/docs/rest-api/#createcollection) say that the `keys` are not needed if -// type is `AUTO`. -type fieldPartition struct { - FieldName string `json:"field_name" jsonschema:"title=Field Name,description=The name of a field\u002C parsed as a SQL qualified name"` -} - -func (f *fieldPartition) toOpt() option.CollectionOption { - return option.WithCollectionClusteringKey( - f.FieldName, - "AUTO", - []string{}, - ) -} - -// collectionSettings exposes a subset of the "advanced" options on rtypes.CreateCollectionRequest -type collectionSettings struct { - RetentionSecs *int64 `json:"retention_secs,omitempty" jsonschema:"title=Retention Period,description=Number of seconds after which data is purged based on event time"` - ClusteringKey []fieldPartition `json:"clustering_key,omitempty" jsonschema:"title=Clustering Key,description=List of clustering fields"` -} - -func (s *collectionSettings) Validate() error { - if s.RetentionSecs != nil && *s.RetentionSecs < 0 { - return fmt.Errorf("retention period cannot be negative") - } - // nothing to validate on ClusteringKey - return nil -} - -type resource struct { - Workspace string `json:"workspace" jsonschema:"title=Workspace,description=The name of the Rockset workspace (will be created if it does not exist)"` - // The name of the Rockset collection (will be created if it does not exist) - Collection string `json:"collection" jsonschema:"title=Rockset Collection,description=The name of the Rockset collection (will be created if it does not exist)" jsonschema_extras:"x-collection-name=true"` - // Additional settings for creating the Rockset collection, which are likely to be rarely used. - AdvancedCollectionSettings *collectionSettings `json:"advancedCollectionSettings,omitempty" jsonschema:"title=Advanced Collection Settings" jsonschema_extras:"advanced=true"` -} - -func (r *resource) Validate() error { - var requiredProperties = [][]string{ - {"workspace", r.Workspace}, - {"collection", r.Collection}, - } - for _, req := range requiredProperties { - if req[1] == "" { - return fmt.Errorf("missing '%s'", req[0]) - } - } - - if err := validateRocksetName("workspace", r.Workspace); err != nil { - return err - } - if err := validateRocksetName("collection", r.Collection); err != nil { - return err - } - - return nil -} - -func validateRocksetName(field string, value string) error { - // Alphanumeric or dash - if match, err := regexp.MatchString("\\A[[:alnum:]_-]+\\z", value); err != nil { - return fmt.Errorf("malformed regexp: %v", err) - } else if !match { - return fmt.Errorf("%s must be alphanumeric. got: %s", field, value) - } - return nil - -} - -type rocksetDriver struct{} - -func NewRocksetDriver() *rocksetDriver { - return new(rocksetDriver) -} - -// pm.DriverServer interface. -func (d *rocksetDriver) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) { - if err := req.Validate(); err != nil { - return nil, fmt.Errorf("validating request: %w", err) - } - - endpointSchema, err := schemagen.GenerateSchema("Rockset Endpoint", &config{}).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("generating endpoint schema: %w", err) - } - resourceSchema, err := schemagen.GenerateSchema("Rockset Collection", &resource{}).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("generating resource schema: %w", err) - } - - return &pm.Response_Spec{ - ConfigSchemaJson: json.RawMessage(endpointSchema), - ResourceConfigSchemaJson: json.RawMessage(resourceSchema), - DocumentationUrl: "https://go.estuary.dev/materialize-rockset", - }, nil -} - -// pm.DriverServer interface. -func (d *rocksetDriver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) { - if err := req.Validate(); err != nil { - return nil, fmt.Errorf("validating request: %w", err) - } - - var bindings = []*pm.Response_Validated_Binding{} - for i, binding := range req.Bindings { - res, err := ResolveResourceConfig(binding.ResourceConfigJson) - if err != nil { - return nil, fmt.Errorf("building resource for binding %v: %w", i, err) - } - - var constraints = make(map[string]*pm.Response_Validated_Constraint) - for _, projection := range binding.Collection.Projections { - var constraint = &pm.Response_Validated_Constraint{} - if projection.Inference.IsSingleScalarType() { - constraint.Type = pm.Response_Validated_Constraint_LOCATION_RECOMMENDED - constraint.Reason = "The projection has a single scalar type." - } else { - constraint.Type = pm.Response_Validated_Constraint_FIELD_OPTIONAL - constraint.Reason = "The projection may materialize this field." - } - constraints[projection.Field] = constraint - } - - bindings = append(bindings, &pm.Response_Validated_Binding{ - Constraints: constraints, - ResourcePath: []string{res.Workspace, res.Collection}, - DeltaUpdates: true, - }) - } - var response = &pm.Response_Validated{Bindings: bindings} - - return response, nil -} - -func (d *rocksetDriver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_Applied, error) { - var cfg, err = ResolveEndpointConfig(req.Materialization.ConfigJson) - if err != nil { - return nil, err - } - - client, err := cfg.client() - if err != nil { - return nil, err - } - - actionLog := []string{} - for i, binding := range req.Materialization.Bindings { - var res resource - if res, err = ResolveResourceConfig(binding.ResourceConfigJson); err != nil { - return nil, fmt.Errorf("building resource for binding %v: %w", i, err) - } - - if createdWorkspace, err := ensureWorkspaceExists(ctx, client, res.Workspace); err != nil { - return nil, err - } else if createdWorkspace != nil { - actionLog = append(actionLog, fmt.Sprintf("created %s workspace", *createdWorkspace.Name)) - } - - if createdCollection, err := ensureCollectionExists(ctx, client, &res); err != nil { - return nil, err - } else if createdCollection { - actionLog = append(actionLog, fmt.Sprintf("created %s collection", res.Collection)) - } - } - - response := &pm.Response_Applied{ - ActionDescription: strings.Join(actionLog, ", "), - } - - return response, nil -} - -func (d *rocksetDriver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Transactor, *pm.Response_Opened, error) { - cfg, err := ResolveEndpointConfig(open.Materialization.ConfigJson) - if err != nil { - return nil, nil, err - } - - client, err := cfg.client() - if err != nil { - return nil, nil, err - } - - var bindings = make([]*binding, 0, len(open.Materialization.Bindings)) - for i, spec := range open.Materialization.Bindings { - if res, err := ResolveResourceConfig(spec.ResourceConfigJson); err != nil { - return nil, nil, fmt.Errorf("building resource for binding %v: %w", i, err) - } else { - bindings = append(bindings, NewBinding(spec, &res)) - } - } - - // Ensure that all the collections are ready to accept writes before returning the opened - // response. The error that's returned when attempting to write to a non-ready collection is - // not considered retryable by the client library. - log.Info("Waiting for Rockset collections to be ready to accept writes") - for _, b := range bindings { - if err := client.WaitUntilCollectionReady(ctx, b.rocksetWorkspace(), b.rocksetCollection()); err != nil { - return nil, nil, fmt.Errorf("waiting for rockset collection %q to be ready: %w", b.rocksetCollection(), err) - } - log.WithFields(log.Fields{ - "workspace": b.rocksetWorkspace(), - "collection": b.rocksetCollection(), - }).Info("Rockset collection is ready to accept writes") - } - log.Info("All Rockset collections are ready to accept writes") - - return &transactor{ - config: &cfg, - client: client, - bindings: bindings, - }, &pm.Response_Opened{}, nil -} - -func ResolveEndpointConfig(specJson json.RawMessage) (config, error) { - var cfg = config{} - if err := pf.UnmarshalStrict(specJson, &cfg); err != nil { - return cfg, fmt.Errorf("parsing Rockset config: %w", err) - } - return cfg, cfg.Validate() -} - -func ResolveResourceConfig(specJson json.RawMessage) (resource, error) { - var cfg = resource{} - if err := pf.UnmarshalStrict(specJson, &cfg); err != nil { - return cfg, fmt.Errorf("parsing Rockset config: %w", err) - } - return cfg, cfg.Validate() -} diff --git a/materialize-rockset/driver_test.go b/materialize-rockset/driver_test.go deleted file mode 100644 index 2b6c6e70c0..0000000000 --- a/materialize-rockset/driver_test.go +++ /dev/null @@ -1,243 +0,0 @@ -package materialize_rockset - -import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "log" - "math/rand" - "os" - "testing" - "time" - - "github.com/bradleyjkemp/cupaloy" - pf "github.com/estuary/flow/go/protocols/flow" - pm "github.com/estuary/flow/go/protocols/materialize" - "github.com/stretchr/testify/require" - pb "go.gazette.dev/core/broker/protocol" -) - -func TestRocksetConfig(t *testing.T) { - var invalid = config{} - require.NotNil(t, invalid.Validate()) - - var valid = config{ApiKey: "some_key", RegionBaseUrl: "www.something.com"} - require.Nil(t, valid.Validate()) -} - -func TestRocksetResource(t *testing.T) { - var invalid = resource{} - require.Error(t, invalid.Validate()) - - var too_ascii = resource{Workspace: "must only include letters and numbers", Collection: "widgets"} - require.Error(t, too_ascii.Validate()) - - var valid = resource{Workspace: "testing-33", Collection: "widgets_1"} - require.Nil(t, valid.Validate()) -} - -func TestRocksetDriverSpec(t *testing.T) { - var driver = new(rocksetDriver) - var specReq = pm.Request_Spec{} - var response, err = driver.Spec(context.Background(), &specReq) - require.NoError(t, err) - - t.Run("TestEndpointSpecSchema", func(t *testing.T) { - formatted, err := json.MarshalIndent(response.ConfigSchemaJson, "", " ") - require.NoError(t, err) - cupaloy.SnapshotT(t, string(formatted)) - }) - t.Run("TestResourceSpecSchema", func(t *testing.T) { - formatted, err := json.MarshalIndent(response.ResourceConfigSchemaJson, "", " ") - require.NoError(t, err) - cupaloy.SnapshotT(t, string(formatted)) - }) -} - -func TestRocksetDriverValidate(t *testing.T) { - driver := new(rocksetDriver) - config := testConfig() - var endpointSpecJson []byte - - endpointSpecJson, err := json.Marshal(config) - if err != nil { - t.Fatalf("failed to marshal config: %v", err) - return - } - - resource := resource{Workspace: "testing", Collection: "widgets"} - resourceSpecJson, err := json.Marshal(resource) - if err != nil { - t.Fatalf("failed to marshal resource: %v", err) - return - } - - projections := []pf.Projection{ - { - Ptr: "/foo", - Field: "foo", - Explicit: true, - IsPartitionKey: true, - IsPrimaryKey: true, - Inference: pf.Inference{Types: []string{"object"}}, - }, - { - Ptr: "/id", - Field: "id", - Explicit: true, - IsPartitionKey: true, - IsPrimaryKey: true, - Inference: pf.Inference{Types: []string{"string"}}, - }, - } - collection := pf.CollectionSpec{ - Name: "widgets", - Key: []string{"/id"}, - Projections: projections, - PartitionTemplate: &pf.JournalSpec{ - Name: "widgets", - Replication: 1, - LabelSet: pf.LabelSet{}, - Fragment: pb.JournalSpec_Fragment{ - Length: 2048, - CompressionCodec: pb.CompressionCodec_GZIP, - RefreshInterval: time.Hour * 2, - }}, - } - fieldConfigJson := make(map[string]json.RawMessage) - - bindings := []*pm.Request_Validate_Binding{ - { - ResourceConfigJson: resourceSpecJson, - Collection: collection, - FieldConfigJsonMap: fieldConfigJson, - }, - } - - var validateReq = pm.Request_Validate{ - Name: "just-a-test", - ConfigJson: endpointSpecJson, - Bindings: bindings, - } - response, err := driver.Validate(context.Background(), &validateReq) - - require.NoError(t, err) - require.NotNil(t, response.Bindings) - require.Len(t, response.Bindings, 1) - - var binding = response.Bindings[0] - require.Len(t, binding.Constraints, 2) - require.Equal(t, binding.ResourcePath, []string{"testing", "widgets"}) - require.True(t, binding.DeltaUpdates) -} - -func TestRocksetDriverApply(t *testing.T) { - workspaceName := randWorkspace() - collectionName := randCollection() - - driver := new(rocksetDriver) - config := testConfig() - - var endpointSpecJson []byte - endpointSpecJson, err := json.Marshal(config) - if err != nil { - t.Fatalf("failed to marshal config: %v", err) - return - } - - resource := resource{Workspace: workspaceName, Collection: collectionName} - resourceSpecJson, err := json.Marshal(resource) - if err != nil { - t.Fatalf("failed to marshal resource: %v", err) - return - } - - projections := []pf.Projection{ - { - Ptr: "/id", - Field: "id", - Explicit: true, - IsPartitionKey: true, - IsPrimaryKey: true, - Inference: pf.Inference{}, - }, - } - collection := pf.CollectionSpec{ - Name: pf.Collection(collectionName), - Key: []string{"/id"}, - Projections: projections, - } - - bindings := []*pf.MaterializationSpec_Binding{ - { - ResourceConfigJson: resourceSpecJson, - ResourcePath: []string{workspaceName, collectionName}, - Collection: collection, - FieldSelection: pf.FieldSelection{}, - DeltaUpdates: true, - }, - } - - var applyReq = pm.Request_Apply{ - Materialization: &pf.MaterializationSpec{ - Name: pf.Materialization(collectionName), - ConfigJson: endpointSpecJson, - Bindings: bindings, - }, - Version: "1", - } - - defer cleanup(config, workspaceName, collectionName) - - response, err := driver.Apply(context.Background(), &applyReq) - - require.NoError(t, err) - require.Contains(t, response.ActionDescription, fmt.Sprintf("created %s collection", collectionName)) - log.Printf("Applied: %s", response.ActionDescription) -} - -func cleanup(config config, workspaceName string, collectionName string) { - ctx := context.Background() - client, err := config.client() - if err != nil { - log.Fatalf("initializing client: %s", err.Error()) - } - - if err := client.DeleteCollection(ctx, workspaceName, collectionName); err != nil { - log.Fatalf("failed to cleanup collection: %s/%s: %s", workspaceName, collectionName, err.Error()) - } - - client.WaitUntilCollectionGone(ctx, workspaceName, collectionName) - - if err := client.DeleteWorkspace(ctx, workspaceName); err != nil { - log.Fatalf("failed to cleanup workspace: %s/%s: %s", workspaceName, collectionName, err.Error()) - } -} - -func testConfig() config { - return config{ - ApiKey: fetchApiKey(), - RegionBaseUrl: "api.usw2a1.rockset.com", - } -} - -func randWorkspace() string { - return fmt.Sprintf("automated-tests-%s", randString(6)) -} - -func randCollection() string { - return fmt.Sprintf("c-%s", randString(6)) -} - -func fetchApiKey() string { - return os.Getenv("ROCKSET_API_KEY") -} - -func randString(len int) string { - var buffer = make([]byte, len) - if _, err := rand.Read(buffer); err != nil { - panic("failed to generate random string") - } - return hex.EncodeToString(buffer) -} diff --git a/materialize-rockset/rockset.go b/materialize-rockset/rockset.go deleted file mode 100644 index 618fdcb43c..0000000000 --- a/materialize-rockset/rockset.go +++ /dev/null @@ -1,95 +0,0 @@ -package materialize_rockset - -import ( - "context" - "fmt" - - rockset "github.com/rockset/rockset-go-client" - rtypes "github.com/rockset/rockset-go-client/openapi" - "github.com/rockset/rockset-go-client/option" - log "github.com/sirupsen/logrus" -) - -// Only creates the named collection if it does not already exist. -func ensureWorkspaceExists(ctx context.Context, client *rockset.RockClient, workspace string) (*rtypes.Workspace, error) { - if res, err := getWorkspace(ctx, client, workspace); err != nil { - return nil, err - } else if res != nil { - // This workspace exists within Rockset already. - return nil, nil - } else { - // This collection does not exist within Rockset yet, so we should create it. - return createWorkspace(ctx, client, workspace) - } -} - -func getWorkspace(ctx context.Context, client *rockset.RockClient, workspace string) (*rtypes.Workspace, error) { - res, err := client.GetWorkspace(ctx, workspace) - if se, ok := err.(rockset.Error); ok && se.IsNotFoundError() { - // Everything worked, but this workspace does not exist. - return nil, nil - } else if err != nil { - return nil, fmt.Errorf("failed to fetch workspace `%s`: %w", workspace, err) - } else { - return &res, nil - } -} - -func createWorkspace(ctx context.Context, client *rockset.RockClient, workspaceName string) (*rtypes.Workspace, error) { - if res, err := client.CreateWorkspace(ctx, workspaceName); err != nil { - return nil, fmt.Errorf("failed to create workspace `%s`: %w", workspaceName, err) - } else { - return &res, nil - } -} - -// Only creates the named collection if it does not already exist. The returned boolean indicates whether it was -// actually created. It will be false if the collection already exists or if an error is returned. -func ensureCollectionExists(ctx context.Context, client *rockset.RockClient, resource *resource) (bool, error) { - if existingCollection, err := getCollection(ctx, client, resource.Workspace, resource.Collection); err != nil { - return false, err - } else if existingCollection != nil { - return false, nil - } else { - // This collection does not exist within Rockset yet, so we should create it. - var err = createCollection(ctx, client, resource) - return err == nil, err - } -} - -func getCollection(ctx context.Context, client *rockset.RockClient, workspace string, collection string) (*rtypes.Collection, error) { - res, err := client.GetCollection(ctx, workspace, collection) - if se, ok := err.(rockset.Error); ok && se.IsNotFoundError() { - // Everything worked, but this collection does not exist. - return nil, nil - } else if err != nil { - return nil, fmt.Errorf("failed to fetch collection `%s`: %w", collection, err) - } else { - return &res, nil - } -} - -func createCollection(ctx context.Context, client *rockset.RockClient, resource *resource) error { - var opts []option.CollectionOption - - if advanced := resource.AdvancedCollectionSettings; advanced != nil { - if advanced.RetentionSecs != nil { - opts = append(opts, option.WithCollectionRetentionSeconds(*advanced.RetentionSecs)) - } - - for _, field := range advanced.ClusteringKey { - opts = append(opts, field.toOpt()) - } - } - - log.WithFields( - log.Fields{ - "workspace": resource.Workspace, - "collection": resource.Collection, - }).Info("Will create a new Rockset collection") - _, err := client.CreateCollection(ctx, resource.Workspace, resource.Collection, opts...) - if err != nil { - return fmt.Errorf("failed to create collection `%s`: %w", resource.Collection, err) - } - return nil -} diff --git a/materialize-rockset/transactor.go b/materialize-rockset/transactor.go deleted file mode 100644 index a47f86c1ab..0000000000 --- a/materialize-rockset/transactor.go +++ /dev/null @@ -1,211 +0,0 @@ -package materialize_rockset - -import ( - "context" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - - m "github.com/estuary/connectors/go/protocols/materialize" - "github.com/estuary/flow/go/protocols/fdb/tuple" - pf "github.com/estuary/flow/go/protocols/flow" - rockset "github.com/rockset/rockset-go-client" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" -) - -// A binding represents the relationship between a single Flow Collection and a single Rockset Collection. -type binding struct { - spec *pf.MaterializationSpec_Binding - // User-facing configuration settings for this binding. - res *resource - addDocsCh chan<- map[string]interface{} -} - -func NewBinding(spec *pf.MaterializationSpec_Binding, res *resource) *binding { - return &binding{ - spec: spec, - res: res, - } -} - -func (b *binding) rocksetWorkspace() string { - return b.res.Workspace -} - -func (b *binding) rocksetCollection() string { - return b.res.Collection -} - -type transactor struct { - config *config - client *rockset.RockClient - bindings []*binding -} - -func (t *transactor) UnmarshalState(state json.RawMessage) error { return nil } -func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) { return nil, nil } - -func (t *transactor) Load(it *m.LoadIterator, loaded func(binding int, doc json.RawMessage) error) error { - for it.Next() { - panic("Rockset is not transactional - Load should never be called") - } - return nil -} - -// max number of documents to send with each request -const storeBatchSize = 256 - -func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) { - var errGroup, ctx = errgroup.WithContext(it.Context()) - - for it.Next() { - var b *binding = t.bindings[it.Binding] - // Lazily initialize the goroutine that sends the documents to rockset. - if b.addDocsCh == nil { - var addDocsCh = make(chan map[string]interface{}, storeBatchSize*2) - b.addDocsCh = addDocsCh - errGroup.Go(func() error { - return t.sendAllDocuments(ctx, b, addDocsCh) - }) - logrus.WithFields(logrus.Fields{ - "rocksetCollection": b.rocksetCollection(), - "rocksetWorkspace": b.rocksetWorkspace(), - }).Debug("Started AddDocuments background worker") - } - - var doc = buildDocument(b, it.Key, it.Values) - select { - case b.addDocsCh <- doc: - continue - case <-ctx.Done(): - err := ctx.Err() - - if errors.Is(err, context.Canceled) { - err = errGroup.Wait() - } - - return nil, err - } - } - - for _, binding := range t.bindings { - if binding.addDocsCh != nil { - close(binding.addDocsCh) - binding.addDocsCh = nil - logrus.WithFields(logrus.Fields{ - "rocksetCollection": binding.rocksetCollection(), - "rocksetWorkspace": binding.rocksetWorkspace(), - }).Debug("Closed AddDocuments channel") - } - } - - return nil, errGroup.Wait() -} - -func (t *transactor) Destroy() { - // Nothing to clean up -} - -func buildDocument(b *binding, keys, values tuple.Tuple) map[string]interface{} { - var document = make(map[string]interface{}) - - // Add the `_id` field to the document. This is required by Rockset. - document["_id"] = base64.RawStdEncoding.EncodeToString(keys.Pack()) - - // Add the keys to the document. - for i, value := range keys { - var propName = b.spec.FieldSelection.Keys[i] - document[propName] = value - } - - // Add the non-keys to the document. - for i, value := range values { - var propName = b.spec.FieldSelection.Values[i] - - if raw, ok := value.([]byte); ok { - document[propName] = json.RawMessage(raw) - } else { - document[propName] = value - } - } - return document -} - -func (t *transactor) sendAllDocuments(ctx context.Context, b *binding, addDocsCh <-chan map[string]interface{}) error { - var docs = make([]interface{}, 0, storeBatchSize) - - var docCount = 0 - for addDocsCh != nil { - select { - case <-ctx.Done(): - return ctx.Err() - case doc, ok := <-addDocsCh: - if ok { - docCount++ - docs = append(docs, doc) - } else { - logrus.WithFields(logrus.Fields{ - "rocksetCollection": b.rocksetCollection(), - }).Debug("store channel closed") - // Set channel to nil so that we don't try to read from it again - addDocsCh = nil - } - } - if len(docs) == storeBatchSize { - if err := t.sendReq(ctx, b, docs); err != nil { - return err - } - docs = docs[:0] - } - } - if len(docs) > 0 { - if err := t.sendReq(ctx, b, docs); err != nil { - return err - } - } - logrus.WithFields(logrus.Fields{ - "rocksetCollection": b.rocksetCollection(), - "nDocuments": docCount, - }).Debug("successfully persisted documents to Rockset") - return nil -} - -func (t *transactor) sendReq(ctx context.Context, b *binding, docs []interface{}) error { - docStatuses, err := t.client.AddDocuments(ctx, b.rocksetWorkspace(), b.rocksetCollection(), docs) - if err != nil { - return err - } - // Rockset's API doesn't fail the whole request due to an error with a single document, - // so we need to iterate over each of the returned statuses and check them individually. - // We'll log _all_ the errors, since it's unclear whether they'll all be the same or if the - // order is significant. - for _, docStatus := range docStatuses { - if docStatus.Error != nil { - // The error model has quite a few fields that seem worth logging. The naming - // here is an attempt to clarify the provenance of the error info. - var e = docStatus.Error - // I'd hope that e.Message is never nil, but a generic error message is better - // than _no_ error message just in case it is. - var errMsg = "Rockset API error" - if e.Message != nil && *e.Message != "" { - errMsg = *e.Message // ugh - } - logrus.WithFields(logrus.Fields{ - "error": errMsg, - "rocksetErrorType": e.Type, - "rocksetTraceId": e.TraceId, - "rocksetErrorId": e.ErrorId, - "documentStatus": docStatus.Status, - "rocksetCollection": docStatus.Collection, - "rocksetDocumentId": docStatus.Id, - }).Error("Document was rejected by Rockset API") - - if err == nil { - err = fmt.Errorf("a document was rejected by the Rockset API for Collection: '%s'", b.rocksetCollection()) - } - } - } - return err -}