Skip to content

Commit

Permalink
feat: flagd offline in-process support with flags sources from file (#…
Browse files Browse the repository at this point in the history
…421)

Signed-off-by: Kavindu Dodanduwa <[email protected]>
Signed-off-by: Kavindu Dodanduwa <[email protected]>
Co-authored-by: Michael Beemer <[email protected]>
  • Loading branch information
Kavindu-Dodan and beeme1mr authored Jan 23, 2024
1 parent e9dde7a commit 8685cc0
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 46 deletions.
35 changes: 28 additions & 7 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,40 @@ openfeature.SetProvider(provider)

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).

#### Offline mode

In-process resolvers can also work in an offline mode.
To enable this mode, you should provide a [valid flag configuration](https://flagd.dev/reference/flag-definitions/) file with the option `WithOfflineFilePath`.

```go
provider := flagd.NewProvider(
flagd.WithInProcessResolver(),
flagd.WithOfflineFilePath(OFFLINE_FLAG_PATH))
openfeature.SetProvider(provider)
```

The provider will attempt to detect file changes, but this is a best-effort attempt as file system events differ between operating systems.
This mode is useful for local development, tests and offline applications.

> [!IMPORTANT]
> Note that you can only use a single flag source (either gRPC or offline file) for the in-process resolver.
> If both sources are configured, offline mode will be selected.
## Configuration options

Configuration can be provided as constructor options or as environment variables, where constructor options having the highest precedence.

| Option name | Environment variable name | Type & supported value | Default | Compatible resolver |
|----------------------------------------------------------|--------------------------------|-----------------------------|-----------|---------------------|
| WithHost | FLAGD_HOST | string | localhost | RPC & in-process |
| WithPort | FLAGD_PORT | number | 8013 | RPC & in-process |
| WithTLS | FLAGD_TLS | boolean | false | RPC & in-process |
| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | RPC & in-process |
| WithCertificatePath | FLAGD_SERVER_CERT_PATH | string | "" | RPC & in-process |
| WithLRUCache<br/>WithBasicInMemoryCache<br/>WithoutCache | FLAGD_CACHE | string (lru, mem, disabled) | lru | RPC |
| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | RPC |
| WithHost | FLAGD_HOST | string | localhost | rpc & in-process |
| WithPort | FLAGD_PORT | number | 8013 | rpc & in-process |
| WithTLS | FLAGD_TLS | boolean | false | rpc & in-process |
| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | rpc & in-process |
| WithCertificatePath | FLAGD_SERVER_CERT_PATH | string | "" | rpc & in-process |
| WithLRUCache<br/>WithBasicInMemoryCache<br/>WithoutCache | FLAGD_CACHE | string (lru, mem, disabled) | lru | rpc |
| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| WithOfflineFilePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | in-process |

### Overriding behavior

Expand Down
9 changes: 7 additions & 2 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const (
defaultCache = cache.LRUValue
defaultHost = "localhost"
defaultResolver = rpc
defaultSourceSelector = ""

rpc ResolverType = "rpc"
inProcess ResolverType = "in-process"
Expand All @@ -34,6 +33,7 @@ const (
flagdMaxEventStreamRetriesEnvironmentVariableName = "FLAGD_MAX_EVENT_STREAM_RETRIES"
flagdResolverEnvironmentVariableName = "FLAGD_RESOLVER"
flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR"
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
)

type providerConfiguration struct {
Expand All @@ -42,6 +42,7 @@ type providerConfiguration struct {
EventStreamConnectionMaxAttempts int
Host string
MaxCacheSize int
OfflineFlagSourcePath string
OtelIntercept bool
Port uint16
Resolver ResolverType
Expand All @@ -52,7 +53,7 @@ type providerConfiguration struct {
log logr.Logger
}

func NewDefaultConfiguration(log logr.Logger) *providerConfiguration {
func newDefaultConfiguration(log logr.Logger) *providerConfiguration {
p := &providerConfiguration{
CacheType: defaultCache,
EventStreamConnectionMaxAttempts: defaultMaxEventStreamRetries,
Expand Down Expand Up @@ -150,6 +151,10 @@ func (cfg *providerConfiguration) updateFromEnvVar() {
}
}

if offlinePath := os.Getenv(flagdOfflinePathEnvironmentVariableName); offlinePath != "" {
cfg.OfflineFlagSourcePath = offlinePath
}

if selector := os.Getenv(flagdSourceSelectorEnvironmentVariableName); selector != "" {
cfg.Selector = selector
}
Expand Down
19 changes: 14 additions & 5 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewProvider(opts ...ProviderOption) *Provider {
log := logr.New(logger.Logger{})

// initialize with default configurations
providerConfiguration := NewDefaultConfiguration(log)
providerConfiguration := newDefaultConfiguration(log)

provider := &Provider{
initialized: false,
Expand Down Expand Up @@ -63,10 +63,11 @@ func NewProvider(opts ...ProviderOption) *Provider {
provider.providerConfiguration.EventStreamConnectionMaxAttempts)
} else {
service = process.NewInProcessService(process.Configuration{
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
})
}

Expand Down Expand Up @@ -286,6 +287,14 @@ func WithInProcessResolver() ProviderOption {
}
}

// WithOfflineFilePath file path to obtain flags to run provider in offline mode with in-process evaluations.
// This is only useful with inProcess resolver type
func WithOfflineFilePath(path string) ProviderOption {
return func(p *Provider) {
p.providerConfiguration.OfflineFlagSourcePath = path
}
}

// WithSelector sets the selector to be used for InProcess flag sync calls
func WithSelector(selector string) ProviderOption {
return func(p *Provider) {
Expand Down
53 changes: 33 additions & 20 deletions providers/flagd/pkg/service/in_process/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/open-feature/flagd/core/pkg/model"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/sync/file"
"github.com/open-feature/flagd/core/pkg/sync/grpc"
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
of "github.com/open-feature/go-sdk/openfeature"
Expand All @@ -29,30 +30,17 @@ type InProcess struct {
}

type Configuration struct {
Host any
Port any
Selector string
TLSEnabled bool
Host any
Port any
Selector string
TLSEnabled bool
OfflineFlagSource string
}

func NewInProcessService(cfg Configuration) *InProcess {
log := logger.NewLogger(zap.NewRaw(), false)

// currently supports grpc syncs for in-process flag fetch
var uri string
if cfg.TLSEnabled {
uri = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
} else {
uri = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
}

grpcSync := &grpc.Sync{
CredentialBuilder: &credentials.CredentialBuilder{},
Logger: log,
Secure: cfg.TLSEnabled,
Selector: cfg.Selector,
URI: uri,
}
iSync, uri := makeSyncProvider(cfg, log)

// service specific metadata
var svcMetadata map[string]interface{}
Expand Down Expand Up @@ -89,7 +77,7 @@ func NewInProcessService(cfg Configuration) *InProcess {
logger: log,
listenerShutdown: make(chan interface{}),
serviceMetadata: svcMetadata,
sync: grpcSync,
sync: iSync,
}
}

Expand Down Expand Up @@ -297,6 +285,31 @@ func (i *InProcess) appendMetadata(evalMetadata map[string]interface{}) {
}
}

// makeSyncProvider is a helper to create sync.ISync and return the underlying uri used by it to the caller
func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string) {
if cfg.OfflineFlagSource != "" {
// file sync provider
log.Info("operating in in-process mode with offline flags sourced from " + cfg.OfflineFlagSource)
return &file.Sync{
URI: cfg.OfflineFlagSource,
Logger: log,
Mux: &parallel.RWMutex{},
}, cfg.OfflineFlagSource
}

// grpc sync provider
uri := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
log.Info("operating in in-process mode with flags sourced from " + uri)

return &grpc.Sync{
CredentialBuilder: &credentials.CredentialBuilder{},
Logger: log,
Secure: cfg.TLSEnabled,
Selector: cfg.Selector,
URI: uri,
}, uri
}

// mapError is a helper to map evaluation errors to OF errors
func mapError(err error) of.ResolutionError {
switch err.Error() {
Expand Down
53 changes: 53 additions & 0 deletions providers/flagd/pkg/service/in_process/service_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package process

import (
"context"
of "github.com/open-feature/go-sdk/openfeature"
"os"
"path/filepath"
"testing"
"time"
)

func TestInProcessOfflineMode(t *testing.T) {
// given
flagFile := "config.json"
offlinePath := filepath.Join(t.TempDir(), flagFile)

err := os.WriteFile(offlinePath, []byte(flagRsp), 0644)
if err != nil {
t.Fatal(err)
}

// when
service := NewInProcessService(Configuration{OfflineFlagSource: offlinePath})

err = service.Init()
if err != nil {
t.Fatal(err)
}

// then
channel := service.EventChannel()

select {
case event := <-channel:
if event.EventType != of.ProviderReady {
t.Fatalf("Provider initialization failed. Got event type %s with message %s", event.EventType, event.Message)
}
case <-time.After(2 * time.Second):
t.Fatal("Provider initialization did not complete within acceptable timeframe ")
}

// provider must evaluate flag from the grpc source data
detail := service.ResolveBoolean(context.Background(), "myBoolFlag", false, make(map[string]interface{}))

if !detail.Value {
t.Fatal("Expected true, but got false")
}

// check for metadata - scope from grpc sync
if len(detail.FlagMetadata) == 0 && detail.FlagMetadata["scope"] == "" {
t.Fatal("Expected scope to be present, but got none")
}
}
25 changes: 13 additions & 12 deletions providers/flagd/pkg/service/in_process/service_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,8 @@ import (
"time"
)

func TestInProcessProviderEvaluation(t *testing.T) {
// given
host := "localhost"
port := 8090
scope := "app=myapp"

listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
t.Fatal(err)
}

flagRsp := `{
// shared flag for tests
var flagRsp = `{
"flags": {
"myBoolFlag": {
"state": "ENABLED",
Expand All @@ -37,6 +27,17 @@ func TestInProcessProviderEvaluation(t *testing.T) {
}
}`

func TestInProcessProviderEvaluation(t *testing.T) {
// given
host := "localhost"
port := 8090
scope := "app=myapp"

listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
t.Fatal(err)
}

bufServ := &bufferedServer{
listener: listen,
mockResponses: []*v1.SyncFlagsResponse{
Expand Down
1 change: 1 addition & 0 deletions providers/flagd/pkg/service/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Service struct {
}

func NewService(cfg Configuration, cache *cache.Service, logger logr.Logger, retries int) *Service {
logger.Info("operating in rpc mode with flags sourced from " + fmt.Sprintf("%s:%d", cfg.Host, cfg.Port))
return &Service{
cache: cache,
cfg: cfg,
Expand Down

0 comments on commit 8685cc0

Please sign in to comment.