Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
3u13r committed Aug 22, 2023
1 parent bb654ba commit e229b8c
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 141 deletions.
1 change: 0 additions & 1 deletion bootstrapper/cmd/bootstrapper/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,4 @@ type clusterInitJoiner interface {
type metadataAPI interface {
// Embedded method sets declared by the joinclient and initserver packages.
joinclient.MetadataAPI
initserver.MetadataAPI
// GetLoadBalancerEndpoint returns the host and port of the load balancer.
GetLoadBalancerEndpoint(ctx context.Context) (host, port string, err error)
}
10 changes: 9 additions & 1 deletion bootstrapper/internal/joinclient/joinclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,14 @@ func (c *JoinClient) Stop() {
func (c *JoinClient) tryJoinWithAvailableServices() error {
ips, err := c.getControlPlaneIPs()
if err != nil {
return err
return fmt.Errorf("failed to get control plane IPs: %w", err)
}

ip, _, err := c.metadataAPI.GetLoadBalancerEndpoint(context.TODO())
if err != nil {
return fmt.Errorf("failed to get load balancer endpoint: %w", err)
}
ips = append(ips, ip)

if len(ips) == 0 {
return errors.New("no control plane IPs found")
Expand Down Expand Up @@ -425,6 +431,8 @@ type MetadataAPI interface {
List(ctx context.Context) ([]metadata.InstanceMetadata, error)
// Self retrieves the current instance.
Self(ctx context.Context) (metadata.InstanceMetadata, error)
// GetLoadBalancerEndpoint retrieves the load balancer endpoint.
GetLoadBalancerEndpoint(ctx context.Context) (host, port string, err error)
}

type encryptedDisk interface {
Expand Down
8 changes: 8 additions & 0 deletions bootstrapper/internal/joinclient/joinclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ func (s *stubRepeaterMetadataAPI) List(_ context.Context) ([]metadata.InstanceMe
return s.listInstances, s.listErr
}

// GetLoadBalancerEndpoint is a stub: it always reports an empty host and
// port and never returns an error.
func (s *stubRepeaterMetadataAPI) GetLoadBalancerEndpoint(_ context.Context) (string, string, error) {
	var host, port string
	return host, port, nil
}

type stubMetadataAPI struct {
selfAnswerC chan selfAnswer
listAnswerC chan listAnswer
Expand All @@ -352,6 +356,10 @@ func (s *stubMetadataAPI) List(_ context.Context) ([]metadata.InstanceMetadata,
return answer.instances, answer.err
}

// GetLoadBalancerEndpoint is a stub: it always reports an empty host and
// port and never returns an error.
func (s *stubMetadataAPI) GetLoadBalancerEndpoint(_ context.Context) (string, string, error) {
	var host, port string
	return host, port, nil
}

type selfAnswer struct {
instance metadata.InstanceMetadata
err error
Expand Down
13 changes: 13 additions & 0 deletions cli/internal/terraform/terraform/aws/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ locals {
ports_verify = "30081"
ports_recovery = "9999"
ports_debugd = "4000"
ports_join = "30090"
target_group_arns = {
control-plane : flatten([
module.load_balancer_target_bootstrapper.target_group_arn,
module.load_balancer_target_kubernetes.target_group_arn,
module.load_balancer_target_verify.target_group_arn,
module.load_balancer_target_recovery.target_group_arn,
module.load_balancer_target_konnectivity.target_group_arn,
module.load_balancer_target_join.target_group_arn,
var.debug ? [module.load_balancer_target_debugd[0].target_group_arn] : [],
])
worker : []
Expand Down Expand Up @@ -96,6 +98,7 @@ resource "aws_lb" "front_end" {
internal = false
load_balancer_type = "network"
tags = local.tags
security_groups = [aws_security_group.security_group.id]

dynamic "subnet_mapping" {
# TODO(malt3): use for_each = toset(module.public_private_subnet.all_zones)
Expand Down Expand Up @@ -255,6 +258,16 @@ module "load_balancer_target_konnectivity" {
healthcheck_protocol = "TCP"
}

# Instantiates the load_balancer_target module for the join port
# (local.ports_join) on the front-end load balancer, with a TCP health check.
module "load_balancer_target_join" {
  source = "./modules/load_balancer_target"

  name   = "${local.name}-join"
  port   = local.ports_join
  vpc_id = aws_vpc.vpc.id
  lb_arn = aws_lb.front_end.arn
  tags   = local.tags

  healthcheck_protocol = "TCP"
}

module "instance_group" {
source = "./modules/instance_group"
for_each = var.node_groups
Expand Down
31 changes: 22 additions & 9 deletions debugd/internal/debugd/metadata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// Fetcher retrieves other debugd IPs from cloud provider metadata.
type Fetcher interface {
// DiscoverDebugdIPs returns the IPs of the other debugd instances.
DiscoverDebugdIPs(ctx context.Context) ([]string, error)
// DiscoverLoadbalancerIP returns the IP of the load balancer.
DiscoverLoadbalancerIP(ctx context.Context) (string, error)
}

// Scheduler schedules fetching of metadata using timers.
Expand Down Expand Up @@ -51,23 +52,35 @@ func (s *Scheduler) Start(ctx context.Context, wg *sync.WaitGroup) {
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

ips, err := s.fetcher.DiscoverDebugdIPs(ctx)
if err != nil {
s.log.With(zap.Error(err)).Warnf("Discovering debugd IPs failed")
}
if err == nil {
s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances")
s.download(ctx, ips)
if s.deploymentDone && s.infoDone {
return
}

lbip, err := s.fetcher.DiscoverLoadbalancerIP(ctx)
if err != nil {
s.log.With(zap.Error(err)).Warnf("Discovering loadbalancer IP failed")
} else {
ips = append(ips, lbip)
}

select {
case <-ctx.Done():
if len(ips) == 0 {
s.log.With(zap.Error(err)).Warnf("No debugd IPs discovered")
continue
}

s.log.With(zap.Strings("ips", ips)).Infof("Discovered instances")
s.download(ctx, ips)
if s.deploymentDone && s.infoDone {
return
case <-ticker.C:
}

}
}()
}
Expand Down
4 changes: 4 additions & 0 deletions debugd/internal/debugd/metadata/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (s *stubFetcher) DiscoverDebugdIPs(_ context.Context) ([]string, error) {
return s.ips, nil
}

// DiscoverLoadbalancerIP is a stub: it always fails with a "not implemented"
// error and an empty IP.
func (s *stubFetcher) DiscoverLoadbalancerIP(_ context.Context) (string, error) {
	var lbIP string
	return lbIP, errors.New("not implemented")
}

type stubDownloader struct {
downloadDeploymentErrs []error
downloadDeploymentErrIdx int
Expand Down
4 changes: 0 additions & 4 deletions debugd/internal/debugd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func (s *debugdServer) UploadFiles(stream pb.Debugd_UploadFilesServer) error {
// DownloadFiles streams the previously received files to other instances.
// The request message is ignored. An error is returned if the transfer reports
// that it cannot send yet; otherwise the result of SendFiles is returned.
func (s *debugdServer) DownloadFiles(_ *pb.DownloadFilesRequest, stream pb.Debugd_DownloadFilesServer) error {
s.log.Infof("Sending files to other instance")
// Refuse to serve files while the transfer reports that it cannot send.
if !s.transfer.CanSend() {
return errors.New("cannot send files at this time")
}
return s.transfer.SendFiles(stream)
}

Expand Down Expand Up @@ -185,5 +182,4 @@ type fileTransferer interface {
RecvFiles(stream filetransfer.RecvFilesStream) error
SendFiles(stream filetransfer.SendFilesStream) error
GetFiles() []filetransfer.FileStat
CanSend() bool
}
4 changes: 0 additions & 4 deletions debugd/internal/debugd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,6 @@ func TestDownloadFiles(t *testing.T) {
canSend: true,
wantSendFileCalls: 1,
},
"transfer is not ready for sending": {
request: &pb.DownloadFilesRequest{},
wantRecvErr: true,
},
}

for name, tc := range testCases {
Expand Down
43 changes: 20 additions & 23 deletions debugd/internal/filetransfer/filetransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"io/fs"
"sync"
"sync/atomic"

"github.com/edgelesssys/constellation/v2/debugd/internal/debugd"
"github.com/edgelesssys/constellation/v2/debugd/internal/filetransfer/streamer"
Expand All @@ -33,10 +34,10 @@ type SendFilesStream interface {

// FileTransferer manages sending and receiving of files.
type FileTransferer struct {
mux sync.RWMutex
fileMux sync.RWMutex
log *logger.Logger
receiveStarted bool
receiveFinished bool
receiveFinished atomic.Bool
files []FileStat
streamer streamReadWriter
showProgress bool
Expand All @@ -52,12 +53,15 @@ func New(log *logger.Logger, streamer streamReadWriter, showProgress bool) *File
}

// SendFiles sends files to the given stream.
// If the FileTransferer has not received any files, an error is returned.
func (s *FileTransferer) SendFiles(stream SendFilesStream) error {
s.mux.RLock()
defer s.mux.RUnlock()
if !s.receiveFinished {
if !s.receiveFinished.Load() {
return errors.New("cannot send files before receiving them")
}

s.fileMux.RLock()
defer s.fileMux.RUnlock()

for _, file := range s.files {
if err := s.handleFileSend(stream, file); err != nil {
return err
Expand All @@ -68,8 +72,8 @@ func (s *FileTransferer) SendFiles(stream SendFilesStream) error {

// RecvFiles receives files from the given stream.
func (s *FileTransferer) RecvFiles(stream RecvFilesStream) (err error) {
s.mux.Lock()
defer s.mux.Unlock()
s.fileMux.Lock()
defer s.fileMux.Unlock()
if err := s.startRecv(); err != nil {
return err
}
Expand All @@ -89,30 +93,23 @@ func (s *FileTransferer) RecvFiles(stream RecvFilesStream) (err error) {

// GetFiles returns a copy of the list of files that have been received.
func (s *FileTransferer) GetFiles() []FileStat {
s.mux.RLock()
defer s.mux.RUnlock()
s.fileMux.RLock()
defer s.fileMux.RUnlock()
// Copy into a fresh slice so callers do not share the backing array of s.files.
res := make([]FileStat, len(s.files))
copy(res, s.files)
return res
}

// SetFiles sets the list of files that can be sent.
// This function is used for a sender which has not received any files through
// this FileTransferer i.e. the CLI.
func (s *FileTransferer) SetFiles(files []FileStat) {
s.mux.Lock()
defer s.mux.Unlock()
s.fileMux.Lock()
defer s.fileMux.Unlock()
// Store a copy so s.files does not alias the caller's slice.
res := make([]FileStat, len(files))
copy(res, files)
s.files = res
// Mark receiving as finished; SendFiles checks this before sending.
s.receiveFinished = true
}

// CanSend returns true if the file receive has finished.
// This is called to determine if a debugd instance can request files from this server.
func (s *FileTransferer) CanSend() bool {
s.mux.RLock()
defer s.mux.RUnlock()
ret := s.receiveFinished
return ret
s.receiveFinished.Store(true)
}

func (s *FileTransferer) handleFileSend(stream SendFilesStream, file FileStat) error {
Expand Down Expand Up @@ -173,7 +170,7 @@ func (s *FileTransferer) handleFileRecv(stream RecvFilesStream) (bool, error) {
// startRecv marks the file receive as started. It returns an error if receiving has already started.
func (s *FileTransferer) startRecv() error {
switch {
case s.receiveFinished:
case s.receiveFinished.Load():
return ErrReceiveFinished
case s.receiveStarted:
return ErrReceiveRunning
Expand All @@ -193,7 +190,7 @@ func (s *FileTransferer) abortRecv() {
// This allows other debugd instances to request files from this server.
func (s *FileTransferer) finishRecv() {
s.receiveStarted = false
s.receiveFinished = true
s.receiveFinished.Store(true)
}

// addFile adds a file to the list of received files.
Expand Down
Loading

0 comments on commit e229b8c

Please sign in to comment.