Skip to content

Commit

Permalink
fix: stream flow
Browse files Browse the repository at this point in the history
  • Loading branch information
FemiNoviaLina committed Oct 22, 2024
1 parent 9dd6a59 commit 3462887
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 58 deletions.
1 change: 1 addition & 0 deletions internal/proxy/envoy/xds/ads/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Message struct {
NodeID string
VersionInfo string
Nonce string
TypeUrl string
}

type MessageChan chan Message
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/envoy/xds/ads/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s Service) getCluster(rule rule.Rule) *cluster.Cluster {
ClusterDiscoveryType: &cluster.Cluster_Type{
Type: cluster.Cluster_LOGICAL_DNS,
},
DnsLookupFamily: cluster.Cluster_V4_ONLY,
DnsLookupFamily: cluster.Cluster_V4_PREFERRED,
Name: rule.Backend.Namespace,
ConnectTimeout: durationpb.New(1 * time.Second),
LoadAssignment: s.getEndpoint(rule),
Expand Down
104 changes: 47 additions & 57 deletions internal/proxy/envoy/xds/ads/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ type DiscoveryResource struct {
}

type Client struct {
NodeID string
LatestVersionSent string
LatestVersionACK string
LatestNonceSent string
LatestNonceACK string
LastUpdated time.Time
NodeID string
LastUpdated time.Time
}

type Stream struct {
Expand All @@ -45,10 +41,10 @@ func NewStream(logger log.Logger, refreshInterval time.Duration, stream xds.Aggr
ctx: ctx,
cancel: cancel,
logger: logger,
refreshInterval: refreshInterval,
stream: stream,
messageChan: make(MessageChan),
services: services,
messageChan: make(MessageChan),
refreshInterval: refreshInterval,
}
}

Expand All @@ -65,44 +61,31 @@ func (s Stream) Stream() error {
if err == io.EOF {
return
}

if err != nil {
s.logger.Error(err.Error())
return
}

if in.ResponseNonce == "" {
versionInfo := strconv.FormatInt(time.Now().UnixNano(), 10)
nonce := strconv.FormatInt(time.Now().UnixNano(), 10)
s.logger.Info("received request on stream", "typeurl", in.TypeUrl)
message := Message{
NodeID: in.Node.Id,
VersionInfo: versionInfo,
Nonce: nonce,
VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10),
Nonce: strconv.FormatInt(time.Now().UnixNano(), 10),
TypeUrl: in.TypeUrl,
}
s.messageChan.Push(message)
s.client.LastUpdated = time.Now()
s.client.LatestVersionSent = versionInfo
s.client.LatestNonceSent = nonce

if s.client.NodeID == "" {
s.client.NodeID = in.Node.Id
s.PushUpdatePeriodically()
go s.PushUpdatePeriodically()
}
} else if in.ErrorDetail == nil {
s.logger.Info("received ACK on stream", "typeurl", in.TypeUrl, "version_info", in.VersionInfo)
} else {
if in.ResponseNonce == s.client.LatestNonceSent {
s.client.LatestVersionACK = in.VersionInfo
s.client.LatestNonceACK = in.ResponseNonce
s.logger.Info("received ACK on stream", in)
} else {
s.logger.Info("received NACK on stream", in.ErrorDetail)
nonce := strconv.FormatInt(time.Now().UnixNano(), 10)
message := Message{
NodeID: s.client.NodeID,
VersionInfo: s.client.LatestVersionSent,
Nonce: nonce,
}
s.client.LatestNonceSent = nonce
s.messageChan.Push(message)
s.client.LastUpdated = time.Now()
}
s.logger.Info("received NACK on stream", "typeurl", in.TypeUrl, "version_info", in.VersionInfo, "error", in.ErrorDetail)
}
}
}
Expand All @@ -111,7 +94,7 @@ func (s Stream) Stream() error {
go func() {
for e := range s.messageChan {
if err := s.streamResponses(e); err != nil {
s.logger.Debug("error while streaming response", err)
s.logger.Debug("error while streaming response", "error", err)
}
}
}()
Expand All @@ -136,53 +119,60 @@ func (s Stream) streamResponses(message Message) error {
}
}

// When using ADS we need to order responses.
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
responseStream := NewResponseStream(s.stream, message.VersionInfo, message.Nonce)
if err := responseStream.StreamCDS(cfg.Clusters); err != nil {
return err
}
if err := responseStream.StreamLDS(cfg.Listeners); err != nil {
return err
}
if err := responseStream.StreamRDS(cfg.Routes); err != nil {
return err
switch message.TypeUrl {
case CLUSTER_TYPE_URL:
if err := responseStream.StreamCDS(cfg.Clusters); err != nil {
return err
}
case LISTENER_TYPE_URL:
if err := responseStream.StreamLDS(cfg.Listeners); err != nil {
return err
}
case ROUTER_TYPE_URL:
if err := responseStream.StreamRDS(cfg.Routes); err != nil {
return err
}
default:
if err := responseStream.StreamCDS(cfg.Clusters); err != nil {
return err
}
if err := responseStream.StreamLDS(cfg.Listeners); err != nil {
return err
}
if err := responseStream.StreamRDS(cfg.Routes); err != nil {
return err
}
}

return nil
}

func (s Stream) PushUpdatePeriodically() {
ticker := time.NewTicker(s.refreshInterval)
defer ticker.Stop()

service, ok := s.services[s.client.NodeID]
if !ok {
s.logger.Debug("service not found for node id", s.client.NodeID)
s.logger.Debug("service not found", "node_id", s.client.NodeID)
return
}

for {
select {
case <-ticker.C:
case <-s.ctx.Done():
return
default:
time.Sleep(s.refreshInterval)
if service.IsUpdated(s.ctx, s.client.LastUpdated) {
s.logger.Debug("discovery resource update found")
versionInfo := strconv.FormatInt(time.Now().UnixNano(), 10)
nonce := strconv.FormatInt(time.Now().UnixNano(), 10)
s.logger.Debug("discovery resource update found", "node_id", s.client.NodeID)
message := Message{
NodeID: s.client.NodeID,
VersionInfo: versionInfo,
Nonce: nonce,
VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10),
Nonce: strconv.FormatInt(time.Now().UnixNano(), 10),
}
s.messageChan.Push(message)
s.client.LatestVersionSent = versionInfo
s.client.LatestNonceSent = nonce
s.client.LastUpdated = time.Now()
} else {
s.logger.Debug("no discovery resource update")
s.logger.Debug("no discovery resource update", "node_id", s.client.NodeID)
}
case <-s.ctx.Done():
return
}
}
}

0 comments on commit 3462887

Please sign in to comment.