Skip to content

Commit

Permalink
materialize-pinecone: update to Pinecone Go SDK
Browse files Browse the repository at this point in the history
This updates our connector to use the Pinecone Go SDK, which will allow support
for serverless indices, among other things.

Also a few updates that I have made at my discretion, since now seems like a
good time to make updates:

* We will require a namespace input for the resource config. It's no longer
  possible to create a Pinecone index that doesn't support namespaces.
* Various checks related to Pinecone indexes that don't support namespaces are
  removed, since that isn't possible anymore.
* Include the configured index name in the resource path, in case we ever want
  to support multiple indices in the same materialization. This is sort of like
  Pinecone's concept of a "schema", so we'll probably want to do this.
* Hex-encode embedding document keys, since that is what we do for other
  materializations like this.
* Implement batch flushing after the Store iterator ticks over to the next
  binding.
  • Loading branch information
williamhbaker committed Oct 4, 2024
1 parent c7c25f7 commit 5219a94
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 520 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ require (
golang.org/x/time v0.5.0
google.golang.org/api v0.184.0
google.golang.org/genproto v0.0.0-20240610135401-a8a62080eff3
google.golang.org/grpc v1.64.1
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
vitess.io/vitess v0.15.3
)
Expand Down Expand Up @@ -115,6 +115,7 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/arrow/go/v16 v16.0.0 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 // indirect
Expand Down Expand Up @@ -180,11 +181,13 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/oapi-codegen/runtime v1.1.1 // indirect
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pinecone-io/go-pinecone v1.1.1 // indirect
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/RobinUS2/golang-moving-average v1.0.0/go.mod h1:MdzhY+KoEvi+OBygTPH0OSaKrOJzvILWN2SPQzaKVsY=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alpacahq/alpaca-trade-api-go/v2 v2.8.0 h1:9rsXPiOSewm+zGoupFzCjxiOcYjJcCMunuCrXiEsrSc=
Expand All @@ -159,6 +160,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI=
github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.54.2 h1:Wo6AVWcleNHrYa48YzfYz60hzxGRqsJrK5s/qePe+3I=
github.com/aws/aws-sdk-go v1.54.2/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
Expand Down Expand Up @@ -257,6 +260,7 @@ github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bradleyjkemp/cupaloy v2.3.0+incompatible h1:UafIjBvWQmS9i/xRg+CamMrnLTKNzo+bdmT/oH34c2Y=
github.com/bradleyjkemp/cupaloy v2.3.0+incompatible/go.mod h1:Au1Xw1sgaJ5iSFktEhYsS0dbQiS1B0/XMXl+42y9Ilk=
Expand Down Expand Up @@ -618,6 +622,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down Expand Up @@ -694,6 +699,8 @@ github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
Expand All @@ -720,6 +727,8 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pinecone-io/go-pinecone v1.1.1 h1:pKoIiYcBIbrR7gaq0JXPiVnNEtevFYeq/AYL7T0NbbE=
github.com/pinecone-io/go-pinecone v1.1.1/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb h1:3pSi4EDG6hg0orE1ndHkXvX6Qdq2cZn8gAPir8ymKZk=
github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
Expand Down Expand Up @@ -794,6 +803,7 @@ github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNo
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -1376,6 +1386,8 @@ google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b h1:D/GTYPo6I1oEo08Bfpuj3xl5XE+UGHj7//5fVyKxhsQ=
google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE=
Expand Down
12 changes: 4 additions & 8 deletions materialize-pinecone/.snapshots/TestSpecification
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
"description": "Pinecone index for this materialization. Must already exist and have appropriate dimensions for the embedding model used.",
"order": 0
},
"environment": {
"type": "string",
"title": "Pinecone Environment",
"description": "Cloud region for your Pinecone project. Example: us-central1-gcp",
"order": 1
},
"pineconeApiKey": {
"type": "string",
"title": "Pinecone API Key",
Expand Down Expand Up @@ -53,7 +47,6 @@
"type": "object",
"required": [
"index",
"environment",
"pineconeApiKey",
"openAiApiKey"
],
Expand All @@ -66,11 +59,14 @@
"namespace": {
"type": "string",
"title": "Pinecone Namespace",
"description": "Name of the Pinecone namespace that this collection will materialize vectors into. For Pinecone starter plans, leave blank to use no namespace. Only a single binding can have a blank namespace, and Pinecone starter plans can only materialize a single binding.",
"description": "Name of the Pinecone namespace that this collection will materialize vectors into.",
"x-collection-name": true
}
},
"type": "object",
"required": [
"namespace"
],
"title": "Pinecone Collection"
},
"documentation_url": "https://go.estuary.dev/materialize-pinecone"
Expand Down
203 changes: 1 addition & 202 deletions materialize-pinecone/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type OpenAIEmbeddingsResponse struct {

type Embedding struct {
Object string `json:"object"`
Embedding []float64 `json:"embedding"`
Embedding []float32 `json:"embedding"`
Index int `json:"index"`
}

Expand Down Expand Up @@ -140,207 +140,6 @@ func (c *OpenAiClient) VerifyModelExists(ctx context.Context) error {
return nil
}

type PineconeUpsertRequest struct {
Vectors []Vector `json:"vectors"`
Namespace string `json:"namespace"`
}

type Vector struct {
Id string `json:"id"`
Values []float64 `json:"values"`
Metadata map[string]interface{} `json:"metadata"`
}

type PineconeUpsertResponse struct {
UpsertedCount int `json:"upsertedCount"`
}

type PineconeIndexStatsResponse struct {
Namespaces map[string]interface{} `json:"namespaces"`
Dimension int `json:"dimension"`
IndexFullness float64 `json:"index_fullness"`
}

type PineconeIndexDescribeResponse struct {
Database struct {
MetadataConfig struct {
Indexed []string `json:"indexed"`
} `json:"metadata_config"`
PodType string `json:"pod_type"`
} `json:"database"`
}

type whoamiResponse struct {
ProjectName string `json:"project_name"`
}

type pineconeUpsertError struct {
Code int `json:"code"`
Message string `json:"message"`
Details []string `json:"details"`
}

type PineconeClient struct {
http *http.Client
index string
projectName string
environment string
apiKey string
}

func NewPineconeClient(ctx context.Context, index string, environment string, apiKey string) (*PineconeClient, error) {
c := &PineconeClient{
http: http.DefaultClient,
index: index,
environment: environment,
apiKey: apiKey,
}

whoami, err := c.whoami(ctx)
if err != nil {
return nil, err
}

c.projectName = whoami.ProjectName

return c, nil
}

func (c *PineconeClient) baseUrl() string {
return fmt.Sprintf("https://%s-%s.svc.%s.pinecone.io", c.index, c.projectName, c.environment)
}

func (c *PineconeClient) DescribeIndexStats(ctx context.Context) (PineconeIndexStatsResponse, error) {
req, err := http.NewRequestWithContext(ctx, "POST", c.baseUrl()+"/describe_index_stats", nil)
if err != nil {
return PineconeIndexStatsResponse{}, err
}
req.Header.Set("accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.apiKey)
req.Header.Set("User-Agent", "source_tag=estuary")

res, err := c.http.Do(req)
if err != nil {
return PineconeIndexStatsResponse{}, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return PineconeIndexStatsResponse{}, fmt.Errorf("PineconeClient DescribeIndexStats unexpected status: %s", res.Status)
}

indexResponse := PineconeIndexStatsResponse{}
if err := json.NewDecoder(res.Body).Decode(&indexResponse); err != nil {
return PineconeIndexStatsResponse{}, err
}

return indexResponse, nil
}

func (c *PineconeClient) DescribeIndex(ctx context.Context) (PineconeIndexDescribeResponse, error) {
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://controller.%s.pinecone.io/databases/%s", c.environment, c.index), nil)
if err != nil {
return PineconeIndexDescribeResponse{}, err
}
req.Header.Set("accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.apiKey)
req.Header.Set("User-Agent", "source_tag=estuary")

res, err := c.http.Do(req)
if err != nil {
return PineconeIndexDescribeResponse{}, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return PineconeIndexDescribeResponse{}, fmt.Errorf("PineconeClient DescribeIndex unexpected status: %s", res.Status)
}

indexResponse := PineconeIndexDescribeResponse{}
if err := json.NewDecoder(res.Body).Decode(&indexResponse); err != nil {
return PineconeIndexDescribeResponse{}, err
}

return indexResponse, nil
}

func (c *PineconeClient) Upsert(ctx context.Context, req PineconeUpsertRequest) error {
res, err := withRetry(ctx, func() (*http.Response, error) {
body := new(bytes.Buffer)
if err := json.NewEncoder(body).Encode(&req); err != nil {
return nil, err
}

req, err := http.NewRequestWithContext(ctx, "POST", c.baseUrl()+"/vectors/upsert", body)
if err != nil {
return nil, err
}
req.Header.Set("accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.apiKey)
req.Header.Set("User-Agent", "source_tag=estuary")

return c.http.Do(req)
})
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
var errorBody pineconeUpsertError
if err := json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
log.WithField("error", err).Warn("could not decode error response body")
} else if errorBody.Message != "" {
return fmt.Errorf("pinecone vector upsert failed (%s): %s", res.Status, errorBody.Message)
} else {
log.WithField("errorBody", errorBody).Warn("errorBody error message was empty")
}

return fmt.Errorf("PineconeClient Upsert unexpected status: %s", res.Status)
}

upsertResponse := PineconeUpsertResponse{}
if err := json.NewDecoder(res.Body).Decode(&upsertResponse); err != nil {
return err
}

if len(req.Vectors) != upsertResponse.UpsertedCount {
return fmt.Errorf("upserted unexpected vector count: %d expected vs %d upserted", len(req.Vectors), upsertResponse.UpsertedCount)
}

return nil
}

func (c *PineconeClient) whoami(ctx context.Context) (whoamiResponse, error) {
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://controller.%s.pinecone.io/actions/whoami", c.environment), nil)
if err != nil {
return whoamiResponse{}, err
}
req.Header.Set("Api-Key", c.apiKey)
req.Header.Set("accept", "application/json")
req.Header.Set("User-Agent", "source_tag=estuary")

res, err := c.http.Do(req)
if err != nil {
return whoamiResponse{}, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return whoamiResponse{}, fmt.Errorf("PineconeClient whoami unexpected status: %s", res.Status)
}

whoami := whoamiResponse{}
if err := json.NewDecoder(res.Body).Decode(&whoami); err != nil {
return whoamiResponse{}, err
}

return whoami, nil
}

var (
maxRetries = 10
initialBackoff float64 = 200 // Milliseconds
Expand Down
Loading

0 comments on commit 5219a94

Please sign in to comment.