Skip to content

Commit

Permalink
Do not push to the loaded channel when flags' request fails (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynetro authored Mar 27, 2024
1 parent 036dfa9 commit 87b23fe
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 51 deletions.
55 changes: 49 additions & 6 deletions feature_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http/httptest"
"reflect"
"strings"
"sync/atomic"
"time"

"testing"
Expand Down Expand Up @@ -616,13 +617,9 @@ func TestFeatureFlagNullComeIntoPlayOnlyWhenDecideErrorsOut(t *testing.T) {

defer server.Close()

// TODO: Make this nicer, right now if all local evaluation requests fail, we block
// on waiting for atleast one request to happen before returning flags,
// which can be suboptimal
client, _ := NewWithConfig("Csyjlnlun3OzyNJAafdlv", Config{
PersonalApiKey: "some very secret key",
Endpoint: server.URL,
DefaultFeatureFlagsPollingInterval: 5 * time.Second,
PersonalApiKey: "some very secret key",
Endpoint: server.URL,
})
defer client.Close()

Expand Down Expand Up @@ -3384,3 +3381,49 @@ func TestFlagDefinitionsWithTimeoutExceeded(t *testing.T) {
t.Error("Expected timeout error fetching flags")
}
}

func TestFetchFlagsFails(t *testing.T) {
// This test verifies that even in presence of HTTP errors flags continue to be fetched.
var called uint32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if atomic.LoadUint32(&called) == 0 {
// Load initial flags successfully
w.Write([]byte(fixture("feature_flag/test-simple-flag.json")))
} else {
// Fail all next requests
w.WriteHeader(http.StatusInternalServerError)
}
atomic.AddUint32(&called, 1)

}))
defer server.Close()

client, _ := NewWithConfig("Csyjlnlun3OzyNJAafdlv", Config{
PersonalApiKey: "some very secret key",
Endpoint: server.URL,
})
defer client.Close()

_, err := client.GetFeatureFlags()
if err != nil {
t.Error("Should not fail", err)
}
client.ReloadFeatureFlags()
client.ReloadFeatureFlags()

_, err = client.GetAllFlags(FeatureFlagPayloadNoKey{
DistinctId: "my-id",
})
if err != nil {
t.Error("Should not fail", err)
}

// Wait for the last request to complete
<-time.After(50 * time.Millisecond)

