diff --git a/providers/flagd/README.md b/providers/flagd/README.md index 5df579c99..d8d89b748 100644 --- a/providers/flagd/README.md +++ b/providers/flagd/README.md @@ -42,19 +42,40 @@ openfeature.SetProvider(provider) In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json). +#### Offline mode + +In-process resolvers can also work in an offline mode. +To enable this mode, you should provide a [valid flag configuration](https://flagd.dev/reference/flag-definitions/) file with the option `WithOfflineFilePath`. + +```go +provider := flagd.NewProvider( + flagd.WithInProcessResolver(), + flagd.WithOfflineFilePath(OFFLINE_FLAG_PATH)) +openfeature.SetProvider(provider) +``` + +The provider will attempt to detect file changes, but this is a best-effort attempt as file system events differ between operating systems. +This mode is useful for local development, tests and offline applications. + +> [!IMPORTANT] +> Note that you can only use a single flag source (either gRPC or offline file) for the in-process resolver. +> If both sources are configured, offline mode will be selected. + ## Configuration options Configuration can be provided as constructor options or as environment variables, where constructor options having the highest precedence. | Option name | Environment variable name | Type & supported value | Default | Compatible resolver | |----------------------------------------------------------|--------------------------------|-----------------------------|-----------|---------------------| -| WithHost | FLAGD_HOST | string | localhost | RPC & in-process | -| WithPort | FLAGD_PORT | number | 8013 | RPC & in-process | -| WithTLS | FLAGD_TLS | boolean | false | RPC & in-process | -| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | RPC & in-process | -| WithCertificatePath | FLAGD_SERVER_CERT_PATH | string | "" | RPC & in-process | -| WithLRUCache
WithBasicInMemoryCache
WithoutCache | FLAGD_CACHE | string (lru, mem, disabled) | lru | RPC | -| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | RPC | +| WithHost | FLAGD_HOST | string | localhost | rpc & in-process | +| WithPort | FLAGD_PORT | number | 8013 | rpc & in-process | +| WithTLS | FLAGD_TLS | boolean | false | rpc & in-process | +| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | rpc & in-process | +| WithCertificatePath | FLAGD_SERVER_CERT_PATH | string | "" | rpc & in-process | +| WithLRUCache
WithBasicInMemoryCache
WithoutCache | FLAGD_CACHE | string (lru, mem, disabled) | lru | rpc | +| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc | +| WithEventStreamConnectionMaxAttempts | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc | +| WithOfflineFilePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | in-process | ### Overriding behavior diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index be4218cc3..8f2d612e3 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -19,7 +19,6 @@ const ( defaultCache = cache.LRUValue defaultHost = "localhost" defaultResolver = rpc - defaultSourceSelector = "" rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -34,6 +33,7 @@ const ( flagdMaxEventStreamRetriesEnvironmentVariableName = "FLAGD_MAX_EVENT_STREAM_RETRIES" flagdResolverEnvironmentVariableName = "FLAGD_RESOLVER" flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR" + flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" ) type providerConfiguration struct { @@ -42,6 +42,7 @@ type providerConfiguration struct { EventStreamConnectionMaxAttempts int Host string MaxCacheSize int + OfflineFlagSourcePath string OtelIntercept bool Port uint16 Resolver ResolverType @@ -52,7 +53,7 @@ type providerConfiguration struct { log logr.Logger } -func NewDefaultConfiguration(log logr.Logger) *providerConfiguration { +func newDefaultConfiguration(log logr.Logger) *providerConfiguration { p := &providerConfiguration{ CacheType: defaultCache, EventStreamConnectionMaxAttempts: defaultMaxEventStreamRetries, @@ -150,6 +151,10 @@ func (cfg *providerConfiguration) updateFromEnvVar() { } } + if offlinePath := os.Getenv(flagdOfflinePathEnvironmentVariableName); offlinePath != "" { + cfg.OfflineFlagSourcePath = offlinePath + } + if selector := os.Getenv(flagdSourceSelectorEnvironmentVariableName); selector != "" { cfg.Selector = selector } diff --git a/providers/flagd/pkg/provider.go b/providers/flagd/pkg/provider.go index 8da4558a0..de509fbdf 100644 --- a/providers/flagd/pkg/provider.go +++ b/providers/flagd/pkg/provider.go @@ -27,7 +27,7 @@ func NewProvider(opts ...ProviderOption) *Provider { log := logr.New(logger.Logger{}) // initialize with default configurations - providerConfiguration := NewDefaultConfiguration(log) + providerConfiguration := newDefaultConfiguration(log) provider := &Provider{ initialized: false, @@ -63,10 +63,11 @@ func NewProvider(opts ...ProviderOption) *Provider { provider.providerConfiguration.EventStreamConnectionMaxAttempts) } else { service = process.NewInProcessService(process.Configuration{ - Host: provider.providerConfiguration.Host, - Port: provider.providerConfiguration.Port, - Selector: provider.providerConfiguration.Selector, - TLSEnabled: provider.providerConfiguration.TLSEnabled, + Host: provider.providerConfiguration.Host, + Port: provider.providerConfiguration.Port, + Selector: provider.providerConfiguration.Selector, + TLSEnabled: provider.providerConfiguration.TLSEnabled, + OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath, }) } @@ -286,6 +287,14 @@ func WithInProcessResolver() ProviderOption { } } +// WithOfflineFilePath file path to obtain flags to run provider in offline mode with in-process evaluations. +// This is only useful with inProcess resolver type +func WithOfflineFilePath(path string) ProviderOption { + return func(p *Provider) { + p.providerConfiguration.OfflineFlagSourcePath = path + } +} + // WithSelector sets the selector to be used for InProcess flag sync calls func WithSelector(selector string) ProviderOption { return func(p *Provider) { diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 5b1985bff..dd103c933 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -8,6 +8,7 @@ import ( "github.com/open-feature/flagd/core/pkg/model" "github.com/open-feature/flagd/core/pkg/store" "github.com/open-feature/flagd/core/pkg/sync" + "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" of "github.com/open-feature/go-sdk/openfeature" @@ -29,30 +30,17 @@ type InProcess struct { } type Configuration struct { - Host any - Port any - Selector string - TLSEnabled bool + Host any + Port any + Selector string + TLSEnabled bool + OfflineFlagSource string } func NewInProcessService(cfg Configuration) *InProcess { log := logger.NewLogger(zap.NewRaw(), false) - // currently supports grpc syncs for in-process flag fetch - var uri string - if cfg.TLSEnabled { - uri = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) - } else { - uri = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) - } - - grpcSync := &grpc.Sync{ - CredentialBuilder: &credentials.CredentialBuilder{}, - Logger: log, - Secure: cfg.TLSEnabled, - Selector: cfg.Selector, - URI: uri, - } + iSync, uri := makeSyncProvider(cfg, log) // service specific metadata var svcMetadata map[string]interface{} @@ -89,7 +77,7 @@ func NewInProcessService(cfg Configuration) *InProcess { logger: log, listenerShutdown: make(chan interface{}), serviceMetadata: svcMetadata, - sync: grpcSync, + sync: iSync, } } @@ -297,6 +285,31 @@ func (i *InProcess) appendMetadata(evalMetadata map[string]interface{}) { } } +// makeSyncProvider is a helper to create sync.ISync and return the underlying uri used by it to the caller +func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string) { + if cfg.OfflineFlagSource != "" { + // file sync provider + log.Info("operating in in-process mode with offline flags sourced from " + cfg.OfflineFlagSource) + return &file.Sync{ + URI: cfg.OfflineFlagSource, + Logger: log, + Mux: ¶llel.RWMutex{}, + }, cfg.OfflineFlagSource + } + + // grpc sync provider + uri := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) + log.Info("operating in in-process mode with flags sourced from " + uri) + + return &grpc.Sync{ + CredentialBuilder: &credentials.CredentialBuilder{}, + Logger: log, + Secure: cfg.TLSEnabled, + Selector: cfg.Selector, + URI: uri, + }, uri +} + // mapError is a helper to map evaluation errors to OF errors func mapError(err error) of.ResolutionError { switch err.Error() { diff --git a/providers/flagd/pkg/service/in_process/service_file_test.go b/providers/flagd/pkg/service/in_process/service_file_test.go new file mode 100644 index 000000000..3a929c279 --- /dev/null +++ b/providers/flagd/pkg/service/in_process/service_file_test.go @@ -0,0 +1,53 @@ +package process + +import ( + "context" + of "github.com/open-feature/go-sdk/openfeature" + "os" + "path/filepath" + "testing" + "time" +) + +func TestInProcessOfflineMode(t *testing.T) { + // given + flagFile := "config.json" + offlinePath := filepath.Join(t.TempDir(), flagFile) + + err := os.WriteFile(offlinePath, []byte(flagRsp), 0644) + if err != nil { + t.Fatal(err) + } + + // when + service := NewInProcessService(Configuration{OfflineFlagSource: offlinePath}) + + err = service.Init() + if err != nil { + t.Fatal(err) + } + + // then + channel := service.EventChannel() + + select { + case event := <-channel: + if event.EventType != of.ProviderReady { + t.Fatalf("Provider initialization failed. Got event type %s with message %s", event.EventType, event.Message) + } + case <-time.After(2 * time.Second): + t.Fatal("Provider initialization did not complete within acceptable timeframe ") + } + + // provider must evaluate flag from the grpc source data + detail := service.ResolveBoolean(context.Background(), "myBoolFlag", false, make(map[string]interface{})) + + if !detail.Value { + t.Fatal("Expected true, but got false") + } + + // check for metadata - scope from grpc sync + if len(detail.FlagMetadata) == 0 && detail.FlagMetadata["scope"] == "" { + t.Fatal("Expected scope to be present, but got none") + } +} diff --git a/providers/flagd/pkg/service/in_process/service_grpc_test.go b/providers/flagd/pkg/service/in_process/service_grpc_test.go index dbe64a62c..9dae8286f 100644 --- a/providers/flagd/pkg/service/in_process/service_grpc_test.go +++ b/providers/flagd/pkg/service/in_process/service_grpc_test.go @@ -13,18 +13,8 @@ import ( "time" ) -func TestInProcessProviderEvaluation(t *testing.T) { - // given - host := "localhost" - port := 8090 - scope := "app=myapp" - - listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) - if err != nil { - t.Fatal(err) - } - - flagRsp := `{ +// shared flag for tests +var flagRsp = `{ "flags": { "myBoolFlag": { "state": "ENABLED", @@ -37,6 +27,17 @@ func TestInProcessProviderEvaluation(t *testing.T) { } }` +func TestInProcessProviderEvaluation(t *testing.T) { + // given + host := "localhost" + port := 8090 + scope := "app=myapp" + + listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + t.Fatal(err) + } + bufServ := &bufferedServer{ listener: listen, mockResponses: []*v1.SyncFlagsResponse{ diff --git a/providers/flagd/pkg/service/rpc/service.go b/providers/flagd/pkg/service/rpc/service.go index e2dba14f2..fb23eb0fe 100644 --- a/providers/flagd/pkg/service/rpc/service.go +++ b/providers/flagd/pkg/service/rpc/service.go @@ -54,6 +54,7 @@ type Service struct { } func NewService(cfg Configuration, cache *cache.Service, logger logr.Logger, retries int) *Service { + logger.Info("operating in rpc mode with flags sourced from " + fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)) return &Service{ cache: cache, cfg: cfg,