Skip to content

Commit

Permalink
refactor: remove cache mechanism to reduce complexity (#20)
Browse files: browse the repository at this point in the history
  • Loading branch information
irainia authored Aug 28, 2023
1 parent c665c06 commit aeee206
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 117 deletions.
48 changes: 2 additions & 46 deletions task/bq2bq/factory.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package main

import (
"bytes"
"context"
"fmt"
"sync"

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
Expand All @@ -16,69 +14,27 @@ import (
"github.com/goto/transformers/task/bq2bq/upstream"
)

// Reuse limits consulted by the factories in this file before a cached object
// is handed out again.
const (
	// MaxExtractorReuse is the limit DefaultUpstreamExtractorFactory compares
	// its usage counter against.
	MaxExtractorReuse = 10

	// MaxBQClientReuse is the limit DefaultBQClientFactory compares its usage
	// counter against.
	MaxBQClientReuse = 5
)

type DefaultBQClientFactory struct {
cachedClient bqiface.Client
cachedCred *google.Credentials
timesUsed int
mu sync.Mutex
}

// New returns a BigQuery client authenticated with the given service-account
// JSON.
//
// The credentials are parsed on every call. When their JSON is byte-identical
// to the credentials of the cached client and that client has been returned
// fewer than MaxBQClientReuse times, the cached client is returned again.
// Otherwise a new client is created and replaces the cached one.
//
// An error is returned when the credentials cannot be parsed or the client
// cannot be created.
func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) (bqiface.Client, error) {
	fac.mu.Lock()
	defer fac.mu.Unlock()

	cred, err := google.CredentialsFromJSON(ctx, []byte(svcAccount),
		bigquery.Scope, storageV1.CloudPlatformScope, drive.DriveScope)
	if err != nil {
		return nil, fmt.Errorf("failed to read secret: %w", err)
	}

	// Reuse the cached client while it is below its reuse limit. The counter
	// starts at 1 for a fresh client, so the comparison must be "<": an
	// equality check against the limit could never succeed and the cache would
	// never be hit.
	if fac.cachedCred != nil && fac.cachedClient != nil && fac.timesUsed < MaxBQClientReuse &&
		bytes.Equal(cred.JSON, fac.cachedCred.JSON) {
		fac.timesUsed++
		return fac.cachedClient, nil
	}

	client, err := bigquery.NewClient(ctx, cred.ProjectID, option.WithCredentials(cred))
	if err != nil {
		return nil, fmt.Errorf("failed to create BQ client: %w", err)
	}

	// Remember the new client and restart its usage counter.
	fac.cachedCred = cred
	fac.cachedClient = bqiface.AdaptClient(client)
	fac.timesUsed = 1
	return fac.cachedClient, nil
}

// DefaultUpstreamExtractorFactory creates upstream extractors and keeps the
// most recently created one for possible reuse.
type DefaultUpstreamExtractorFactory struct {
	// mu is held for the whole duration of New, which reads and writes the
	// fields below.
	mu sync.Mutex
	// timeUsed counts how many times cachedExtractor has been returned by New.
	timeUsed int
	// cachedExtractor is the extractor created by the most recent non-cached
	// call to New.
	cachedExtractor UpstreamExtractor
}

// New returns an upstream extractor for the given BigQuery client.
//
// A previously created extractor is returned while it has been handed out
// fewer than MaxExtractorReuse times. Otherwise a new extractor is created
// from client and replaces the cached one.
//
// NOTE(review): the cache check does not look at client, so an extractor that
// was created from an earlier client can be returned for a different one.
// Confirm that callers always pass an equivalent client.
//
// An error is returned when the extractor cannot be initialized.
func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.cachedExtractor != nil && d.timeUsed < MaxExtractorReuse {
		d.timeUsed++
		return d.cachedExtractor, nil
	}

	extractor, err := upstream.NewExtractor(client)
	if err != nil {
		return nil, fmt.Errorf("error initializing extractor: %w", err)
	}

	// Remember the new extractor and restart its usage counter.
	d.cachedExtractor = extractor
	d.timeUsed = 1
	return extractor, nil
}
4 changes: 1 addition & 3 deletions task/bq2bq/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ require (
github.com/goto/optimus v0.8.1
github.com/goto/optimus/sdk v0.0.0-20230313071811-2d68a9c815bf
github.com/hashicorp/go-hclog v1.2.0
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/spf13/cast v1.4.1
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.0.1
Expand Down Expand Up @@ -69,6 +66,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.8.1 // indirect
Expand Down
4 changes: 0 additions & 4 deletions task/bq2bq/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,6 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v0.0.0-20180220230111-00c29f56e238/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down Expand Up @@ -1065,8 +1063,6 @@ github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuh
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/ory/dockertest/v3 v3.9.1/go.mod h1:42Ir9hmvaAPm0Mgibk6mBPi7SFvTXxEcnztDYOJ//uM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
Expand Down
65 changes: 3 additions & 62 deletions task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ import (
"flag"
"fmt"
"strings"
"sync"
"time"

"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
oplugin "github.com/goto/optimus/plugin"
"github.com/goto/optimus/sdk/plugin"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
"github.com/spf13/cast"

"github.com/goto/transformers/task/bq2bq/upstream"
)
Expand All @@ -31,22 +27,12 @@ const (
)

var (
Name = "bq2bq"

// Version should be injected while building
Name = "bq2bq"
Version = "dev"

QueryFileName = "query.sql"

QueryFileName = "query.sql"
BqServiceAccount = "BQ_SERVICE_ACCOUNT"

TimeoutDuration = time.Second * 180
MaxBQApiRetries = 3
FakeSelectStmt = "SELECT * from `%s` WHERE FALSE LIMIT 1"

CacheTTL = time.Hour * 24
CacheCleanUp = time.Hour * 6
ErrCacheNotFound = errors.New("item not found")
MaxBQApiRetries = 3

LoadMethod = "LOAD_METHOD"
LoadMethodReplace = "REPLACE"
Expand All @@ -71,8 +57,6 @@ type ExtractorFactory interface {
type BQ2BQ struct {
ClientFac ClientFactory
ExtractorFac ExtractorFactory
mu sync.Mutex
C *cache.Cache
Compiler *Compiler

logger hclog.Logger
Expand Down Expand Up @@ -208,15 +192,6 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
response = &plugin.GenerateDependenciesResponse{}
response.Dependencies = []string{}

// check if exists in cache
if cachedResponse, err := b.IsCached(request); err == nil {
// cache ready
span.AddEvent("Request found in cache")
return cachedResponse, nil
} else if err != ErrCacheNotFound {
return nil, err
}

var svcAcc string
accConfig, ok := request.Config.Get(BqServiceAccount)
if !ok || len(accConfig.Value) == 0 {
Expand Down Expand Up @@ -259,7 +234,6 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat

response.Dependencies = formattedUpstreams

b.Cache(request, response)
return response, nil
}

Expand Down Expand Up @@ -325,38 +299,6 @@ func (b *BQ2BQ) formatUpstreams(upstreams []upstream.Resource, fn func(r upstrea
return output
}

// IsCached looks up a previously stored response for request.
//
// The cache key is the string form of the request's structural hash. It
// returns ErrCacheNotFound when no cache is configured or no entry exists for
// the key, and the hashing error when the request cannot be hashed. The stored
// value is type-asserted without a check, so an entry of any other type would
// panic.
func (b *BQ2BQ) IsCached(request plugin.GenerateDependenciesRequest) (*plugin.GenerateDependenciesResponse, error) {
	if b.C == nil {
		return nil, ErrCacheNotFound
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	sum, err := hashstructure.Hash(request, hashstructure.FormatV2, nil)
	if err != nil {
		return nil, err
	}

	cached, found := b.C.Get(cast.ToString(sum))
	if !found {
		return nil, ErrCacheNotFound
	}
	return cached.(*plugin.GenerateDependenciesResponse), nil
}

// Cache stores response under the string form of the request's structural
// hash, using the cache's default expiration.
//
// It is a no-op returning nil when no cache is configured. The only error it
// returns is the one produced by hashing the request, in which case nothing is
// stored.
func (b *BQ2BQ) Cache(request plugin.GenerateDependenciesRequest, response *plugin.GenerateDependenciesResponse) error {
	if b.C == nil {
		return nil
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	sum, err := hashstructure.Hash(request, hashstructure.FormatV2, nil)
	if err == nil {
		b.C.Set(cast.ToString(sum), response, cache.DefaultExpiration)
	}
	return err
}

func main() {
var tracingAddr string
flag.StringVar(&tracingAddr, "t", "", "endpoint for traces collector")
Expand All @@ -374,7 +316,6 @@ func main() {
return &BQ2BQ{
ClientFac: &DefaultBQClientFactory{},
ExtractorFac: &DefaultUpstreamExtractorFactory{},
C: cache.New(CacheTTL, CacheCleanUp),
Compiler: NewCompiler(),
logger: log,
}
Expand Down
2 changes: 0 additions & 2 deletions task/bq2bq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/goto/optimus/sdk/plugin"
"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -583,7 +582,6 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`
b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
C: cache.New(CacheTTL, CacheCleanUp),
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down

0 comments on commit aeee206

Please sign in to comment.