const expectedCalls = 3
actualCalls := atomic.LoadUint32(&called)
if actualCalls != expectedCalls {
t.Error("Expected to be called", expectedCalls, "times but got", actualCalls)
}
}
85 changes: 41 additions & 44 deletions featureflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ import (
const LONG_SCALE = 0xfffffffffffffff

type FeatureFlagsPoller struct {
loaded chan bool
shutdown chan bool
forceReload chan bool
featureFlags []FeatureFlag
cohorts map[string]PropertyGroup
groups map[string]string
personalApiKey string
projectApiKey string
Errorf func(format string, args ...interface{})
Endpoint string
http http.Client
mutex sync.RWMutex
fetchedFlagsSuccessfullyOnce bool
nextPollTick func() time.Duration
flagTimeout time.Duration
loaded chan bool
shutdown chan bool
forceReload chan bool
featureFlags []FeatureFlag
cohorts map[string]PropertyGroup
groups map[string]string
personalApiKey string
projectApiKey string
Errorf func(format string, args ...interface{})
Endpoint string
http http.Client
mutex sync.RWMutex
nextPollTick func() time.Duration
flagTimeout time.Duration
}

type FeatureFlag struct {
Expand Down Expand Up @@ -131,18 +130,17 @@ func newFeatureFlagsPoller(
}

poller := FeatureFlagsPoller{
loaded: make(chan bool),
shutdown: make(chan bool),
forceReload: make(chan bool),
personalApiKey: personalApiKey,
projectApiKey: projectApiKey,
Errorf: errorf,
Endpoint: endpoint,
http: httpClient,
mutex: sync.RWMutex{},
fetchedFlagsSuccessfullyOnce: false,
nextPollTick: nextPollTick,
flagTimeout: flagTimeout,
loaded: make(chan bool),
shutdown: make(chan bool),
forceReload: make(chan bool),
personalApiKey: personalApiKey,
projectApiKey: projectApiKey,
Errorf: errorf,
Endpoint: endpoint,
http: httpClient,
mutex: sync.RWMutex{},
nextPollTick: nextPollTick,
flagTimeout: flagTimeout,
}

go poller.run()
Expand All @@ -151,14 +149,14 @@ func newFeatureFlagsPoller(

func (poller *FeatureFlagsPoller) run() {
poller.fetchNewFeatureFlags()
close(poller.loaded)

for {
timer := time.NewTimer(poller.nextPollTick())
select {
case <-poller.shutdown:
close(poller.shutdown)
close(poller.forceReload)
close(poller.loaded)
timer.Stop()
return
case <-poller.forceReload:
Expand All @@ -176,27 +174,21 @@ func (poller *FeatureFlagsPoller) fetchNewFeatureFlags() {
res, cancel, err := poller.localEvaluationFlags(headers)
defer cancel()
if err != nil || res.StatusCode != http.StatusOK {
poller.loaded <- false
poller.Errorf("Unable to fetch feature flags", err)
return
}
defer res.Body.Close()
resBody, err := ioutil.ReadAll(res.Body)
if err != nil {
poller.loaded <- false
poller.Errorf("Unable to fetch feature flags", err)
return
}
featureFlagsResponse := FeatureFlagsResponse{}
err = json.Unmarshal([]byte(resBody), &featureFlagsResponse)
if err != nil {
poller.loaded <- false
poller.Errorf("Unable to unmarshal response from api/feature_flag/local_evaluation", err)
return
}
if !poller.fetchedFlagsSuccessfullyOnce {
poller.loaded <- true
}
newFlags := []FeatureFlag{}
newFlags = append(newFlags, featureFlagsResponse.Flags...)
poller.mutex.Lock()
Expand All @@ -205,12 +197,14 @@ func (poller *FeatureFlagsPoller) fetchNewFeatureFlags() {
if featureFlagsResponse.GroupTypeMapping != nil {
poller.groups = *featureFlagsResponse.GroupTypeMapping
}
poller.fetchedFlagsSuccessfullyOnce = true
poller.mutex.Unlock()
}

func (poller *FeatureFlagsPoller) GetFeatureFlag(flagConfig FeatureFlagPayload) (interface{}, error) {
featureFlags := poller.GetFeatureFlags()
featureFlags, err := poller.GetFeatureFlags()
if err != nil {
return nil, err
}
cohorts := poller.cohorts

featureFlag := FeatureFlag{Key: ""}
Expand All @@ -224,7 +218,6 @@ func (poller *FeatureFlagsPoller) GetFeatureFlag(flagConfig FeatureFlagPayload)
}

var result interface{}
var err error

if featureFlag.Key != "" {
result, err = poller.computeFlagLocally(
Expand Down Expand Up @@ -254,7 +247,10 @@ func (poller *FeatureFlagsPoller) GetFeatureFlag(flagConfig FeatureFlagPayload)

func (poller *FeatureFlagsPoller) GetAllFlags(flagConfig FeatureFlagPayloadNoKey) (map[string]interface{}, error) {
response := map[string]interface{}{}
featureFlags := poller.GetFeatureFlags()
featureFlags, err := poller.GetFeatureFlags()
if err != nil {
return nil, err
}
fallbackToDecide := false
cohorts := poller.cohorts

Expand Down Expand Up @@ -808,14 +804,15 @@ func _hash(key string, distinctId string, salt string) (float64, error) {
return float64(value) / LONG_SCALE, nil
}

func (poller *FeatureFlagsPoller) GetFeatureFlags() []FeatureFlag {
// ensure flags are loaded on the first call

if !poller.fetchedFlagsSuccessfullyOnce {
<-poller.loaded
func (poller *FeatureFlagsPoller) GetFeatureFlags() ([]FeatureFlag, error) {
// When channel is open this will block. When channel is closed it will immediately exit.
_, closed := <-poller.loaded
if closed && poller.featureFlags == nil {
// There was an error with initial flag fetching
return nil, fmt.Errorf("Flags were not successfully fetched yet")
}

return poller.featureFlags
return poller.featureFlags, nil
}

func (poller *FeatureFlagsPoller) decide(requestData []byte, headers [][2]string) (*http.Response, context.CancelFunc, error) {
Expand Down
2 changes: 1 addition & 1 deletion posthog.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (c *client) GetFeatureFlags() ([]FeatureFlag, error) {
c.Errorf(errorMessage)
return nil, errors.New(errorMessage)
}
return c.featureFlagsPoller.GetFeatureFlags(), nil
return c.featureFlagsPoller.GetFeatureFlags()
}

func (c *client) GetAllFlags(flagConfig FeatureFlagPayloadNoKey) (map[string]interface{}, error) {
Expand Down

0 comments on commit 87b23fe

Please sign in to comment.