Skip to content

Commit

Permalink
feat: start xds command
Browse files Browse the repository at this point in the history
  • Loading branch information
FemiNoviaLina committed Oct 22, 2024
1 parent 3462887 commit 0a2711c
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 36 deletions.
94 changes: 94 additions & 0 deletions cmd/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package cmd

import (
"github.com/MakeNowJust/heredoc"
"github.com/goto/shield/config"
"github.com/goto/shield/internal/proxy/envoy/xds"
"github.com/goto/shield/internal/store/postgres"
shieldlogger "github.com/goto/shield/pkg/logger"
"github.com/spf13/cobra"
cli "github.com/spf13/cobra"
)

func ProxyCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "proxy <command>",
Short: "Proxy management",
Long: "Server management commands.",
Example: heredoc.Doc(`
$ shield proxy envoy-xds start -c ./config.yaml
`),
}

cmd.AddCommand(proxyEnvoyXDSCommand())

return cmd
}

func proxyEnvoyXDSCommand() *cobra.Command {
c := &cli.Command{
Use: "envoy-xds",
Short: "Envoy Agent xDS management",
Long: "Envoy Agent xDS management commands.",
Example: heredoc.Doc(`
$ shield proxy envoy-xds start
`),
}

c.AddCommand(envoyXDSStartCommand())

return c
}

func envoyXDSStartCommand() *cobra.Command {
var configFile string

c := &cli.Command{
Use: "start",
Short: "Start Envoy Agent xDS server",
Long: "Start Envoy Agent xDS server commands.",
Example: "shield proxy envoy-xds start",
RunE: func(cmd *cli.Command, args []string) error {
appConfig, err := config.Load(configFile)
if err != nil {
panic(err)
}

logger := shieldlogger.InitLogger(shieldlogger.Config{Level: appConfig.Log.Level})

dbClient, err := setupDB(appConfig.DB, logger)
if err != nil {
return err
}
defer func() {
logger.Info("cleaning up db")
dbClient.Close()
}()

ctx := cmd.Context()

pgRuleRepository := postgres.NewRuleRepository(dbClient)
if err := pgRuleRepository.InitCache(ctx); err != nil {
return err
}

cbs, repositories, err := buildXDSDependencies(ctx, logger, appConfig.Proxy, pgRuleRepository)
if err != nil {
return err
}
defer func() {
logger.Info("cleaning up rules proxy blob")
for _, f := range cbs {
if err := f(); err != nil {
logger.Warn("error occurred during shutdown rules proxy blob storages", "err", err)
}
}
}()

return xds.Serve(ctx, logger, appConfig.Proxy, repositories)
},
}

c.Flags().StringVarP(&configFile, "config", "c", "", "Config file path")
return c
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func New(cliConfig *Config) *cli.Command {
}

cmd.AddCommand(ServerCommand())
cmd.AddCommand(ProxyCommand())
cmd.AddCommand(NamespaceCommand(cliConfig))
cmd.AddCommand(UserCommand(cliConfig))
cmd.AddCommand(OrganizationCommand(cliConfig))
Expand Down
47 changes: 25 additions & 22 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,34 +179,37 @@ func StartServer(logger *log.Zap, cfg *config.Shield) error {
}

// serving proxies
var cbs []func() error
var cps []func(context.Context) error
if cfg.Proxy.EnvoyAgent.XDS.Host != "" && cfg.Proxy.EnvoyAgent.XDS.Port != 0 {
serveXDS(ctx, logger, cfg.Proxy, pgRuleRepository)
cbs, err = serveXDS(ctx, logger, cfg.Proxy, pgRuleRepository)
} else {
cbs, cps, err := serveProxies(ctx, logger, cfg.App.IdentityProxyHeader, cfg.App.UserIDHeader, cfg.Proxy, pgRuleRepository, deps.ResourceService, deps.RelationService, deps.UserService, deps.GroupService, deps.ProjectService, deps.RelationAdapter)
if err != nil {
return err
}
defer func() {
// clean up stage
logger.Info("cleaning up rules proxy blob")
for _, f := range cbs {
if err := f(); err != nil {
logger.Warn("error occurred during shutdown rules proxy blob storages", "err", err)
}
cbs, cps, err = serveProxies(ctx, logger, cfg.App.IdentityProxyHeader, cfg.App.UserIDHeader, cfg.Proxy, pgRuleRepository, deps.ResourceService, deps.RelationService, deps.UserService, deps.GroupService, deps.ProjectService, deps.RelationAdapter)
}
if err != nil {
return err
}
defer func() {
// clean up stage
logger.Info("cleaning up rules proxy blob")
for _, f := range cbs {
if err := f(); err != nil {
logger.Warn("error occurred during shutdown rules proxy blob storages", "err", err)
}
}

logger.Info("cleaning up proxies")
for _, f := range cps {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second*20)
if err := f(shutdownCtx); err != nil {
shutdownCancel()
logger.Warn("error occurred during shutdown proxies", "err", err)
continue
}
logger.Info("cleaning up proxies")
for _, f := range cps {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second*20)
if err := f(shutdownCtx); err != nil {
shutdownCancel()
logger.Warn("error occurred during shutdown proxies", "err", err)
continue
}
}()
}
shutdownCancel()
}
}()

