Skip to content

Commit

Permalink
chore: ignore context canceled errors when publishing to pubsub (#1185)
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Yuichi Okimoto <[email protected]>
  • Loading branch information
cre8ivejp authored Aug 6, 2024
1 parent 63ab2f7 commit 14454f3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 65 deletions.
19 changes: 2 additions & 17 deletions pkg/autoops/api/flag_trigger_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
ftdomain "github.com/bucketeer-io/bucketeer/pkg/feature/domain"
ftstorage "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2"
"github.com/bucketeer-io/bucketeer/pkg/locale"
"github.com/bucketeer-io/bucketeer/pkg/log"
autoopsproto "github.com/bucketeer-io/bucketeer/proto/autoops"
)

Expand Down Expand Up @@ -63,14 +62,7 @@ func enableFeature(
logger *zap.Logger,
) error {
if err := feature.Enable(); err != nil {
logger.Error(
"Failed to enable feature flag",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("featureId", feature.Id),
zap.String("environmentNamespace", environmentNamespace),
)...,
)
// If the flag is already disabled, we skip the updating
return nil
}
if err := ftStorage.UpdateFeature(ctx, feature, environmentNamespace); err != nil {
Expand All @@ -90,14 +82,7 @@ func disableFeature(
logger *zap.Logger,
) error {
if err := feature.Disable(); err != nil {
logger.Error(
"Failed to disable feature flag",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("featureId", feature.Id),
zap.String("environmentNamespace", environmentNamespace),
)...,
)
// If the flag is already disabled, we skip the updating
return nil
}
if err := ftStorage.UpdateFeature(ctx, feature, environmentNamespace); err != nil {
Expand Down
109 changes: 61 additions & 48 deletions pkg/gateway/api/api_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package api

import (
"context"
"errors"
"time"

"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -309,15 +310,17 @@ func (s *grpcGatewayService) GetEvaluations(
defer span.End()
envAPIKey, err := s.checkRequest(ctx, []accountproto.APIKey_Role{accountproto.APIKey_SDK_CLIENT})
if err != nil {
s.logger.Error("Failed to check GetEvaluations request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.Any("user", req.User),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
if !errors.Is(err, context.Canceled) {
s.logger.Error("Failed to check GetEvaluations request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.Any("user", req.User),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
}
return nil, err
}
projectID := envAPIKey.ProjectId
Expand Down Expand Up @@ -500,16 +503,18 @@ func (s *grpcGatewayService) GetEvaluation(
defer span.End()
envAPIKey, err := s.checkRequest(ctx, []accountproto.APIKey_Role{accountproto.APIKey_SDK_CLIENT})
if err != nil {
s.logger.Error("Failed to check GetEvaluation request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.Any("user", req.User),
zap.String("featureId", req.FeatureId),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
if !errors.Is(err, context.Canceled) {
s.logger.Error("Failed to check GetEvaluation request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.Any("user", req.User),
zap.String("featureId", req.FeatureId),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
}
return nil, err
}
requestTotal.WithLabelValues(
Expand Down Expand Up @@ -611,15 +616,17 @@ func (s *grpcGatewayService) GetFeatureFlags(
defer span.End()
envAPIKey, err := s.checkRequest(ctx, []accountproto.APIKey_Role{accountproto.APIKey_SDK_SERVER})
if err != nil {
s.logger.Error("Failed to check GetFeatureFlags request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.String("featureFlagsId", req.FeatureFlagsId),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
if !errors.Is(err, context.Canceled) {
s.logger.Error("Failed to check GetFeatureFlags request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("tag", req.Tag),
zap.String("featureFlagsId", req.FeatureFlagsId),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
}
return nil, err
}
projectID := envAPIKey.ProjectId
Expand Down Expand Up @@ -769,14 +776,16 @@ func (s *grpcGatewayService) GetSegmentUsers(
defer span.End()
envAPIKey, err := s.checkRequest(ctx, []accountproto.APIKey_Role{accountproto.APIKey_SDK_SERVER})
if err != nil {
s.logger.Error("Failed to check GetSegmentUsers request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.Strings("segmentIds", req.SegmentIds),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
if !errors.Is(err, context.Canceled) {
s.logger.Error("Failed to check GetSegmentUsers request",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.Strings("segmentIds", req.SegmentIds),
zap.Any("sourceId", req.SourceId),
zap.String("sdkVersion", req.SdkVersion),
)...,
)
}
return nil, err
}
projectID := envAPIKey.ProjectId
Expand Down Expand Up @@ -1408,32 +1417,35 @@ func (s *grpcGatewayService) RegisterEvents(
typ string,
) map[string]*gwproto.RegisterEventsResponse_Error {
errs := make(map[string]*gwproto.RegisterEventsResponse_Error)
errors := p.PublishMulti(ctx, messages)
multiErrs := p.PublishMulti(ctx, messages)
var repeatableErrors, nonRepeateableErrors float64
for id, err := range errors {
for id, err := range multiErrs {
retriable := err != publisher.ErrBadMessage
if retriable {
repeatableErrors++
} else {
nonRepeateableErrors++
}
s.logger.Error(
"Failed to publish event",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentID", envAPIKey.Environment.Id),
zap.String("eventType", typ),
zap.String("id", id),
)...,
)
if !errors.Is(err, context.Canceled) {
s.logger.Error(
"Failed to publish event",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentID", envAPIKey.Environment.Id),
zap.String("eventType", typ),
zap.String("id", id),
zap.Any("sourceId", req.SourceId),
)...,
)
}
errs[id] = &gwproto.RegisterEventsResponse_Error{
Retriable: retriable,
Message: "Failed to publish event",
}
}
eventCounter.WithLabelValues(callerGatewayService, typ, codeNonRepeatableError).Add(nonRepeateableErrors)
eventCounter.WithLabelValues(callerGatewayService, typ, codeRepeatableError).Add(repeatableErrors)
eventCounter.WithLabelValues(callerGatewayService, typ, codeOK).Add(float64(len(messages) - len(errors)))
eventCounter.WithLabelValues(callerGatewayService, typ, codeOK).Add(float64(len(messages) - len(multiErrs)))
return errs
}
for i, event := range req.Events {
Expand All @@ -1454,6 +1466,7 @@ func (s *grpcGatewayService) RegisterEvents(
zap.String("eventID", event.Id),
zap.String("environmentNamespace", event.EnvironmentNamespace),
zap.Any("event", event.Event),
zap.Any("sourceId", req.SourceId),
)
continue
}
Expand Down

0 comments on commit 14454f3

Please sign in to comment.