// serving server
return server.Serve(ctx, logger, cfg.App, nrApp, deps)
}
Expand Down
31 changes: 25 additions & 6 deletions cmd/serve_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,31 @@ func serveProxies(
}

func serveXDS(ctx context.Context, logger *log.Zap, cfg proxy.ServicesConfig, pgRuleRepository *postgres.RuleRepository) ([]func() error, error) {
var cleanUpBlobs []func() error
cleanUpBlobs, repositories, err := buildXDSDependencies(ctx, logger, cfg, pgRuleRepository)
if err != nil {
return nil, err
}

errChan := make(chan error)
go func() {
err := xds.Serve(ctx, logger, cfg, repositories)
if err != nil {
errChan <- err
logger.Error("error while running envoy xds server", "error", err)
}
}()

return cleanUpBlobs, nil
}

func buildXDSDependencies(ctx context.Context, logger *log.Zap, cfg proxy.ServicesConfig, pgRuleRepository *postgres.RuleRepository) ([]func() error, map[string]ads.Repository, error) {
var cleanUpBlobs []func() error
repositories := make(map[string]ads.Repository)

for _, svcConfig := range cfg.Services {
parsedRuleConfigURL, err := url.Parse(svcConfig.RulesPath)
if err != nil {
return nil, err
return nil, nil, err
}

var repository ads.Repository
Expand All @@ -119,21 +137,22 @@ func serveXDS(ctx context.Context, logger *log.Zap, cfg proxy.ServicesConfig, pg
rule.RULES_CONFIG_STORAGE_MEM:
ruleBlobFS, err := blob.NewStore(ctx, svcConfig.RulesPath, svcConfig.RulesPathSecret)
if err != nil {
return nil, err
return nil, nil, err
}

blobRuleRepository := blob.NewRuleRepository(logger, ruleBlobFS)
if err := blobRuleRepository.InitCache(ctx, ruleCacheRefreshDelay); err != nil {
return nil, err
return nil, nil, err
}
cleanUpBlobs = append(cleanUpBlobs, blobRuleRepository.Close)
repository = blobRuleRepository
default:
return nil, errors.New("invalid rule config storage")
return nil, nil, errors.New("invalid rule config storage")
}
repositories[svcConfig.Name] = repository
}
return cleanUpBlobs, xds.Serve(ctx, logger, cfg, repositories)

return cleanUpBlobs, repositories, nil
}

func buildHookPipeline(
Expand Down
9 changes: 1 addition & 8 deletions internal/proxy/envoy/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,5 @@ func Serve(ctx context.Context, logger log.Logger, cfg proxy.ServicesConfig, rep
return err
}

go func() {
if err := server.Serve(lis); err != nil {
logger.Error("[envoy agent] failed to serve envoy xds: %v\n", err)
}
}()

logger.Info("[envoy agent] envoy xds is up")
return nil
return server.Serve(lis)
}

0 comments on commit 0a2711c

Please sign in to comment.