From 9c6e3425906d31d6fb9c9da82463b064e9f4ad83 Mon Sep 17 00:00:00 2001 From: Viren Date: Thu, 19 May 2022 17:53:47 +0530 Subject: [PATCH 1/5] first cut changes --- cmd/dkvctl/main.go | 11 +- cmd/dkvsrv/main.go | 77 +++++++------ internal/opts/config.go | 32 ++++++ internal/slave/service.go | 31 +++++- internal/utils.go | 227 ++++++++++++++++++++++++++++++++++++++ pkg/ctl/client.go | 23 ++++ 6 files changed, 362 insertions(+), 39 deletions(-) create mode 100644 internal/utils.go diff --git a/cmd/dkvctl/main.go b/cmd/dkvctl/main.go index dde3b6ca..c76e7517 100644 --- a/cmd/dkvctl/main.go +++ b/cmd/dkvctl/main.go @@ -6,6 +6,7 @@ import ( "sort" "strings" + utils "github.com/flipkart-incubator/dkv/internal" "github.com/flipkart-incubator/dkv/pkg/ctl" "github.com/flipkart-incubator/dkv/pkg/serverpb" @@ -228,11 +229,13 @@ func (c *cmd) clusterInfo(client *ctl.DKVClient, args ...string) { } } -var dkvAddr, dkvAuthority string +var dkvAddr, dkvAuthority, caCertPath string func init() { flag.StringVarP(&dkvAddr, "dkvAddr", "a", "127.0.0.1:8080", ": - DKV server address") flag.StringVar(&dkvAuthority, "authority", "", "Override :authority pseudo header for routing purposes. Useful while accessing DKV via service mesh.") + flag.StringVar(&caCertPath, "caCertPath", "", "Path for root certificate of the chain, i.e. CA certificate") + for _, c := range cmds { if c.argDesc == "" { flag.BoolVar(&c.emptyValue, c.name, c.emptyValue, c.cmdDesc) @@ -269,7 +272,11 @@ func main() { fmt.Printf(" (:authority = %s)", dkvAuthority) } fmt.Printf("...") - client, err := ctl.NewInSecureDKVClient(dkvAddr, dkvAuthority, ctl.DefaultConnectOpts) + //client, err := ctl.NewInSecureDKVClient(dkvAddr, dkvAuthority, ctl.DefaultConnectOpts) + dkvConfig := utils.DKVConfig{ + SrvrAddr:dkvAddr, + CaCertPath:caCertPath} + client, err := utils.NewDKVClient(dkvConfig, dkvAuthority, ctl.DefaultConnectOpts) if err != nil { fmt.Printf("\nUnable to create DKV client. Error: %v\n", err) return diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go index 9cf5de14..ddf9193c 100644 --- a/cmd/dkvsrv/main.go +++ b/cmd/dkvsrv/main.go @@ -3,18 +3,7 @@ package main import ( "encoding/json" "fmt" - "io" - "io/ioutil" - "log" - "net" - "net/http" - "net/url" - "os" - "os/signal" - "path" - "strings" - "syscall" - + utils "github.com/flipkart-incubator/dkv/internal" "github.com/flipkart-incubator/dkv/internal/discovery" "github.com/flipkart-incubator/dkv/internal/master" "github.com/flipkart-incubator/dkv/internal/opts" @@ -32,14 +21,22 @@ import ( "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "path" + "strings" + "syscall" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" flag "github.com/spf13/pflag" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - "net/http/pprof" ) @@ -98,6 +95,7 @@ func main() { flag.Parse() config.Init(cfgFile) config.Print() + opts.AppConfig = config setupDKVLogger() setupAccessLogger() @@ -105,8 +103,20 @@ func main() { setupStats() go setupHttpServer() + var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != "" + + var srvrMode utils.ConnectionMode + if secure { + srvrMode = utils.ServerTLS + } else { + srvrMode = utils.Insecure + } + kvs, cp, ca, br := newKVStore() - grpcSrvr, lstnr := newGrpcServerListener() + //grpcSrvr, lstnr := newGrpcServerListener() + grpcSrvr, lstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, + SrvrAddr: config.ListenAddr, KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, accessLogger) defer grpcSrvr.GracefulStop() srvrRole := toDKVSrvrRole(config.DbRole) //srvrRole.printFlags() @@ -173,7 +183,22 @@ func main() { } defer dkvSvc.Close() serverpb.RegisterDKVServer(grpcSrvr, dkvSvc) - serverpb.RegisterDKVReplicationServer(grpcSrvr, dkvSvc) + + var replSrvrMode utils.ConnectionMode + if secure { + replSrvrMode = utils.MutualTLS + } else { + replSrvrMode = utils.Insecure + } + + replGrpcSrvr, replLstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: replSrvrMode, + SrvrAddr: config.PeerListenAddr, KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, accessLogger) + defer replGrpcSrvr.GracefulStop() + + serverpb.RegisterDKVReplicationServer(replGrpcSrvr, dkvSvc) + go replGrpcSrvr.Serve(replLstnr) + //TODO do we need to register below with replGrpcSrvr as well health.RegisterHealthServer(grpcSrvr, dkvSvc) // Discovery servers can be only configured if node started as master. @@ -284,24 +309,6 @@ func setupDKVLogger() { } } -func newGrpcServerListener() (*grpc.Server, net.Listener) { - grpcSrvr := grpc.NewServer( - grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(accessLogger)), - grpc.UnaryInterceptor(grpc_zap.UnaryServerInterceptor(accessLogger)), - ) - reflection.Register(grpcSrvr) - return grpcSrvr, newListener() -} - -func newListener() (lis net.Listener) { - var err error - if lis, err = net.Listen("tcp", config.ListenAddr); err != nil { - log.Panicf("failed to listen: %v", err) - return - } - return -} - func setupSignalHandler() <-chan os.Signal { signals := []os.Signal{syscall.SIGINT, syscall.SIGQUIT, syscall.SIGSTOP, syscall.SIGTERM} stopChan := make(chan os.Signal, len(signals)) diff --git a/internal/opts/config.go b/internal/opts/config.go index 65c645a6..18a6fc6f 100644 --- a/internal/opts/config.go +++ b/internal/opts/config.go @@ -15,6 +15,11 @@ import ( "github.com/spf13/viper" ) +var ( + AppConfig Config +) + + type Config struct { // region level configuration. @@ -41,6 +46,14 @@ type Config struct { //Service discovery related params DiscoveryConfig DiscoveryServiceConfiguration `mapstructure:"discovery-service" desc:"config for discovery server"` + // secure communication related params + PeerListenAddr string `mapstructure:"peer-listen-addr" desc:"Address on which the DKV replication service binds"` + CertPath string `mapstructure:"cert-path" desc:"Path for certificate file of this node"` + KeyPath string `mapstructure:"key-path" desc:"Path for key file of this node"` + CaCertPath string `mapstructure:"ca-cert-path" desc:"Path for root certificate of the chain, i.e. CA certificate"` + + + // Temporary variables to be removed once https://github.com/flipkart-incubator/dkv/issues/82 is fixed // The above issue causes replication issues during master switch due to inconsistent change numbers // Thus enabling hardcoded masters to not degrade current behaviour @@ -132,6 +145,10 @@ func (c *Config) validateFlags() { log.Panicf("diskless is available only on Badger storage") } + if c.PeerListenAddr != "" && strings.IndexRune(c.PeerListenAddr, ':') < 0 { + log.Panicf("given listen address: %s is invalid, must be in host:port format", c.PeerListenAddr) + } + if c.DbEngineIni != "" { if _, err := os.Stat(c.DbEngineIni); err != nil && os.IsNotExist(err) { log.Panicf("given storage configuration file: %s does not exist", c.DbEngineIni) @@ -159,6 +176,11 @@ func (c *Config) validateFlags() { log.Panicf("Invalid discovery server client configuration") } } + + nonNullAuthFlags := btou(c.CertPath != "", c.KeyPath != "", c.CaCertPath != "") + if nonNullAuthFlags > 0 && nonNullAuthFlags < 3 { + log.Panicf("Missing TLS attributes, set all flags (caCertPath, keyPath, certPath) to run DKV in secure mode") + } } func (c *Config) Print() { @@ -235,3 +257,13 @@ func isFlagPassed(name string) bool { }) return found } + +func btou(conditions... bool) int { + cnt := 0 + for _, cond := range conditions { + if cond { + cnt += 1 + } + } + return cnt +} \ No newline at end of file diff --git a/internal/slave/service.go b/internal/slave/service.go index de794be5..8628c0be 100644 --- a/internal/slave/service.go +++ b/internal/slave/service.go @@ -11,11 +11,12 @@ import ( dto "github.com/prometheus/client_model/go" + utils "github.com/flipkart-incubator/dkv/internal" "github.com/flipkart-incubator/dkv/pkg/health" "github.com/flipkart-incubator/dkv/internal/discovery" "github.com/flipkart-incubator/dkv/internal/hlc" - opts "github.com/flipkart-incubator/dkv/internal/opts" + "github.com/flipkart-incubator/dkv/internal/opts" "github.com/flipkart-incubator/dkv/internal/stats" "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/dkv/pkg/ctl" @@ -416,7 +417,12 @@ func (ss *slaveService) reconnectMaster() error { func (ss *slaveService) findAndConnectToMaster() error { if master, err := ss.findNewMaster(); err == nil { // TODO: Check if authority override option is needed for slaves while they connect with masters - if replCli, err := ctl.NewInSecureDKVClient(*master, "", ctl.DefaultConnectOpts); err == nil { + + slaveClientConfig := getSlaveClientConfig(*master) + //replCli, err := ctl.NewInSecureDKVClient(*master, "", ctl.DefaultConnectOpts); + replCli, err := utils.NewDKVClient(slaveClientConfig, "", ctl.DefaultConnectOpts); + + if err == nil { if ss.replInfo.replCli != nil { ss.replInfo.replCli.Close() } @@ -435,6 +441,27 @@ func (ss *slaveService) findAndConnectToMaster() error { return nil } +func getSlaveClientConfig(masterSvcAddr string) utils.DKVConfig { + config := opts.AppConfig + + var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != "" + + var replClientMode utils.ConnectionMode + if secure { + replClientMode = utils.MutualTLS + } else { + replClientMode = utils.Insecure + } + + slaveClientConfig := utils.DKVConfig{ + ConnectionMode: replClientMode, + KeyPath: config.KeyPath, + CertPath: config.CertPath, + CaCertPath: config.CaCertPath, + SrvrAddr: masterSvcAddr} + return slaveClientConfig +} + // Finds a new active master for the region // Prefers followers within the local DC first, followed by master within local DC // followed by followers outside DC, followed by master outside DC diff --git a/internal/utils.go b/internal/utils.go new file mode 100644 index 00000000..562ab31e --- /dev/null +++ b/internal/utils.go @@ -0,0 +1,227 @@ +package internal + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "github.com/flipkart-incubator/dkv/pkg/ctl" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/reflection" + "io/ioutil" + "log" + "math/big" + "net" + "os" + "path/filepath" + "time" +) + +type ConnectionMode string + +const ( + ServerTLS ConnectionMode = "serverTLS" + MutualTLS = "mutualTLS" + Insecure = "insecure" +) + +type DKVConfig struct { + ConnectionMode ConnectionMode + SrvrAddr string + KeyPath string + CertPath string + CaCertPath string +} + +func NewDKVClient(clientConfig DKVConfig, authority string, options ctl.ConnectOpts) (*ctl.DKVClient, error) { + var opt grpc.DialOption + var err error = nil + var config *tls.Config + var clientMode = clientConfig.ConnectionMode + if clientMode == "" { + clientMode = clientModeFromFlags(clientConfig) + } + switch clientMode { + case MutualTLS: + config, err = getTLSConfigWithCertPool(clientConfig.CaCertPath) + if err == nil { + var cert tls.Certificate + cert, err = tls.LoadX509KeyPair(clientConfig.CertPath, clientConfig.KeyPath) + if err == nil { + config.Certificates = []tls.Certificate{cert} + opt = grpc.WithTransportCredentials(credentials.NewTLS(config)) + } + } + case ServerTLS: + config, err = getTLSConfigWithCertPool(clientConfig.CaCertPath) + if err == nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(config)) + } + case Insecure: + opt = grpc.WithInsecure() + } + if err != nil { + return nil, err + } + return ctl.NewDKVClient(clientConfig.SrvrAddr, authority, options, opt) +} + +func getTLSConfigWithCertPool(caCertPath string) (*tls.Config, error) { + certPool := x509.NewCertPool() + bs, err := ioutil.ReadFile(caCertPath) + if err != nil { + return nil, err + } + ok := certPool.AppendCertsFromPEM(bs) + if !ok { + return nil, errors.New("failed to append certs") + } + return &tls.Config{InsecureSkipVerify: false, RootCAs: certPool}, err +} + +func NewGrpcServerListener(config DKVConfig, loogger *zap.Logger) (*grpc.Server, net.Listener) { + opts := []grpc.ServerOption{ + grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(loogger)), + grpc.UnaryInterceptor(grpc_zap.UnaryServerInterceptor(loogger))} + if config.ConnectionMode != Insecure { + srvrCred, err := loadTLSCredentials(config) + if err != nil { + log.Fatal("Unable to load tls credentials", err) + } + opts = append(opts, grpc.Creds(srvrCred)) + } + grpcSrvr := grpc.NewServer(opts...) + reflection.Register(grpcSrvr) + return grpcSrvr, NewListener(config.SrvrAddr) +} + +func loadTLSCredentials(clientConfig DKVConfig) (credentials.TransportCredentials, error) { + var serverCert tls.Certificate + var err error + + serverCert, err = tls.LoadX509KeyPair(clientConfig.CertPath, clientConfig.KeyPath) + + if err != nil { + return nil, err + } + + var config = &tls.Config{ + Certificates: []tls.Certificate{serverCert}, + } + + if clientConfig.ConnectionMode == MutualTLS { + pemClientCA, err := ioutil.ReadFile(clientConfig.CaCertPath) + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(pemClientCA) { + return nil, fmt.Errorf("failed to add client CA's certificate") + } + + config.ClientAuth = tls.RequireAndVerifyClientCert + config.ClientCAs = certPool + } else { + config.ClientAuth = tls.NoClientCert + } + return credentials.NewTLS(config), nil +} + +func GenerateSelfSignedCert(generatedCertDir string, generatedCertValidity int) (string, string, error) { + var err error + if _, errCreate := os.Stat(generatedCertDir); os.IsNotExist(errCreate) { + err = os.MkdirAll(generatedCertDir, os.ModePerm) + } + + if err != nil { + return "", "", err + } + certPath := filepath.Join(generatedCertDir, "cert.pem") + keyPath := filepath.Join(generatedCertDir, "key.pem") + + _, errcert := os.Stat(certPath) + _, errkey := os.Stat(keyPath) + if errcert == nil && errkey == nil { + return "", "", nil + } + + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return "", "", err + } + + tmpl := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{Organization: []string{"DKV"}}, + NotBefore: time.Now(), + NotAfter: time.Now().Add(time.Duration(generatedCertValidity) * (24 * time.Hour)), + + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + + log.Println("automatically generate certificates", + zap.Time("certificate-validity-bound-not-after", tmpl.NotAfter)) + + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return "", "", err + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv) + if err != nil { + return "", "", err + } + + certOut, err := os.Create(certPath) + if err != nil { + return "", "", err + } + pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + certOut.Close() + log.Println("created cert file", zap.String("path", certPath)) + + b, err := x509.MarshalECPrivateKey(priv) + if err != nil { + return "", "", err + } + keyOut, err := os.OpenFile(keyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return "", "", err + } + + pem.Encode(keyOut, &pem.Block{Type: "EC PRIVATE KEY", Bytes: b}) + keyOut.Close() + return certPath, keyPath, nil +} + +func NewListener(listenAddr string) (lis net.Listener) { + var err error + if lis, err = net.Listen("tcp", listenAddr); err != nil { + log.Panicf("failed to listen: %v", err) + return + } + return +} + +func clientModeFromFlags(config DKVConfig) ConnectionMode { + if config.CaCertPath != "" { + if config.KeyPath != "" && config.CertPath != "" { + return MutualTLS + } + return ServerTLS + } else { + return Insecure + } +} \ No newline at end of file diff --git a/pkg/ctl/client.go b/pkg/ctl/client.go index 984a3f27..e8758a9e 100644 --- a/pkg/ctl/client.go +++ b/pkg/ctl/client.go @@ -79,6 +79,29 @@ func NewInSecureDKVClient(svcAddr, authority string, opts ConnectOpts) (*DKVClie return dkvClnt, err } +// NewDKVClient creates a GRPC client against the +// given DKV service address and dial options. Optionally the authority param can be +//// used to send a :authority psuedo-header for routing purposes +func NewDKVClient(svcAddr string, authority string, connectOptions ConnectOpts, opts ...grpc.DialOption) (*DKVClient, error) { + var dkvClnt *DKVClient + ctx, cancel := context.WithTimeout(context.Background(), connectOptions.ConnectTimeout) + defer cancel() + opts = append(opts, grpc.WithBlock(), grpc.WithReadBufferSize(connectOptions.ReadBufSize), + grpc.WithWriteBufferSize(connectOptions.WriteBufSize), grpc.WithAuthority(authority), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(connectOptions.MaxMsgSize)), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + conn, err := grpc.DialContext(ctx, svcAddr, opts...) + if err == nil { + dkvCli := serverpb.NewDKVClient(conn) + dkvReplCli := serverpb.NewDKVReplicationClient(conn) + dkvBRCli := serverpb.NewDKVBackupRestoreClient(conn) + dkvClusCli := serverpb.NewDKVClusterClient(conn) + dkvDisCli := serverpb.NewDKVDiscoveryClient(conn) + dkvClnt = &DKVClient{conn, dkvCli, dkvReplCli, dkvBRCli, dkvClusCli, dkvDisCli, connectOptions} + } + return dkvClnt, err +} + + // Put takes the key and value as byte arrays and invokes the // GRPC Put method. This is a convenience wrapper. func (dkvClnt *DKVClient) Put(key []byte, value []byte) error { From e5ba95bb7f44b5a40806c9de0206ec0bad3de900 Mon Sep 17 00:00:00 2001 From: Viren Date: Fri, 20 May 2022 17:29:56 +0530 Subject: [PATCH 2/5] fix ut --- internal/discovery/service_test.go | 2 +- internal/master/ds_service_test.go | 6 +++--- internal/master/ss_service_test.go | 4 ++-- internal/slave/service_test.go | 6 +++--- pkg/ctl/client.go | 26 -------------------------- 5 files changed, 9 insertions(+), 35 deletions(-) diff --git a/internal/discovery/service_test.go b/internal/discovery/service_test.go index 231c9dcf..31367bc8 100644 --- a/internal/discovery/service_test.go +++ b/internal/discovery/service_test.go @@ -37,7 +37,7 @@ func TestDKVDiscoveryService(t *testing.T) { defer grpcSvc.GracefulStop() svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort) - if dkvCli, err = ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if dkvCli, err = ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } defer dkvCli.Close() diff --git a/internal/master/ds_service_test.go b/internal/master/ds_service_test.go index 72c026e8..2d3c0a8b 100644 --- a/internal/master/ds_service_test.go +++ b/internal/master/ds_service_test.go @@ -416,7 +416,7 @@ func testNewDKVNodeJoiningAndLeaving(t *testing.T) { // Create the client for the new DKV node svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[newNodeID]) - if dkvCli, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if dkvCli, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { t.Fatal(err) } else { defer dkvCli.Close() @@ -453,7 +453,7 @@ func testNewDKVNodeJoiningAndLeaving(t *testing.T) { func initDKVClients(ids ...int) { for id := 1; id <= clusterSize; id++ { svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id]) - if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } else { dkvClis[id] = client @@ -464,7 +464,7 @@ func initDKVClients(ids ...int) { func initStreamingDKVClients(ids ...int) { for id := 1; id <= clusterSize; id++ { svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvStreamingPorts[id]) - if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } else { dkvStreamingClis[id] = client diff --git a/internal/master/ss_service_test.go b/internal/master/ss_service_test.go index 899a105d..dc730250 100644 --- a/internal/master/ss_service_test.go +++ b/internal/master/ss_service_test.go @@ -55,7 +55,7 @@ func TestStandaloneService(t *testing.T) { go serveStandaloneDKV() sleepInSecs(3) dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort) - if client, err := ctl.NewInSecureDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { t.Fatalf("Unable to connect to DKV service at %s. Error: %v", dkvSvcAddr, err) } else { dkvCli = client @@ -82,7 +82,7 @@ func TestStreamingHealthCheck(t *testing.T) { go serveStandaloneDKV() sleepInSecs(3) dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort) - if client, err := ctl.NewInSecureDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { t.Fatalf("Unable to connect to DKV service at %s. Error: %v", dkvSvcAddr, err) } else { dkvCli = client diff --git a/internal/slave/service_test.go b/internal/slave/service_test.go index d89396bb..d4667ec6 100644 --- a/internal/slave/service_test.go +++ b/internal/slave/service_test.go @@ -316,7 +316,7 @@ func stopServers() { func initDKVClients(ids ...int) { for id := 1; id <= clusterSize; id++ { svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id]) - if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } else { dkvClis[id] = client @@ -326,7 +326,7 @@ func initDKVClients(ids ...int) { func initSingleDkvClient(id int) { svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id]) - if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } else { dkvClis[id] = client @@ -833,7 +833,7 @@ func testDelete(t *testing.T, dkvCli *ctl.DKVClient, keyPrefix string) { func newDKVClient(port int) *ctl.DKVClient { dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, port) - if client, err := ctl.NewInSecureDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts); err != nil { + if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { panic(err) } else { return client diff --git a/pkg/ctl/client.go b/pkg/ctl/client.go index e8758a9e..8ee5e270 100644 --- a/pkg/ctl/client.go +++ b/pkg/ctl/client.go @@ -53,32 +53,6 @@ var DefaultConnectOpts ConnectOpts = ConnectOpts{ ConnectTimeout: DefaultConnectTimeout, } -// NewInSecureDKVClient creates an insecure GRPC client against the -// given DKV service address. Optionally the authority param can be -// used to send a :authority psuedo-header for routing purposes. -func NewInSecureDKVClient(svcAddr, authority string, opts ConnectOpts) (*DKVClient, error) { - var dkvClnt *DKVClient - ctx, cancel := context.WithTimeout(context.Background(), opts.ConnectTimeout) - defer cancel() - conn, err := grpc.DialContext(ctx, svcAddr, - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.MaxMsgSize)), - grpc.WithReadBufferSize(opts.ReadBufSize), - grpc.WithWriteBufferSize(opts.WriteBufSize), - grpc.WithAuthority(authority), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) - if err == nil { - dkvCli := serverpb.NewDKVClient(conn) - dkvReplCli := serverpb.NewDKVReplicationClient(conn) - dkvBRCli := serverpb.NewDKVBackupRestoreClient(conn) - dkvClusCli := serverpb.NewDKVClusterClient(conn) - dkvDisCli := serverpb.NewDKVDiscoveryClient(conn) - dkvClnt = &DKVClient{conn, dkvCli, dkvReplCli, dkvBRCli, dkvClusCli, dkvDisCli, opts} - } - return dkvClnt, err -} - // NewDKVClient creates a GRPC client against the // given DKV service address and dial options. Optionally the authority param can be //// used to send a :authority psuedo-header for routing purposes From 37215c4c94455ad3da6c0dc3415161f6a22c6891 Mon Sep 17 00:00:00 2001 From: Viren Date: Wed, 15 Jun 2022 23:41:59 +0530 Subject: [PATCH 3/5] add chaanges --- .../dkv-client/src/test/resources/Procfile | 14 - cmd/dkvsrv/main.go | 6 +- internal/discovery/client.go | 18 +- internal/discovery/client_test.go | 4 +- internal/helper/tlsHelper.go | 15 + internal/slave/ca-cert.pem | 33 +++ internal/slave/ca-cert.srl | 1 + internal/slave/ca-key.pem | 52 ++++ internal/slave/server-cert.pem | 0 internal/slave/server-key.pem | 52 ++++ internal/slave/server-req.pem | 28 ++ internal/slave/service.go | 1 - internal/slave/service_test.go | 262 +++++++++++++++--- internal/utils.go | 75 +++++ pkg/ctl/client.go | 8 +- 15 files changed, 499 insertions(+), 70 deletions(-) create mode 100644 internal/helper/tlsHelper.go create mode 100644 internal/slave/ca-cert.pem create mode 100644 internal/slave/ca-cert.srl create mode 100644 internal/slave/ca-key.pem create mode 100644 internal/slave/server-cert.pem create mode 100644 internal/slave/server-key.pem create mode 100644 internal/slave/server-req.pem diff --git a/clients/java/dkv-client/src/test/resources/Procfile b/clients/java/dkv-client/src/test/resources/Procfile index 633d2be4..4f089b32 100644 --- a/clients/java/dkv-client/src/test/resources/Procfile +++ b/clients/java/dkv-client/src/test/resources/Procfile @@ -12,17 +12,3 @@ dkv_slave1a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-add dkv_slave1b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7092 --http-listen-addr 127.0.0.1:7096 --node-name s1b --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml dkv_slave1c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7093 --http-listen-addr 127.0.0.1:7097 --node-name s1c --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml dkv_slave1d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7094 --http-listen-addr 127.0.0.1:7098 --node-name s1d --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml - -dkv_slave2a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8091 --http-listen-addr 127.0.0.1:8095 --node-name s2a --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --http-listen-addr 127.0.0.1:8096 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave2c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8093 --http-listen-addr 127.0.0.1:8097 --node-name s2c --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --http-listen-addr 127.0.0.1:8098 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml - -dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --http-listen-addr 127.0.0.1:9095 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --http-listen-addr 127.0.0.1:9096 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --http-listen-addr 127.0.0.1:9097 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml -dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --http-listen-addr 127.0.0.1:9098 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml - -dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --http-listen-addr 127.0.0.1:6085 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml -dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --http-listen-addr 127.0.0.1:6086 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml -dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --http-listen-addr 127.0.0.1:6087 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go index d79c9583..d1a96af6 100644 --- a/cmd/dkvsrv/main.go +++ b/cmd/dkvsrv/main.go @@ -153,7 +153,7 @@ func main() { if srvrRole != noRole && srvrRole != discoveryRole { var err error - discoveryClient, err = newDiscoveryClient() + discoveryClient, err = newDiscoveryClient(utils.DKVConfig{CaCertPath:config.CaCertPath}) if err != nil { log.Panicf("Failed to start Discovery Client %v.", err) } @@ -443,8 +443,8 @@ func registerDiscoveryServer(grpcSrvr *grpc.Server, dkvService master.DKVService return nil } -func newDiscoveryClient() (discovery.Client, error) { - client, err := discovery.NewDiscoveryClient(&config.DiscoveryConfig.ClientConfig, dkvLogger) +func newDiscoveryClient(dkvConfig utils.DKVConfig) (discovery.Client, error) { + client, err := discovery.NewDiscoveryClient(&config.DiscoveryConfig.ClientConfig, dkvConfig, dkvLogger) if err != nil { return nil, err } diff --git a/internal/discovery/client.go b/internal/discovery/client.go index c1aae7a6..2316ed68 100644 --- a/internal/discovery/client.go +++ b/internal/discovery/client.go @@ -7,6 +7,8 @@ This class contains the behaviour of propagating a nodes status updates to disco import ( "context" "github.com/flipkart-incubator/dkv/internal/opts" + utils "github.com/flipkart-incubator/dkv/internal" + "github.com/flipkart-incubator/dkv/pkg/ctl" "time" _ "github.com/Jille/grpc-multi-resolver" @@ -40,12 +42,24 @@ const ( connectTimeout = 10 * time.Second ) -func NewDiscoveryClient(config *opts.DiscoveryClientConfig, logger *zap.Logger) (Client, error) { - conn, err := getDiscoveryClient(config.DiscoveryServiceAddr) +var DiscoveryClientConnectOpts = ctl.ConnectOpts{ + ReadBufSize: readBufSize, + WriteBufSize: writeBufSize, + MaxMsgSize: maxMsgSize, + Timeout: timeout, + ConnectTimeout: connectTimeout, +} + + +func NewDiscoveryClient(config *opts.DiscoveryClientConfig, dkvConfig utils.DKVConfig, logger *zap.Logger) (Client, error) { + //conn, err := getDiscoveryClient(config.DiscoveryServiceAddr) + dkvConfig.SrvrAddr = config.DiscoveryServiceAddr + client, err := utils.NewDKVClient(dkvConfig, "", DiscoveryClientConnectOpts) if err != nil { logger.Error("Unable to create DKV client to connect to discovery server", zap.Error(err)) return nil, err } + conn := client.CliConn dkvCli := serverpb.NewDKVDiscoveryClient(conn) storePropagator := &discoveryClient{regions: []serverpb.DKVDiscoveryNodeServer{}, diff --git a/internal/discovery/client_test.go b/internal/discovery/client_test.go index cf457532..297c9a9b 100644 --- a/internal/discovery/client_test.go +++ b/internal/discovery/client_test.go @@ -4,7 +4,7 @@ import ( "fmt" "testing" "time" - + utils "github.com/flipkart-incubator/dkv/internal" "github.com/flipkart-incubator/dkv/internal/master" "github.com/flipkart-incubator/dkv/internal/opts" "github.com/flipkart-incubator/dkv/internal/stats" @@ -40,7 +40,7 @@ func TestDiscoveryClient(t *testing.T) { PollClusterInfoInterval: pollClusterInfoInterval, } - dClient, _ := NewDiscoveryClient(&discoveryClientConfig, zap.NewNop()) + dClient, _ := NewDiscoveryClient(&discoveryClientConfig, utils.DKVConfig{} , zap.NewNop()) defer dClient.Close() // stop the poller so as to avoid race with these poller diff --git a/internal/helper/tlsHelper.go b/internal/helper/tlsHelper.go new file mode 100644 index 00000000..467f37c7 --- /dev/null +++ b/internal/helper/tlsHelper.go @@ -0,0 +1,15 @@ +package helper + +import "os" + +var HomeDir = os.Getenv("HOME") + +var TlsFileContent = "#!/bin/sh \n" + + "cd "+ HomeDir + "/cert/ \n"+ + "openssl req -x509 -newkey rsa:4096 -days 365 -nodes -keyout ca-key.pem -out ca-cert.pem -subj \"/C=FR/ST=Occitanie/L=Toulouse/O=Tech School/OU=Education/CN=*.techschool.guru/emailAddress=techschool.guru@gmail.com\" \n" + + "openssl x509 -in ca-cert.pem -noout -text \n" + + "openssl req -newkey rsa:4096 -nodes -keyout server-key.pem -out server-req.pem -subj \"/C=FR/ST=Ile de France/L=Paris/O=PC Book/OU=Computer/CN=*.pcbook.com/emailAddress=pcbook@gmail.com\" \n" + + "openssl x509 -req -in server-req.pem -days 60 -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile /Users/viren.gupta/cert/server-ext.cnf\n" + + "openssl x509 -in server-cert.pem -noout -text \n" + +var ServerConf = "subjectAltName = DNS:docker.local, DNS:localhost, IP:127.0.0.1" \ No newline at end of file diff --git a/internal/slave/ca-cert.pem b/internal/slave/ca-cert.pem new file mode 100644 index 00000000..94754275 --- /dev/null +++ b/internal/slave/ca-cert.pem @@ -0,0 +1,33 @@ +-----BEGIN CERTIFICATE----- +MIIFxjCCA64CCQDsORrfDgvfMDANBgkqhkiG9w0BAQsFADCBpDELMAkGA1UEBhMC +RlIxEjAQBgNVBAgMCU9jY2l0YW5pZTERMA8GA1UEBwwIVG91bG91c2UxFDASBgNV +BAoMC1RlY2ggU2Nob29sMRIwEAYDVQQLDAlFZHVjYXRpb24xGjAYBgNVBAMMESou +dGVjaHNjaG9vbC5ndXJ1MSgwJgYJKoZIhvcNAQkBFhl0ZWNoc2Nob29sLmd1cnVA +Z21haWwuY29tMB4XDTIyMDUyODEyMDk0MloXDTIzMDUyODEyMDk0MlowgaQxCzAJ +BgNVBAYTAkZSMRIwEAYDVQQIDAlPY2NpdGFuaWUxETAPBgNVBAcMCFRvdWxvdXNl +MRQwEgYDVQQKDAtUZWNoIFNjaG9vbDESMBAGA1UECwwJRWR1Y2F0aW9uMRowGAYD +VQQDDBEqLnRlY2hzY2hvb2wuZ3VydTEoMCYGCSqGSIb3DQEJARYZdGVjaHNjaG9v +bC5ndXJ1QGdtYWlsLmNvbTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB +AN3yVugtIcclPC98JisJvW3lg4R92gu5sKIHMA+m4Ish347Ifc2W8RCPDUuYkLjj +TxjQTpzPCaQ8dezUH9DQqmziUsaqbvd37DRCyut9O1+AG8BOkmXeIrj2hWNkYmd8 ++RkYxWPXq6f9XVaeW696xNnWJBwyeSPdIAcCfUzwGvQGtzWmBQ4hB+CWsqpM+q3A +OKyxIgN61JKxKhffqtEXgKpdbbv7SZzMhbpsT871vvjmRng/p9pAnhvFba08yAoi +sn/chnUEUj1fozeZrC+uAp0B8tICx02GqHVEVKiJeZx3L43wt7bObK/j2v0UJYA6 +If+QdlZouNCLCcKz3NZXh9q0AqLvIAEYJlAjUhTZHvPj3rBoVozo6KIabWLZGJQG +WLiDfKPhJyY+qFc3hCyOSLnRccLxKomAEc7d2NbcaQj55OgQzEyHzVJ63sLgQQMD +VlsT/WuOfh7QiJsB4mi1kbuwvGJxLS5w+AC94aQUzcDRuoUzBgaaOBAbpAvhFenW +2P9zj0IaiZJKpysS+PnC+nZspxf1Ujj5IxcGmHivA61Dd+oFLfYr20ZsMCe2dMqo +gxHhv34ICuds47MiDE1Ygawa53K7S4mapgIpEUVvN27bPVUSVpRVWmTysvtZM1OU +2spqAM5jw0TO3i6l/rTy5/ZoxuYNPmdTqblzK1dttBB7AgMBAAEwDQYJKoZIhvcN +AQELBQADggIBADzUZ8e56Lj4MviDh7ou3RXuoPnWi5J4nyPRqg+a5/HtU4skO6U+ +uZ3VPUeSH8ShV1kQTkXgeM3VQVh9c622CC6u6C4CMqPHchJy4h+fIoA1tTFPa5LQ +gPhGqJ7bWhUjN9VRDy1YycpRv1qRzNXuEU5MageJOW+OcnZFjU6tgeb/W34A1Q0T +3Jd4oDUUsy2JuA8cIP5m17UuR/9iy/gQo4qduWsFfNyWuy2WOtoHx1d4O0bQ1/Rj +7mYcFdQd3EczvVDVqf/9JQZFrIpPkHKDN7ORSCxwxmWzUIno61fuDTEDGA8/f2hX +GosZGPrUAUFBwTcdggfzeTIzHoBiXPGQJOF28s/avjKi1HX7uSwnWuo3TlyMcTcw +Y4fi9uS9hW8nvb1BWdsewsuVw8QU7a5t1b67rM+0GRnC66Npeou+LWmCscXj/aYG +aN2UnhsSQ5rtSDQd2zxDyKr0qMPdHol5A0WnL1pkr3Gjr7fXqwPkDX7LHMhkJ6ta +BTGaxUWj9qmAjPYDHGNV/UB2qMmYnRb0FIt5QaP+7dwpT6joZ3MdRvpdVyyDVrAZ +CWGiMY8k1NDaqxQNtgqOBF5ywqQrA9Gis2zMYeGOtYZmECLzonTJ2QKxkZLjUuaR +JwfH9BgKsIh/k6T35M5d0rgSLkE4OogxkMAXfyJw+TRJOiJ/5ocvru96 +-----END CERTIFICATE----- diff --git a/internal/slave/ca-cert.srl b/internal/slave/ca-cert.srl new file mode 100644 index 00000000..8737f1d4 --- /dev/null +++ b/internal/slave/ca-cert.srl @@ -0,0 +1 @@ +E6A0FF61B348207B diff --git a/internal/slave/ca-key.pem b/internal/slave/ca-key.pem new file mode 100644 index 00000000..7be638f4 --- /dev/null +++ b/internal/slave/ca-key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJRQIBADANBgkqhkiG9w0BAQEFAASCCS8wggkrAgEAAoICAQDNyyZxtHVDYwAF +sCOEFV+zRhITQYNzyduSm6QISGkW4OeOfK9JqRixX18Bs6VyMSq0cvV/kQyDIu26 +uKkpO31xBo4Z4BlcxOBdDgXdPoTXTJzKvXYeyiuqV6/pw2WKF5z3lWxyt8TyfAQm +TUzxoyE3dE/nFhFe8WN0idCWifY3/Ng+3AzIPHakJyKzxGbBxqXHTZgYIzD/7Vfh +O6QO82ZOFhr3hR65rV2Gq6eAG/c1lEGA6O8HdtJ80tZ3z44KLjQ6pagfSZ07aEiM +EZh7CqhjUCdL2wvXrPT9mVDBn7dCFFSftezq03ehVawNkVx/rHAs+Tm8jdvuMhR6 +Z845KJ9xUcwSCS9wde7MwOGsmjXWPwXLuNKDishYO1otWA50S7yU//g3GU8z3/WW +fP5U8yDm9oDjyNKSwD8YPgYz/cLjteg9W5EXm0YetvfuByx+lLJCQCPozAlCOcIW +H7QKqDzYlIWZ1p1ETlf9wkvmR0lMTSAQXL4kKVZRPKMZt9Xk9ufagY6yxubwKAzp +UZ5Cwfac1axvZRouBz4BbbjwRNFRsEUYZCoTJQfkApExO9Q1KGgMBxFUHHjeRHmx +nJ6NBe/5aptvtPRoV49sgMMU1qK4x+8MnjI1/Mn2Ug4ugc/1H6IOt6HqPMzd2V4z +GwsqIdyCzjoy/pzFbol+uR63omIlLQIDAQABAoICAQDCCxK11iVXo+qqcloauhih +rqludLChJi2ewJjbooNMqkZMaS7S8tmwra+kqCKRWODegyMO5NL6d53XGTWQBmMS +hs7IgTEI8y5nL8jBw6h4ZenVlO3ivFiN5t6dObvtiLReEwvN44+V4O/8uZiao2m3 +ozmQOJOqhUt04FnJCE8cnZG9O7nluHVZ9PjYrpEBl/BKy9RkthIGz2G0o4re+gx2 +Bovo5iGBjIGKcSKgPj5Ly/4zse9oh1aTki3djNxupwBiaD7tOXlDQeJeCF1Rb0w1 +UKes897nv1LGKKEn4RRn/0mHyn+ZMEo12tU/edfRHuQq09jn/I7qu1fpzyV5ad9j +xE3XF+j/so2gLbk3MjLTo6ItGdX6AdHsjoL4FmX5j1YRmzaxcAO+tpEcKY88ks8R +Onq6c/eoq9PXH6R53Ez5ijOeC4AXzvPZMzV4HRUFsTLttiYGESucAFIHpW686uCi +idh7Oj6nicE5ZlMwnu6N5Y9AtXMrIv8iQngbkjXJjuhSoeHvtwGQu2L/NfL2i8QO +c0t+8DCjKvgSRpJof5o3C2Ku5xCCa4MZpMzntoAG5lYzw3n0RQCJkCsf0aWOopBy +mNESuC32avYPRixDsnqsNtSyN66vUfYkFlZ3S0SnAeQv+DXRlRigXs/2lZ6h+R2N +/+N8FY2Wai4pfBBNFiWUNQKCAQEA/lXXtiRV326ymWH+12XFAnJxOudR0w/L8ZsY +xHdpN2oN+O5yCBAKUMNlX8XrFAh0aWrHkGThU6s/o0PiWNHcOIVL/5JuzY7xiVl+ +mhGKkDjDn5velAHG+Jw4F/KHlhuAqhg7HUhasSzKPZbyCXRbLcQQfrNkr4CEIBTr +qCzYgOmyAaU7q8bxTpsJVZVawHQLHmzoUEUfdRhxFOKrRrjMDFMPPn5yTKygjbdT +PgY97lz4y2abV8HRuQMQDWYLXx9g4Bs0cikieeho5MRfsR9kSznnatX2GkFHFnOx +ll3/nCMiDOa7eFPNEt+cDQ1/17U+HHtRMOvC3le3GBK6G6vXtwKCAQEAzyP450c1 +TVSFkddLLtnWKM2Y474Y26SNuloXL1b/MU5zhKk7syXjgBenltyX8lxVj+qBD1P3 +3Gd9d3PFSy6CDk18kSholkELwBHlXQB+mFN4CSHd63sv7GIl94XpyYn+zslvAHq7 +gO/hf5NhsSPLmfAgNfv/Vd3qb3h52vFPr65JzE5CcZ2hB3i/2G5zaO5IjtHJKMKy +8ZCzvnWfm0BeRyDzccZZ3h77Y9J4YpknDZQ0BixIVKz61fjBGViYSXhyOUvl9xdS +muPsdoSAcqt5bx6sQZY+XbD0rfgysxzO58HhPmdgSS80/lIO5sMxNxx4ztrGcMrG +FrGvD8/YdqgCOwKCAQEAvPyYj3WsgZxGBgd1UCUFIW5ciQJ+ILFqQBuZadO5UT3z +J/g2r5sLrzuiRmpQ5c/qgLKhm7/6Vyu22+CDfrBiTXVvJXdv8jq3a/IeSumh5szn +P+CYoa44ZV/fppvG+FlzX/Ela/rpuSGpHd7d9vyJDBbWku9Gb8q1YwlomszIC/Kb +HPHkxx3LDTfS1c9nYf5YjMEsVwkl+OKylGxiDOkgZkAJ4cwfrq3Pl76TklEdD9Px +42JZJ/qTlkCaWvpqlMJCQAS3EeVTAvcZinQJDxpeci+SsCZQ+u1qn1pUqYErgJ/y +m1eQEjdi+RVDZceUKXs5rfxMNmfZo1HWcYLo28rzPQKCAQEAoRvMcy4aUgwkW1uF +eWoqvM+0U6Y+NU+WgFOj5skWcSYgiRULfvpAKRe83IS0leelEOuG+AIKLV484tSj +picFNtSjx08TkZID7p68s4o37Ig3O3sg3Q+SJZvtBckdHgj7q6BWepv7DH0G+bAK +8rjxmch/dR2L6iBRP+QHevfnT7ASbBqOaUn9vg73wAGgOAPlYVffypksjuBKPR/F +RP5alZ4WWu64ZR/ZzTkdMysFifU0RXzZwSpNanEw12LmB3br+I6EzZtRP3F5MHE8 +anPeXkQQHG8cEMkaddkurcsBTpaod132MNXLtObj+i3lMIUvnUvhvkaorWwDyhuR +tDBWkwKCAQEA9WBv8xAbHr+AVdjiuSaUttKs3ljtXvr2P8Z/Z0V8A+En3Lct7HGP +gnfsEpY4I/O49vpqsB4l34857SvyN2pmM94vTB0vb6kp1wj3F214nvxadEiRdCC5 +dM0U1S0Ehg0C11BBff0FJxDCk9RHkPVI1pA5KFd6LMxc89mc/2fo498XPlUWx+RZ +i4ujKh1u30y23k+a1EhZ7WhUgHyi2N/bWtPtAjdo1cngYjUC7bulAzlBIuDzrnZi +6PRn71kKlkWqO21lZSlnXw5sUZOD0nyYXN2k5zw+X0LQ27Ps3kCfm7yt++G72Lpo +g5WMpViDo5gu3TttueWJ7jGiRW6ZLIv7vQ== +-----END PRIVATE KEY----- diff --git a/internal/slave/server-cert.pem b/internal/slave/server-cert.pem new file mode 100644 index 00000000..e69de29b diff --git a/internal/slave/server-key.pem b/internal/slave/server-key.pem new file mode 100644 index 00000000..ba03989d --- /dev/null +++ b/internal/slave/server-key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDKyZNtQfkeE9HG +Ft4pPLLnL4vnkMC7265xdS5vGY2uYgtdFRomVJTrYwA+B8ljISb1Afro/f+P1fa6 +ad89QZk+WF4sIi/giduJODqIV305hDF7gSOw9HlAxX/cYUWGD/Q51yRclz4goXSE +ps4hNAPiAgFlMu7kiuO7eaLYLoF3T3LCwMoLRWZ3tMdHpsTl9Mmz1uN+N/B/e2K7 +RH/u4NI53Yj48yQEbERHk13j3NYf8yCjYAfXSWBG6LROY7xQxC3VbbxYYYJcYJ4t +G5JLQOyQ4ZhrxlWvlth2vOUB9c1KZCaXbAMzuc3gv4hwEXJdbYfjWHiMjhtYQAvP +ouL6gGndxEymndRxAFTpHDfdzo3PIfJnYke7V8hPOLa4oG6OBMjmwrOTyTpzRZYl +AUyqm4+Lcvt1898Giq3MRFLLPtuW2t8HDVTJerzf2EVEG+EM7/ZBu1s7EXvK/Glm +L679FTvgVi77arjDFvH0TPZbmI2kXt81viGB6i9cdNZAaqTFS4BQ9Kh4fjZSFO8N +NU/Wd2gIn0y5IGSUTD/azcsQ2WSClUyKBxD93RrkGDXNLjgmWBeUYGnBbVVirYN+ +T1+HmAYuulfEuse7qw2AlfMvZ/LMRXSW81pqSRUFp75E829bfto1BvFPKMDtUzJI +vNIUezJfkAv1fGhUbEKcM7FGvAMTjQIDAQABAoICAAdS9ncovdWfQ6FS7B0zKwb8 +iaIIcV2XlppUcCPlGV/l/KEVBQfDYEmMHtMetuBUrzKEKsrFebh5MtsSvdRZjS5a +RLR1W6NRd1Nia4LHHie4pYzfYime+s4jE/YisT8hTmHUamTJtZ26Fxo/aAx883Lz +PZepGkW42nbH8uC59UUY4n00N1fm3seOCPCGjm/7wlkzdq31WUNRcoRu+evnxNAA +N1U+etuKCYFISkjTjgxDXLdp3X6iEM8e0nxOT9ve6gTeXMIYa3vopW3dpBABvUmS +TE4AMNIRsanAMsk6VnrMWTR42zjACjaG1GevyaCkNxc398qjy0sJrSOHr4j6oFgo +qjhqidsUWUyd8R+u9K/z6WkCWD5SyRLX3lxc7QD4jW/EXzfy5F3OJV9e0IqhiX/x +0Eq/4drnRfbUNS5ENRFPu9RPKpyAkazJ8DDRmlk4ScLj1bp5rsd3RwAdWu+hE6Lg +tcxKRChbk93MW2B5+N1VSYBnGA9gChsuCIQGwj2yXrsfPDq/N13k4fdFMTv/TXdc +pX2QWa95+i7Uq7chvS+ER7cdWuLw3RtME6IIlS/I3faZJtH/6u0QCzg76L6qqEoA +J39qLLboSsggJr7NTZUlexnm2PiRhDa7q2iU/QOf9T40S++ejK5koEIAmGUaUMy3 +THAOUuklDW0cw7SqLKVBAoIBAQDrbzGUE/9/B4g1tO5Nu08moqmT6JwcwxGyI6zS +v8nZ+tTczTXe9pm31SdD+s+J9NvfS55ORXufrX9tKoPRKPTuADzdkFf7WcN9UIWZ +ftFeATTc3YCv5h0SquMoeg385h4FtPEd+lFI3sYUn8EjcbL/7MrDx87CS5m8dhlg +AfxkaQOmJv/CJstCdqFVc8uPNJe5ZVYpg+AZRQ5X0uerLt98+c0RP7buIBjjdFmf +mwMx0yEDdBoj7Q9B/pTJ9owSsqW1cZjA+sB3AWi0+SrZWViOx51iY7Uv98taAz3h +lbDowGgqv2QtTcQps4ms3tj1oWtswsYfara/doU36astaLtRAoIBAQDcgFP4F995 +sMaJ9bosvyZ5RXmyHbmQGDUYj1gFV5sCNAY89xiTmYpUacNspYTA6EnFMhtVEWZ0 +zaxSV+xt4MIRhOjLaMPIet2q/fap132VT+FTqbaNJKR/YOqZ25nCpgOaDm1cmBcz +ZAn24uWWKtXGd84XLWoIeRjnWOZV/Xn+FN6Pk4kEguowQT8U6JFka2SObGSamHnV +I5fp9ZYmfwu8zM0RSdYn+LixI9wCo5Y4I9iVJ375RgsRgTR1MHbFym7n99jy+Rlg +o+Vg1A+VXxvHFpmIAC4RyIQvbqTSu2zVYpH3U5AUnf7y6kqYuMEDytAA/Ah0I6Kn +b5m4hxoAdI19AoIBABlzuutkQhHMkagkuEKFBmiJ20VXN1FQsWANxUg/XaYIHO0F +7lX9WN7r10PdIISsiPyZ4zJZBqVbj1QQ7ZE2GUyFkA7v5OMScDTi+J6AfEfqX3ml +JJ1fhjMXcFlW4J4/jm1Vg1+XSSRUmRtvreSUDSLnyX/i4PJWU/dw2QttC4kbZT6Y +Jrwpj33O72AalZ4UtHZhgzAXxqlUstlBvrPd7GFJbCb6OV+C7s3gLTQ5AN5pEb+a +1Jyfqy+aQyGWWsxGre1wq2/pa9cxus0dxskahJ1i7RvO5a6a3kwL36u8kBSJgHlm +mYDgUF2lwDOFH/VmDwQCBoMs+Ta3rG1AALZ3BRECggEAYjiDruqrcGCPP9sGsHIB +aFnLSM024Hn4BNWtZsvMT6Hu+W80MmNdnBZ0p8vfI7MVUVLtm+ohUnxfe1qRBazt +pbInpJwpQf+BCG8SVjuYRE+N1p+topek8tywBBMvHeanihNNIcbZ5tl0GtadYFP6 +X+r0EZz4M4gF0N/n0nwTwFt5fElC6pCKUy5tNBoY0buI0mx0L+0lsAyn2d68oK9N +Ai/UVRQn4ixH0RhhI4fNHXrSOjaat908hvKf9Ge/MIHo/0mHCUxuwn6faY9s1aPF +Etz9TPiPjPpd72cA4qH8a9z5mktNzfFHJNu6mOcLi23nU9jwxxMsnBr5RZpgub0x +XQKCAQA58DQVUCvpvfC/ZZiE9hzxz2X6ew8Jh+w3a+UYbhIb88WO8LfN371OOUZy +zIpnJlGcGoOFlOa61t44XuAsXMMjB/I8alga/SdRRGsyU6Lpl8HUZKJuaYCAspQm +HikJXuaMHfp399C9+oG7Biku409iRSchR8Fb4yXa5+KQJK8DwySOyhMBPzLPItMu +g01Zbui+YyPqZJ27dd6vWjtj+7ihfHakEVD8dzuyYpf1qunsdojdpelYkBQ2zS5t +4ayqx39d31VQMEj9kzCMQsbzHUa3+a1POfkiCpU+q+ThC0/SwDVVLmQ7pWO5ZsOI +Yo9B4RbPBknliRnYJ4EM4Sqq17Tu +-----END PRIVATE KEY----- diff --git a/internal/slave/server-req.pem b/internal/slave/server-req.pem new file mode 100644 index 00000000..e88f5c33 --- /dev/null +++ b/internal/slave/server-req.pem @@ -0,0 +1,28 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIIE2DCCAsACAQAwgZIxCzAJBgNVBAYTAkZSMRYwFAYDVQQIDA1JbGUgZGUgRnJh +bmNlMQ4wDAYDVQQHDAVQYXJpczEQMA4GA1UECgwHUEMgQm9vazERMA8GA1UECwwI +Q29tcHV0ZXIxFTATBgNVBAMMDCoucGNib29rLmNvbTEfMB0GCSqGSIb3DQEJARYQ +cGNib29rQGdtYWlsLmNvbTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB +AMrJk21B+R4T0cYW3ik8sucvi+eQwLvbrnF1Lm8Zja5iC10VGiZUlOtjAD4HyWMh +JvUB+uj9/4/V9rpp3z1BmT5YXiwiL+CJ24k4OohXfTmEMXuBI7D0eUDFf9xhRYYP +9DnXJFyXPiChdISmziE0A+ICAWUy7uSK47t5otgugXdPcsLAygtFZne0x0emxOX0 +ybPW43438H97YrtEf+7g0jndiPjzJARsREeTXePc1h/zIKNgB9dJYEbotE5jvFDE +LdVtvFhhglxgni0bkktA7JDhmGvGVa+W2Ha85QH1zUpkJpdsAzO5zeC/iHARcl1t +h+NYeIyOG1hAC8+i4vqAad3ETKad1HEAVOkcN93Ojc8h8mdiR7tXyE84trigbo4E +yObCs5PJOnNFliUBTKqbj4ty+3Xz3waKrcxEUss+25ba3wcNVMl6vN/YRUQb4Qzv +9kG7WzsRe8r8aWYvrv0VO+BWLvtquMMW8fRM9luYjaRe3zW+IYHqL1x01kBqpMVL +gFD0qHh+NlIU7w01T9Z3aAifTLkgZJRMP9rNyxDZZIKVTIoHEP3dGuQYNc0uOCZY +F5RgacFtVWKtg35PX4eYBi66V8S6x7urDYCV8y9n8sxFdJbzWmpJFQWnvkTzb1t+ +2jUG8U8owO1TMki80hR7Ml+QC/V8aFRsQpwzsUa8AxONAgMBAAGgADANBgkqhkiG +9w0BAQsFAAOCAgEAcyqLbxtz0MCFgXXlT4kFdb3s9cFLSkLFodC+sbys4PL3l6cy +FGzWiBlaunNMBqFpBdYJ+DYQA0PCWCRjXJmlqXIigkKgnuMIl+73waDswkrRQDmS +HdU9/L393dwVpD1kBDXxhEHR5BphGUYGoOqteZzJcLIlr7DypmjawmzrwTHM+nOG +Ze67RkqC6rzQJPUxYNiHvEEjpEr4dJ0qmdQX9fbk/vCHnaXribjBHaxzmi6CxmOu +GSLdm+g17gUdzz79w0dZR7F6mC0J/MF84fM3EFFVuEk57T3MdvCGGs0BCsbWpP9o +S8ki39b6gDjRvdkljMgbfhVUdnrfbzxAn1tTl9dx//iSV5ABz+cOmHIOOZxYHadT +gu/GMSGWgxnmTYq+EIUbIo+jN8E7evchVLQrQXrcFo5QiXzToxoxPhuuppN2dO58 +z+DlHfFz3fPsBXc12WpnKbpjisrRPPqtG+CaW/HBVZBqHU3iuXN7yXobIkF8hGAN +WibyHVXAXsG+PekCObW0dXYfTnvV17MA897CgRsVOI1HKVfSi5LF7mthfbvBy7sI +hZ5Giqjilh7FZcAltQBl9+1aduKjGwcjXrMKFFqK3dn0KfxLJQ9wILBTcxf1Lc3N +FzP5bAgBBqhKHU5cCGldqj78OTJ/Lt4kflPBLPQTPG+z2GNfyMyBUR81bUE= +-----END CERTIFICATE REQUEST----- diff --git a/internal/slave/service.go b/internal/slave/service.go index 8628c0be..f868fec3 100644 --- a/internal/slave/service.go +++ b/internal/slave/service.go @@ -419,7 +419,6 @@ func (ss *slaveService) findAndConnectToMaster() error { // TODO: Check if authority override option is needed for slaves while they connect with masters slaveClientConfig := getSlaveClientConfig(*master) - //replCli, err := ctl.NewInSecureDKVClient(*master, "", ctl.DefaultConnectOpts); replCli, err := utils.NewDKVClient(slaveClientConfig, "", ctl.DefaultConnectOpts); if err == nil { diff --git a/internal/slave/service_test.go b/internal/slave/service_test.go index d4667ec6..bf007517 100644 --- a/internal/slave/service_test.go +++ b/internal/slave/service_test.go @@ -5,6 +5,9 @@ import ( "context" "crypto/rand" "fmt" + utils "github.com/flipkart-incubator/dkv/internal" + "github.com/flipkart-incubator/dkv/internal/helper" + "log" "net" "os/exec" "strings" @@ -34,9 +37,9 @@ import ( const ( masterDBFolder = "/tmp/dkv_test_db_master" slaveDBFolder = "/tmp/dkv_test_db_slave" - masterSvcPort = 8181 + masterSvcPort = 8185 slaveSvcPort = 8282 - dkvSvcHost = "localhost" + dkvSvcHost = "127.0.0.1" cacheSize = 3 << 30 // for creating a distribute server cluster @@ -62,6 +65,7 @@ var ( slaveCli *ctl.DKVClient slaveSvc DKVService slaveGrpcSrvr *grpc.Server + discServerGrpcSrvr *grpc.Server healthCheckCli *HealthCheckClient lgr, _ = zap.NewDevelopment() @@ -81,6 +85,9 @@ var ( discoveryCli discovery.Client discoverydkvSvc master.DKVService closedMasters = make(map[int]bool) + caCertPath = helper.HomeDir + "/cert/ca-cert.pem" + certPath = helper.HomeDir + "/cert/server-cert.pem" + keyPath = helper.HomeDir + "/cert/server-key.pem" ) type HealthCheckClient struct { @@ -89,51 +96,100 @@ type HealthCheckClient struct { } func TestMasterRocksDBSlaveRocksDB(t *testing.T) { + opts.AppConfig = getConfigInsecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newRocksDBStore(slaveDBFolder) testMasterSlaveRepl(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) } +func TestMasterRocksDBSlaveRocksDB_Secured(t *testing.T) { + + utils.GenerateTlsFiles() + opts.AppConfig = getConfigSecure() + masterRDB := newRocksDBStore(masterDBFolder) + slaveRDB := newRocksDBStore(slaveDBFolder) + testMasterSlaveRepl(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) +} + +func TestMasterRocksDBSlaveBadger_Secure(t *testing.T) { + utils.GenerateTlsFiles() + opts.AppConfig = getConfigSecure() + masterRDB := newRocksDBStore(masterDBFolder) + slaveRDB := newBadgerDBStore(slaveDBFolder) + testMasterSlaveRepl(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) +} + func TestMasterRocksDBSlaveBadger(t *testing.T) { + opts.AppConfig = getConfigInsecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newBadgerDBStore(slaveDBFolder) testMasterSlaveRepl(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) } + func TestSlaveDiscoveryFunctionality(t *testing.T) { + opts.AppConfig = getConfigInsecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newRocksDBStore(slaveDBFolder) testGetStatus(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) } func TestSlaveHealthCheck(t *testing.T) { + opts.AppConfig = getConfigInsecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newRocksDBStore(slaveDBFolder) testHealthCheck(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) } func TestSlaveHealthCheckStream(t *testing.T) { + opts.AppConfig = getConfigInsecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newRocksDBStore(slaveDBFolder) testHealthCheckStream(t, masterRDB, slaveRDB, masterRDB, slaveRDB, masterRDB) } -func TestSlaveAutoConnect(t *testing.T) { + +func TestSlaveAutoConnect_Insecure(t *testing.T) { + + testSlaveAutoConnect("InSecured", t) +} + +/* +func TestSlaveAutoConnect_Secure(t *testing.T) { + + testSlaveAutoConnect("Secured", t) +}*/ + +func testSlaveAutoConnect(commType string, t *testing.T) { + + if commType == "Secured" { + opts.AppConfig = getConfigSecure() + }else { + opts.AppConfig = getConfigInsecure() + } + //reset raft directories resetRaftStateDirs(t) //start discovery server startDiscoveryServer() + t.Log("Started disc server") sleepInSecs(3) startDiscoveryCli() + t.Log("Started disc client") defer discoverydkvSvc.Close() defer discoveryCli.Close() + t.Log("closed") + //start dkvservers initDKVServers() sleepInSecs(3) + t.Log("DKV servers initisalised") registerDkvServerWithDiscovery() + t.Log("register complete") initDKVClients() + t.Log("dkv clients init") defer stopClients() defer stopServers() @@ -145,6 +201,8 @@ func TestSlaveAutoConnect(t *testing.T) { if err := slaveSvc.(*slaveService).findAndConnectToMaster(); err != nil { t.Fatalf("Cannot connect to new master. Error: %v", err) } + + t.Log("present here") lastClosedMasterId := -1 for cnt := 0; cnt < clusterSize+1; cnt++ { masterId := getCurrentMasterIdFromSlave(t) @@ -188,6 +246,12 @@ func TestSlaveAutoConnect(t *testing.T) { t.Log("New master port:", dkvPorts[masterId]) } + t.Log("hola here") + stopDiscoveryServer() +} + +func stopDiscoveryServer() { + discServerGrpcSrvr.GracefulStop() } func getNodeUrl(id int) string { @@ -204,30 +268,6 @@ func startDkvSvcAndCli(id int) { discoveryCli.PropagateStatus() } -func getCurrentMasterId(t *testing.T) int { - var currentMaster string - if regions, err := discoveryCli.GetClusterStatus(dbName, vbucket); err != nil { - t.Fatalf("Error while fetching the cluster info. Cannot proceed further. Error: %v", err) - } else { - for _, region := range regions { - if region.MasterHost != nil { - currentMaster = *region.MasterHost - } - } - } - if currentMaster == "" { - t.Fatalf("No master found") - } - detail := strings.Split(currentMaster, ":") - for i := 1; i <= clusterSize; i++ { - if fmt.Sprintf("%d", dkvPorts[i]) == detail[1] { - return i - } - } - t.Fatalf("Error: Master port not found") - return -1 -} - func getCurrentMasterIdFromSlave(t *testing.T) int { var currentMaster string if slaveSvc.(*slaveService).regionInfo.MasterHost != nil { @@ -259,32 +299,57 @@ func registerDkvServerWithDiscovery() { sleepInSecs(3) } +func getSrvrMode() utils.ConnectionMode { + config := opts.AppConfig + + var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != "" + + var srvrMode utils.ConnectionMode + if secure { + srvrMode = utils.ServerTLS + } else { + srvrMode = utils.Insecure + } + + return srvrMode +} + + func startDiscoveryServer() { + + config := opts.AppConfig + srvrMode := getSrvrMode() + var lstnr net.Listener //todo check why does this need a kv store? discoverykvs, discoverycp, discoveryba := newKVStore(masterDBFolder + "_DC") discoverydkvSvc = master.NewStandaloneService(discoverykvs, discoverycp, discoveryba, &serverpb.RegionInfo{Database: dbName, VBucket: vbucket}, serverOpts) - grpcSrvr := grpc.NewServer() - serverpb.RegisterDKVServer(grpcSrvr, discoverydkvSvc) - serverpb.RegisterDKVReplicationServer(grpcSrvr, discoverydkvSvc) - serverpb.RegisterDKVBackupRestoreServer(grpcSrvr, discoverydkvSvc) + discServerGrpcSrvr, lstnr = utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, + SrvrAddr: fmt.Sprintf("%s:%d", dkvSvcHost, discoveryPort), KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, zap.NewNop()) + serverpb.RegisterDKVServer(discServerGrpcSrvr, discoverydkvSvc) + serverpb.RegisterDKVReplicationServer(discServerGrpcSrvr, discoverydkvSvc) + serverpb.RegisterDKVBackupRestoreServer(discServerGrpcSrvr, discoverydkvSvc) discoverServiceConf := &opts.DiscoveryServerConfig{statusTtl, heartBeatTimeOut} discoveryService, _ := discovery.NewDiscoveryService(discoverydkvSvc, zap.NewNop(), discoverServiceConf) - serverpb.RegisterDKVDiscoveryServer(grpcSrvr, discoveryService) - go grpcSrvr.Serve(newListener(discoveryPort)) + serverpb.RegisterDKVDiscoveryServer(discServerGrpcSrvr, discoveryService) + go discServerGrpcSrvr.Serve(lstnr) } func startDiscoveryCli() { + + config := opts.AppConfig + clientConfig := &opts.DiscoveryClientConfig{DiscoveryServiceAddr: fmt.Sprintf("%s:%d", dkvSvcHost, discoveryPort), PushStatusInterval: time.Duration(5), PollClusterInfoInterval: time.Duration(5)} - discoveryCli, _ = discovery.NewDiscoveryClient(clientConfig, zap.NewNop()) + discoveryCli, _ = discovery.NewDiscoveryClient(clientConfig, utils.DKVConfig{CaCertPath:config.CaCertPath} ,zap.NewNop()) } func startSlaveAndAttachToMaster(client *ctl.DKVClient) { wg := sync.WaitGroup{} wg.Add(1) rdbStore := newRocksDBStore(dbFolderSlave) - go serveStandaloneDKVSlave(&wg, rdbStore, rdbStore, client, discoveryCli) + go serveStandaloneDKVSlaveV2(&wg, rdbStore, rdbStore, client, discoveryCli) wg.Wait() // stop the slave poller so as to avoid race with this poller @@ -314,9 +379,10 @@ func stopServers() { } func initDKVClients(ids ...int) { + config := opts.AppConfig for id := 1; id <= clusterSize; id++ { svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id]) - if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { + if client, err := utils.NewDKVClient(utils.DKVConfig{SrvrAddr:svcAddr, CaCertPath:config.CaCertPath}, "", ctl.DefaultConnectOpts); err != nil { panic(err) } else { dkvClis[id] = client @@ -325,8 +391,9 @@ func initDKVClients(ids ...int) { } func initSingleDkvClient(id int) { + config := opts.AppConfig svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id]) - if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { + if client, err := utils.NewDKVClient(utils.DKVConfig{SrvrAddr:svcAddr, CaCertPath:config.CaCertPath}, "", ctl.DefaultConnectOpts); err != nil { panic(err) } else { dkvClis[id] = client @@ -341,12 +408,12 @@ func initDKVServers() { } func serveDistributedDKV(id int, nodeURL string) { - dkvSvc, grpcSvc := newDistributedDKVNode(id, nodeURL, clusterURL) + dkvSvc, grpcSvc, lstnr := newDistributedDKVNode(id, nodeURL, clusterURL) mutex.Lock() dkvSvcs[id] = dkvSvc grpcSrvs[id] = grpcSvc mutex.Unlock() - grpcSrvs[id].Serve(newListener(dkvPorts[id])) + grpcSrvs[id].Serve(lstnr) } func newReplicator(kvs storage.KVStore, nodeURL, clusterURL string) nexus_api.RaftReplicator { @@ -397,7 +464,7 @@ func newKVStore(dir string) (storage.KVStore, storage.ChangePropagator, storage. } } -func newDistributedDKVNode(id int, nodeURL, clusURL string) (DKVService, *grpc.Server) { +func newDistributedDKVNode(id int, nodeURL, clusURL string) (DKVService, *grpc.Server, net.Listener) { dir := fmt.Sprintf("%s_%d", dbFolderMaster, id) kvs, cp, br := newKVStore(dir) dkvRepl := newReplicator(kvs, nodeURL, clusURL) @@ -405,11 +472,19 @@ func newDistributedDKVNode(id int, nodeURL, clusURL string) (DKVService, *grpc.S regionInfo := &serverpb.RegionInfo{Database: dbName, VBucket: vbucket} regionInfo.NodeAddress = "127.0.0.1" + ":" + fmt.Sprint(dkvPorts[id]) distSrv := master.NewDistributedService(kvs, cp, br, dkvRepl, regionInfo, serverOpts) - grpcSrv := grpc.NewServer() + //grpcSrv := grpc.NewServer() + + srvrMode := getSrvrMode() + config := opts.AppConfig + + grpcSrv, lstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, + SrvrAddr: regionInfo.NodeAddress, KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, zap.NewNop()) + serverpb.RegisterDKVServer(grpcSrv, distSrv) serverpb.RegisterDKVClusterServer(grpcSrv, distSrv) serverpb.RegisterDKVReplicationServer(grpcSrv, distSrv) - return distSrv, grpcSrv + return distSrv, grpcSrv, lstnr } func resetRaftStateDirs(t *testing.T) { @@ -513,13 +588,13 @@ func TestLargePayloadsDuringRepl(t *testing.T) { func initMasterAndSlaves(masterStore, slaveStore storage.KVStore, cp storage.ChangePropagator, ca storage.ChangeApplier, masterBU storage.Backupable) { var wg sync.WaitGroup wg.Add(1) - go serveStandaloneDKVMaster(&wg, masterStore, cp, masterBU) + go serveStandaloneDKVMasterV2(&wg, masterStore, cp, masterBU) wg.Wait() masterCli = newDKVClient(masterSvcPort) wg.Add(1) - go serveStandaloneDKVSlave(&wg, slaveStore, ca, masterCli, testingClusterInfo{}) + go serveStandaloneDKVSlaveV2(&wg, slaveStore, ca, masterCli, testingClusterInfo{}) wg.Wait() // stop the slave poller so as to avoid race with this poller @@ -832,8 +907,11 @@ func testDelete(t *testing.T, dkvCli *ctl.DKVClient, keyPrefix string) { } func newDKVClient(port int) *ctl.DKVClient { + + config := opts.AppConfig + dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, port) - if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil { + if client, err := utils.NewDKVClient(utils.DKVConfig{SrvrAddr: dkvSvcAddr, CaCertPath: config.CaCertPath}, "", ctl.DefaultConnectOpts); err != nil { panic(err) } else { return client @@ -889,6 +967,89 @@ func serveStandaloneDKVMaster(wg *sync.WaitGroup, store storage.KVStore, cp stor masterGrpcSrvr.Serve(lis) } +func serveStandaloneDKVMasterV2(wg *sync.WaitGroup, store storage.KVStore, cp storage.ChangePropagator, bu storage.Backupable) { + // No need to set the storage.Backupable instance since its not needed here + + config := opts.AppConfig + + var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != "" + + var srvrMode utils.ConnectionMode + if secure { + srvrMode = utils.ServerTLS + } else { + srvrMode = utils.Insecure + } + + log.Print(srvrMode ) + masterSvc = master.NewStandaloneService(store, cp, bu, &serverpb.RegionInfo{}, serverOpts) + + var lsnr net.Listener + + masterGrpcSrvr, lsnr = utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, + SrvrAddr: fmt.Sprintf("%s:%d", dkvSvcHost, masterSvcPort), KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, zap.NewNop()) + + serverpb.RegisterDKVServer(masterGrpcSrvr, masterSvc) + serverpb.RegisterDKVReplicationServer(masterGrpcSrvr, masterSvc) + serverpb.RegisterDKVBackupRestoreServer(masterGrpcSrvr, masterSvc) + //lis := listen(masterSvcPort) + wg.Done() + masterGrpcSrvr.Serve(lsnr) + //go masterGrpcSrvr.Serve(lstnr) +} + +func serveStandaloneDKVSlaveV2(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, discoveryClient discovery.Client) { + + config := opts.AppConfig + var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != "" + + var srvrMode utils.ConnectionMode + if secure { + srvrMode = utils.ServerTLS + } else { + srvrMode = utils.Insecure + } + + lgr, _ := zap.NewDevelopment() + replConf := ReplicationConfig{ + MaxNumChngs: 2, + ReplPollInterval: 5 * time.Second, + MaxActiveReplLag: 10, + MaxActiveReplElapsed: 5, + } + + specialOpts := &opts.ServerOpts{ + Logger: lgr, + StatsCli: stats.NewNoOpClient(), + PrometheusRegistry: stats.NewPromethousNoopRegistry(), + HealthCheckTickerInterval: uint(1), + } + + if ss, err := NewService(store, ca, &serverpb.RegionInfo{Database: dbName, VBucket: vbucket}, &replConf, discoveryClient, specialOpts); err != nil { + panic(err) + } else { + slaveSvc = ss + if masterCli != nil { + slaveSvc.(*slaveService).replInfo.replCli = masterCli + } + + var lstnr net.Listener + + slaveGrpcSrvr, lstnr = utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, + SrvrAddr: fmt.Sprintf("%s:%d", dkvSvcHost, slaveSvcPort), KeyPath: config.KeyPath, CertPath: config.CertPath, + CaCertPath: config.CaCertPath}, zap.NewNop()) + + serverpb.RegisterDKVServer(slaveGrpcSrvr, slaveSvc) + health.RegisterHealthServer(slaveGrpcSrvr, slaveSvc) + //lis := listen(slaveSvcPort) + wg.Done() + slaveGrpcSrvr.Serve(lstnr) + } +} + + + func serveStandaloneDKVSlave(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, discoveryClient discovery.Client) { lgr, _ := zap.NewDevelopment() replConf := ReplicationConfig{ @@ -972,3 +1133,14 @@ func (m testingClusterInfo) GetClusterStatus(database string, vBucket string) ([ return regions, nil } + +func getConfigInsecure() opts.Config { + + return opts.Config{} +} + +func getConfigSecure() opts.Config { + + dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, masterSvcPort) + return opts.Config{CaCertPath: caCertPath, KeyPath: keyPath, CertPath: certPath, ListenAddr:dkvSvcAddr} +} diff --git a/internal/utils.go b/internal/utils.go index 562ab31e..2dca18f1 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -10,6 +10,7 @@ import ( "encoding/pem" "errors" "fmt" + "github.com/flipkart-incubator/dkv/internal/helper" "github.com/flipkart-incubator/dkv/pkg/ctl" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" "go.uber.org/zap" @@ -21,6 +22,7 @@ import ( "math/big" "net" "os" + "os/exec" "path/filepath" "time" ) @@ -49,28 +51,42 @@ func NewDKVClient(clientConfig DKVConfig, authority string, options ctl.ConnectO if clientMode == "" { clientMode = clientModeFromFlags(clientConfig) } + + log.Print(clientMode) + switch clientMode { case MutualTLS: config, err = getTLSConfigWithCertPool(clientConfig.CaCertPath) if err == nil { var cert tls.Certificate + //log.Fatal("hrt33") + cert, err = tls.LoadX509KeyPair(clientConfig.CertPath, clientConfig.KeyPath) + if err == nil { + + //log.Fatal("hrtrr") + + config.Certificates = []tls.Certificate{cert} + opt = grpc.WithTransportCredentials(credentials.NewTLS(config)) } } case ServerTLS: config, err = getTLSConfigWithCertPool(clientConfig.CaCertPath) if err == nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(config)) } case Insecure: opt = grpc.WithInsecure() } if err != nil { + log.Print("here") return nil, err } + return ctl.NewDKVClient(clientConfig.SrvrAddr, authority, options, opt) } @@ -136,6 +152,65 @@ func loadTLSCredentials(clientConfig DKVConfig) (credentials.TransportCredential return credentials.NewTLS(config), nil } +func GenerateTlsFiles() { + + var err error + + homeDir := helper.HomeDir + generatedCertDir := homeDir + "/cert" + tlsFile := generatedCertDir + "/tlsFiles.sh" + serverConfFile := generatedCertDir + "/server-ext.cnf" + + if _, errCreate := os.Stat(generatedCertDir); os.IsNotExist(errCreate) { + err = os.MkdirAll(generatedCertDir, os.ModePerm) + } + + if err != nil { + log.Fatal(err) + } + + f, err := os.Create(tlsFile) + if err != nil { + log.Fatal(err) + } + + defer f.Close() + + err = os.Chmod(tlsFile, 0777) + if err != nil { + log.Fatal(err) + } + _, err2 := f.WriteString(helper.TlsFileContent) + + if err2 != nil { + log.Fatal(err2) + } + + f1, err := os.Create(serverConfFile) + if err != nil { + log.Fatal(err) + } + defer f1.Close() + + err = os.Chmod(serverConfFile, 0777) + if err != nil { + log.Fatal(err) + } + + _, err3 := f1.WriteString(helper.ServerConf) + + if err3 != nil { + log.Fatal(err3) + } + + cmd := exec.Command("/bin/sh", tlsFile) + + err = cmd.Run() + if err != nil { + log.Fatal(err) + } +} + func GenerateSelfSignedCert(generatedCertDir string, generatedCertValidity int) (string, string, error) { var err error if _, errCreate := os.Stat(generatedCertDir); os.IsNotExist(errCreate) { diff --git a/pkg/ctl/client.go b/pkg/ctl/client.go index 8ee5e270..509530bc 100644 --- a/pkg/ctl/client.go +++ b/pkg/ctl/client.go @@ -6,6 +6,7 @@ import ( "github.com/flipkart-incubator/dkv/internal/hlc" "github.com/flipkart-incubator/nexus/models" "io" + "log" "time" "github.com/flipkart-incubator/dkv/pkg/serverpb" @@ -18,7 +19,7 @@ import ( // exposes a simpler API to its users without having to deal with timeouts, // contexts and other GRPC semantics. type DKVClient struct { - cliConn *grpc.ClientConn + CliConn *grpc.ClientConn dkvCli serverpb.DKVClient dkvReplCli serverpb.DKVReplicationClient dkvBRCli serverpb.DKVBackupRestoreClient @@ -63,6 +64,7 @@ func NewDKVClient(svcAddr string, authority string, connectOptions ConnectOpts, opts = append(opts, grpc.WithBlock(), grpc.WithReadBufferSize(connectOptions.ReadBufSize), grpc.WithWriteBufferSize(connectOptions.WriteBufSize), grpc.WithAuthority(authority), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(connectOptions.MaxMsgSize)), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + log.Print(svcAddr) conn, err := grpc.DialContext(ctx, svcAddr, opts...) if err == nil { dkvCli := serverpb.NewDKVClient(conn) @@ -279,8 +281,8 @@ func (dkvClnt *DKVClient) Iterate(keyPrefix, startKey []byte) (<-chan *KVPair, e // Close closes the underlying GRPC client connection to DKV service func (dkvClnt *DKVClient) Close() error { - if dkvClnt.cliConn != nil { - return dkvClnt.cliConn.Close() + if dkvClnt.CliConn != nil { + return dkvClnt.CliConn.Close() } return nil } From 35df530632f5661d5987034a35be38ff8e66864e Mon Sep 17 00:00:00 2001 From: Viren Date: Wed, 15 Jun 2022 23:53:07 +0530 Subject: [PATCH 4/5] remove unused stuff --- .../dkv-client/src/test/resources/Procfile | 14 +++++ cmd/dkvctl/main.go | 1 - cmd/dkvsrv/main.go | 1 - internal/discovery/client.go | 15 ------ internal/slave/ca-cert.pem | 33 ------------ internal/slave/ca-cert.srl | 1 - internal/slave/ca-key.pem | 52 ------------------- internal/slave/server-cert.pem | 0 internal/slave/server-key.pem | 52 ------------------- internal/slave/server-req.pem | 28 ---------- internal/slave/service_test.go | 11 +--- 11 files changed, 15 insertions(+), 193 deletions(-) delete mode 100644 internal/slave/ca-cert.pem delete mode 100644 internal/slave/ca-cert.srl delete mode 100644 internal/slave/ca-key.pem delete mode 100644 internal/slave/server-cert.pem delete mode 100644 internal/slave/server-key.pem delete mode 100644 internal/slave/server-req.pem diff --git a/clients/java/dkv-client/src/test/resources/Procfile b/clients/java/dkv-client/src/test/resources/Procfile index 4f089b32..8daf6647 100644 --- a/clients/java/dkv-client/src/test/resources/Procfile +++ b/clients/java/dkv-client/src/test/resources/Procfile @@ -12,3 +12,17 @@ dkv_slave1a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-add dkv_slave1b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7092 --http-listen-addr 127.0.0.1:7096 --node-name s1b --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml dkv_slave1c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7093 --http-listen-addr 127.0.0.1:7097 --node-name s1c --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml dkv_slave1d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7094 --http-listen-addr 127.0.0.1:7098 --node-name s1d --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml + +dkv_slave2a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8091 --http-listen-addr 127.0.0.1:8095 --node-name s2a --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --http-listen-addr 127.0.0.1:8096 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave2c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8093 --http-listen-addr 127.0.0.1:8097 --node-name s2c --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --http-listen-addr 127.0.0.1:8098 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml + +dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --http-listen-addr 127.0.0.1:9095 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --http-listen-addr 127.0.0.1:9096 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --http-listen-addr 127.0.0.1:9097 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml +dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --http-listen-addr 127.0.0.1:9098 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml + +dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --http-listen-addr 127.0.0.1:6085 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml +dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --http-listen-addr 127.0.0.1:6086 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml +dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --http-listen-addr 127.0.0.1:6087 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml diff --git a/cmd/dkvctl/main.go b/cmd/dkvctl/main.go index c76e7517..623aadf9 100644 --- a/cmd/dkvctl/main.go +++ b/cmd/dkvctl/main.go @@ -272,7 +272,6 @@ func main() { fmt.Printf(" (:authority = %s)", dkvAuthority) } fmt.Printf("...") - //client, err := ctl.NewInSecureDKVClient(dkvAddr, dkvAuthority, ctl.DefaultConnectOpts) dkvConfig := utils.DKVConfig{ SrvrAddr:dkvAddr, CaCertPath:caCertPath} diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go index d1a96af6..a6575cf8 100644 --- a/cmd/dkvsrv/main.go +++ b/cmd/dkvsrv/main.go @@ -113,7 +113,6 @@ func main() { } kvs, cp, ca, br := newKVStore() - //grpcSrvr, lstnr := newGrpcServerListener() grpcSrvr, lstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode, SrvrAddr: config.ListenAddr, KeyPath: config.KeyPath, CertPath: config.CertPath, CaCertPath: config.CaCertPath}, accessLogger) diff --git a/internal/discovery/client.go b/internal/discovery/client.go index 2316ed68..93a58ad8 100644 --- a/internal/discovery/client.go +++ b/internal/discovery/client.go @@ -52,7 +52,6 @@ var DiscoveryClientConnectOpts = ctl.ConnectOpts{ func NewDiscoveryClient(config *opts.DiscoveryClientConfig, dkvConfig utils.DKVConfig, logger *zap.Logger) (Client, error) { - //conn, err := getDiscoveryClient(config.DiscoveryServiceAddr) dkvConfig.SrvrAddr = config.DiscoveryServiceAddr client, err := utils.NewDKVClient(dkvConfig, "", DiscoveryClientConnectOpts) if err != nil { @@ -68,20 +67,6 @@ func NewDiscoveryClient(config *opts.DiscoveryClientConfig, dkvConfig utils.DKVC return storePropagator, nil } -func getDiscoveryClient(discoveryServiceAddr string) (*grpc.ClientConn, error) { - // TODO - check if authority is required - ctx, cancel := context.WithTimeout(context.Background(), connectTimeout) - defer cancel() - return grpc.DialContext(ctx, discoveryServiceAddr, - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), - grpc.WithReadBufferSize(readBufSize), - grpc.WithWriteBufferSize(writeBufSize), - grpc.WithAuthority(""), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) -} - func (m *discoveryClient) RegisterRegion(server serverpb.DKVDiscoveryNodeServer) { m.regions = append(m.regions, server) } diff --git a/internal/slave/ca-cert.pem b/internal/slave/ca-cert.pem deleted file mode 100644 index 94754275..00000000 --- a/internal/slave/ca-cert.pem +++ /dev/null @@ -1,33 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIFxjCCA64CCQDsORrfDgvfMDANBgkqhkiG9w0BAQsFADCBpDELMAkGA1UEBhMC -RlIxEjAQBgNVBAgMCU9jY2l0YW5pZTERMA8GA1UEBwwIVG91bG91c2UxFDASBgNV -BAoMC1RlY2ggU2Nob29sMRIwEAYDVQQLDAlFZHVjYXRpb24xGjAYBgNVBAMMESou -dGVjaHNjaG9vbC5ndXJ1MSgwJgYJKoZIhvcNAQkBFhl0ZWNoc2Nob29sLmd1cnVA -Z21haWwuY29tMB4XDTIyMDUyODEyMDk0MloXDTIzMDUyODEyMDk0MlowgaQxCzAJ -BgNVBAYTAkZSMRIwEAYDVQQIDAlPY2NpdGFuaWUxETAPBgNVBAcMCFRvdWxvdXNl -MRQwEgYDVQQKDAtUZWNoIFNjaG9vbDESMBAGA1UECwwJRWR1Y2F0aW9uMRowGAYD -VQQDDBEqLnRlY2hzY2hvb2wuZ3VydTEoMCYGCSqGSIb3DQEJARYZdGVjaHNjaG9v -bC5ndXJ1QGdtYWlsLmNvbTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB -AN3yVugtIcclPC98JisJvW3lg4R92gu5sKIHMA+m4Ish347Ifc2W8RCPDUuYkLjj -TxjQTpzPCaQ8dezUH9DQqmziUsaqbvd37DRCyut9O1+AG8BOkmXeIrj2hWNkYmd8 -+RkYxWPXq6f9XVaeW696xNnWJBwyeSPdIAcCfUzwGvQGtzWmBQ4hB+CWsqpM+q3A -OKyxIgN61JKxKhffqtEXgKpdbbv7SZzMhbpsT871vvjmRng/p9pAnhvFba08yAoi -sn/chnUEUj1fozeZrC+uAp0B8tICx02GqHVEVKiJeZx3L43wt7bObK/j2v0UJYA6 -If+QdlZouNCLCcKz3NZXh9q0AqLvIAEYJlAjUhTZHvPj3rBoVozo6KIabWLZGJQG -WLiDfKPhJyY+qFc3hCyOSLnRccLxKomAEc7d2NbcaQj55OgQzEyHzVJ63sLgQQMD -VlsT/WuOfh7QiJsB4mi1kbuwvGJxLS5w+AC94aQUzcDRuoUzBgaaOBAbpAvhFenW -2P9zj0IaiZJKpysS+PnC+nZspxf1Ujj5IxcGmHivA61Dd+oFLfYr20ZsMCe2dMqo -gxHhv34ICuds47MiDE1Ygawa53K7S4mapgIpEUVvN27bPVUSVpRVWmTysvtZM1OU -2spqAM5jw0TO3i6l/rTy5/ZoxuYNPmdTqblzK1dttBB7AgMBAAEwDQYJKoZIhvcN -AQELBQADggIBADzUZ8e56Lj4MviDh7ou3RXuoPnWi5J4nyPRqg+a5/HtU4skO6U+ -uZ3VPUeSH8ShV1kQTkXgeM3VQVh9c622CC6u6C4CMqPHchJy4h+fIoA1tTFPa5LQ -gPhGqJ7bWhUjN9VRDy1YycpRv1qRzNXuEU5MageJOW+OcnZFjU6tgeb/W34A1Q0T -3Jd4oDUUsy2JuA8cIP5m17UuR/9iy/gQo4qduWsFfNyWuy2WOtoHx1d4O0bQ1/Rj -7mYcFdQd3EczvVDVqf/9JQZFrIpPkHKDN7ORSCxwxmWzUIno61fuDTEDGA8/f2hX -GosZGPrUAUFBwTcdggfzeTIzHoBiXPGQJOF28s/avjKi1HX7uSwnWuo3TlyMcTcw -Y4fi9uS9hW8nvb1BWdsewsuVw8QU7a5t1b67rM+0GRnC66Npeou+LWmCscXj/aYG -aN2UnhsSQ5rtSDQd2zxDyKr0qMPdHol5A0WnL1pkr3Gjr7fXqwPkDX7LHMhkJ6ta -BTGaxUWj9qmAjPYDHGNV/UB2qMmYnRb0FIt5QaP+7dwpT6joZ3MdRvpdVyyDVrAZ -CWGiMY8k1NDaqxQNtgqOBF5ywqQrA9Gis2zMYeGOtYZmECLzonTJ2QKxkZLjUuaR -JwfH9BgKsIh/k6T35M5d0rgSLkE4OogxkMAXfyJw+TRJOiJ/5ocvru96 ------END CERTIFICATE----- diff --git a/internal/slave/ca-cert.srl b/internal/slave/ca-cert.srl deleted file mode 100644 index 8737f1d4..00000000 --- a/internal/slave/ca-cert.srl +++ /dev/null @@ -1 +0,0 @@ -E6A0FF61B348207B diff --git a/internal/slave/ca-key.pem b/internal/slave/ca-key.pem deleted file mode 100644 index 7be638f4..00000000 --- a/internal/slave/ca-key.pem +++ /dev/null @@ -1,52 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIJRQIBADANBgkqhkiG9w0BAQEFAASCCS8wggkrAgEAAoICAQDNyyZxtHVDYwAF -sCOEFV+zRhITQYNzyduSm6QISGkW4OeOfK9JqRixX18Bs6VyMSq0cvV/kQyDIu26 -uKkpO31xBo4Z4BlcxOBdDgXdPoTXTJzKvXYeyiuqV6/pw2WKF5z3lWxyt8TyfAQm -TUzxoyE3dE/nFhFe8WN0idCWifY3/Ng+3AzIPHakJyKzxGbBxqXHTZgYIzD/7Vfh -O6QO82ZOFhr3hR65rV2Gq6eAG/c1lEGA6O8HdtJ80tZ3z44KLjQ6pagfSZ07aEiM -EZh7CqhjUCdL2wvXrPT9mVDBn7dCFFSftezq03ehVawNkVx/rHAs+Tm8jdvuMhR6 -Z845KJ9xUcwSCS9wde7MwOGsmjXWPwXLuNKDishYO1otWA50S7yU//g3GU8z3/WW -fP5U8yDm9oDjyNKSwD8YPgYz/cLjteg9W5EXm0YetvfuByx+lLJCQCPozAlCOcIW -H7QKqDzYlIWZ1p1ETlf9wkvmR0lMTSAQXL4kKVZRPKMZt9Xk9ufagY6yxubwKAzp -UZ5Cwfac1axvZRouBz4BbbjwRNFRsEUYZCoTJQfkApExO9Q1KGgMBxFUHHjeRHmx -nJ6NBe/5aptvtPRoV49sgMMU1qK4x+8MnjI1/Mn2Ug4ugc/1H6IOt6HqPMzd2V4z -GwsqIdyCzjoy/pzFbol+uR63omIlLQIDAQABAoICAQDCCxK11iVXo+qqcloauhih -rqludLChJi2ewJjbooNMqkZMaS7S8tmwra+kqCKRWODegyMO5NL6d53XGTWQBmMS -hs7IgTEI8y5nL8jBw6h4ZenVlO3ivFiN5t6dObvtiLReEwvN44+V4O/8uZiao2m3 -ozmQOJOqhUt04FnJCE8cnZG9O7nluHVZ9PjYrpEBl/BKy9RkthIGz2G0o4re+gx2 -Bovo5iGBjIGKcSKgPj5Ly/4zse9oh1aTki3djNxupwBiaD7tOXlDQeJeCF1Rb0w1 -UKes897nv1LGKKEn4RRn/0mHyn+ZMEo12tU/edfRHuQq09jn/I7qu1fpzyV5ad9j -xE3XF+j/so2gLbk3MjLTo6ItGdX6AdHsjoL4FmX5j1YRmzaxcAO+tpEcKY88ks8R -Onq6c/eoq9PXH6R53Ez5ijOeC4AXzvPZMzV4HRUFsTLttiYGESucAFIHpW686uCi -idh7Oj6nicE5ZlMwnu6N5Y9AtXMrIv8iQngbkjXJjuhSoeHvtwGQu2L/NfL2i8QO -c0t+8DCjKvgSRpJof5o3C2Ku5xCCa4MZpMzntoAG5lYzw3n0RQCJkCsf0aWOopBy -mNESuC32avYPRixDsnqsNtSyN66vUfYkFlZ3S0SnAeQv+DXRlRigXs/2lZ6h+R2N -/+N8FY2Wai4pfBBNFiWUNQKCAQEA/lXXtiRV326ymWH+12XFAnJxOudR0w/L8ZsY -xHdpN2oN+O5yCBAKUMNlX8XrFAh0aWrHkGThU6s/o0PiWNHcOIVL/5JuzY7xiVl+ -mhGKkDjDn5velAHG+Jw4F/KHlhuAqhg7HUhasSzKPZbyCXRbLcQQfrNkr4CEIBTr -qCzYgOmyAaU7q8bxTpsJVZVawHQLHmzoUEUfdRhxFOKrRrjMDFMPPn5yTKygjbdT -PgY97lz4y2abV8HRuQMQDWYLXx9g4Bs0cikieeho5MRfsR9kSznnatX2GkFHFnOx -ll3/nCMiDOa7eFPNEt+cDQ1/17U+HHtRMOvC3le3GBK6G6vXtwKCAQEAzyP450c1 -TVSFkddLLtnWKM2Y474Y26SNuloXL1b/MU5zhKk7syXjgBenltyX8lxVj+qBD1P3 -3Gd9d3PFSy6CDk18kSholkELwBHlXQB+mFN4CSHd63sv7GIl94XpyYn+zslvAHq7 -gO/hf5NhsSPLmfAgNfv/Vd3qb3h52vFPr65JzE5CcZ2hB3i/2G5zaO5IjtHJKMKy -8ZCzvnWfm0BeRyDzccZZ3h77Y9J4YpknDZQ0BixIVKz61fjBGViYSXhyOUvl9xdS -muPsdoSAcqt5bx6sQZY+XbD0rfgysxzO58HhPmdgSS80/lIO5sMxNxx4ztrGcMrG -FrGvD8/YdqgCOwKCAQEAvPyYj3WsgZxGBgd1UCUFIW5ciQJ+ILFqQBuZadO5UT3z -J/g2r5sLrzuiRmpQ5c/qgLKhm7/6Vyu22+CDfrBiTXVvJXdv8jq3a/IeSumh5szn -P+CYoa44ZV/fppvG+FlzX/Ela/rpuSGpHd7d9vyJDBbWku9Gb8q1YwlomszIC/Kb -HPHkxx3LDTfS1c9nYf5YjMEsVwkl+OKylGxiDOkgZkAJ4cwfrq3Pl76TklEdD9Px -42JZJ/qTlkCaWvpqlMJCQAS3EeVTAvcZinQJDxpeci+SsCZQ+u1qn1pUqYErgJ/y -m1eQEjdi+RVDZceUKXs5rfxMNmfZo1HWcYLo28rzPQKCAQEAoRvMcy4aUgwkW1uF -eWoqvM+0U6Y+NU+WgFOj5skWcSYgiRULfvpAKRe83IS0leelEOuG+AIKLV484tSj -picFNtSjx08TkZID7p68s4o37Ig3O3sg3Q+SJZvtBckdHgj7q6BWepv7DH0G+bAK -8rjxmch/dR2L6iBRP+QHevfnT7ASbBqOaUn9vg73wAGgOAPlYVffypksjuBKPR/F -RP5alZ4WWu64ZR/ZzTkdMysFifU0RXzZwSpNanEw12LmB3br+I6EzZtRP3F5MHE8 -anPeXkQQHG8cEMkaddkurcsBTpaod132MNXLtObj+i3lMIUvnUvhvkaorWwDyhuR -tDBWkwKCAQEA9WBv8xAbHr+AVdjiuSaUttKs3ljtXvr2P8Z/Z0V8A+En3Lct7HGP -gnfsEpY4I/O49vpqsB4l34857SvyN2pmM94vTB0vb6kp1wj3F214nvxadEiRdCC5 -dM0U1S0Ehg0C11BBff0FJxDCk9RHkPVI1pA5KFd6LMxc89mc/2fo498XPlUWx+RZ -i4ujKh1u30y23k+a1EhZ7WhUgHyi2N/bWtPtAjdo1cngYjUC7bulAzlBIuDzrnZi -6PRn71kKlkWqO21lZSlnXw5sUZOD0nyYXN2k5zw+X0LQ27Ps3kCfm7yt++G72Lpo -g5WMpViDo5gu3TttueWJ7jGiRW6ZLIv7vQ== ------END PRIVATE KEY----- diff --git a/internal/slave/server-cert.pem b/internal/slave/server-cert.pem deleted file mode 100644 index e69de29b..00000000 diff --git a/internal/slave/server-key.pem b/internal/slave/server-key.pem deleted file mode 100644 index ba03989d..00000000 --- a/internal/slave/server-key.pem +++ /dev/null @@ -1,52 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDKyZNtQfkeE9HG -Ft4pPLLnL4vnkMC7265xdS5vGY2uYgtdFRomVJTrYwA+B8ljISb1Afro/f+P1fa6 -ad89QZk+WF4sIi/giduJODqIV305hDF7gSOw9HlAxX/cYUWGD/Q51yRclz4goXSE -ps4hNAPiAgFlMu7kiuO7eaLYLoF3T3LCwMoLRWZ3tMdHpsTl9Mmz1uN+N/B/e2K7 -RH/u4NI53Yj48yQEbERHk13j3NYf8yCjYAfXSWBG6LROY7xQxC3VbbxYYYJcYJ4t -G5JLQOyQ4ZhrxlWvlth2vOUB9c1KZCaXbAMzuc3gv4hwEXJdbYfjWHiMjhtYQAvP -ouL6gGndxEymndRxAFTpHDfdzo3PIfJnYke7V8hPOLa4oG6OBMjmwrOTyTpzRZYl -AUyqm4+Lcvt1898Giq3MRFLLPtuW2t8HDVTJerzf2EVEG+EM7/ZBu1s7EXvK/Glm -L679FTvgVi77arjDFvH0TPZbmI2kXt81viGB6i9cdNZAaqTFS4BQ9Kh4fjZSFO8N -NU/Wd2gIn0y5IGSUTD/azcsQ2WSClUyKBxD93RrkGDXNLjgmWBeUYGnBbVVirYN+ -T1+HmAYuulfEuse7qw2AlfMvZ/LMRXSW81pqSRUFp75E829bfto1BvFPKMDtUzJI -vNIUezJfkAv1fGhUbEKcM7FGvAMTjQIDAQABAoICAAdS9ncovdWfQ6FS7B0zKwb8 -iaIIcV2XlppUcCPlGV/l/KEVBQfDYEmMHtMetuBUrzKEKsrFebh5MtsSvdRZjS5a -RLR1W6NRd1Nia4LHHie4pYzfYime+s4jE/YisT8hTmHUamTJtZ26Fxo/aAx883Lz -PZepGkW42nbH8uC59UUY4n00N1fm3seOCPCGjm/7wlkzdq31WUNRcoRu+evnxNAA -N1U+etuKCYFISkjTjgxDXLdp3X6iEM8e0nxOT9ve6gTeXMIYa3vopW3dpBABvUmS -TE4AMNIRsanAMsk6VnrMWTR42zjACjaG1GevyaCkNxc398qjy0sJrSOHr4j6oFgo -qjhqidsUWUyd8R+u9K/z6WkCWD5SyRLX3lxc7QD4jW/EXzfy5F3OJV9e0IqhiX/x -0Eq/4drnRfbUNS5ENRFPu9RPKpyAkazJ8DDRmlk4ScLj1bp5rsd3RwAdWu+hE6Lg -tcxKRChbk93MW2B5+N1VSYBnGA9gChsuCIQGwj2yXrsfPDq/N13k4fdFMTv/TXdc -pX2QWa95+i7Uq7chvS+ER7cdWuLw3RtME6IIlS/I3faZJtH/6u0QCzg76L6qqEoA -J39qLLboSsggJr7NTZUlexnm2PiRhDa7q2iU/QOf9T40S++ejK5koEIAmGUaUMy3 -THAOUuklDW0cw7SqLKVBAoIBAQDrbzGUE/9/B4g1tO5Nu08moqmT6JwcwxGyI6zS -v8nZ+tTczTXe9pm31SdD+s+J9NvfS55ORXufrX9tKoPRKPTuADzdkFf7WcN9UIWZ -ftFeATTc3YCv5h0SquMoeg385h4FtPEd+lFI3sYUn8EjcbL/7MrDx87CS5m8dhlg -AfxkaQOmJv/CJstCdqFVc8uPNJe5ZVYpg+AZRQ5X0uerLt98+c0RP7buIBjjdFmf -mwMx0yEDdBoj7Q9B/pTJ9owSsqW1cZjA+sB3AWi0+SrZWViOx51iY7Uv98taAz3h -lbDowGgqv2QtTcQps4ms3tj1oWtswsYfara/doU36astaLtRAoIBAQDcgFP4F995 -sMaJ9bosvyZ5RXmyHbmQGDUYj1gFV5sCNAY89xiTmYpUacNspYTA6EnFMhtVEWZ0 -zaxSV+xt4MIRhOjLaMPIet2q/fap132VT+FTqbaNJKR/YOqZ25nCpgOaDm1cmBcz -ZAn24uWWKtXGd84XLWoIeRjnWOZV/Xn+FN6Pk4kEguowQT8U6JFka2SObGSamHnV -I5fp9ZYmfwu8zM0RSdYn+LixI9wCo5Y4I9iVJ375RgsRgTR1MHbFym7n99jy+Rlg -o+Vg1A+VXxvHFpmIAC4RyIQvbqTSu2zVYpH3U5AUnf7y6kqYuMEDytAA/Ah0I6Kn -b5m4hxoAdI19AoIBABlzuutkQhHMkagkuEKFBmiJ20VXN1FQsWANxUg/XaYIHO0F -7lX9WN7r10PdIISsiPyZ4zJZBqVbj1QQ7ZE2GUyFkA7v5OMScDTi+J6AfEfqX3ml -JJ1fhjMXcFlW4J4/jm1Vg1+XSSRUmRtvreSUDSLnyX/i4PJWU/dw2QttC4kbZT6Y -Jrwpj33O72AalZ4UtHZhgzAXxqlUstlBvrPd7GFJbCb6OV+C7s3gLTQ5AN5pEb+a -1Jyfqy+aQyGWWsxGre1wq2/pa9cxus0dxskahJ1i7RvO5a6a3kwL36u8kBSJgHlm -mYDgUF2lwDOFH/VmDwQCBoMs+Ta3rG1AALZ3BRECggEAYjiDruqrcGCPP9sGsHIB -aFnLSM024Hn4BNWtZsvMT6Hu+W80MmNdnBZ0p8vfI7MVUVLtm+ohUnxfe1qRBazt -pbInpJwpQf+BCG8SVjuYRE+N1p+topek8tywBBMvHeanihNNIcbZ5tl0GtadYFP6 -X+r0EZz4M4gF0N/n0nwTwFt5fElC6pCKUy5tNBoY0buI0mx0L+0lsAyn2d68oK9N -Ai/UVRQn4ixH0RhhI4fNHXrSOjaat908hvKf9Ge/MIHo/0mHCUxuwn6faY9s1aPF -Etz9TPiPjPpd72cA4qH8a9z5mktNzfFHJNu6mOcLi23nU9jwxxMsnBr5RZpgub0x -XQKCAQA58DQVUCvpvfC/ZZiE9hzxz2X6ew8Jh+w3a+UYbhIb88WO8LfN371OOUZy -zIpnJlGcGoOFlOa61t44XuAsXMMjB/I8alga/SdRRGsyU6Lpl8HUZKJuaYCAspQm -HikJXuaMHfp399C9+oG7Biku409iRSchR8Fb4yXa5+KQJK8DwySOyhMBPzLPItMu -g01Zbui+YyPqZJ27dd6vWjtj+7ihfHakEVD8dzuyYpf1qunsdojdpelYkBQ2zS5t -4ayqx39d31VQMEj9kzCMQsbzHUa3+a1POfkiCpU+q+ThC0/SwDVVLmQ7pWO5ZsOI -Yo9B4RbPBknliRnYJ4EM4Sqq17Tu ------END PRIVATE KEY----- diff --git a/internal/slave/server-req.pem b/internal/slave/server-req.pem deleted file mode 100644 index e88f5c33..00000000 --- a/internal/slave/server-req.pem +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN CERTIFICATE REQUEST----- -MIIE2DCCAsACAQAwgZIxCzAJBgNVBAYTAkZSMRYwFAYDVQQIDA1JbGUgZGUgRnJh -bmNlMQ4wDAYDVQQHDAVQYXJpczEQMA4GA1UECgwHUEMgQm9vazERMA8GA1UECwwI -Q29tcHV0ZXIxFTATBgNVBAMMDCoucGNib29rLmNvbTEfMB0GCSqGSIb3DQEJARYQ -cGNib29rQGdtYWlsLmNvbTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB -AMrJk21B+R4T0cYW3ik8sucvi+eQwLvbrnF1Lm8Zja5iC10VGiZUlOtjAD4HyWMh -JvUB+uj9/4/V9rpp3z1BmT5YXiwiL+CJ24k4OohXfTmEMXuBI7D0eUDFf9xhRYYP -9DnXJFyXPiChdISmziE0A+ICAWUy7uSK47t5otgugXdPcsLAygtFZne0x0emxOX0 -ybPW43438H97YrtEf+7g0jndiPjzJARsREeTXePc1h/zIKNgB9dJYEbotE5jvFDE -LdVtvFhhglxgni0bkktA7JDhmGvGVa+W2Ha85QH1zUpkJpdsAzO5zeC/iHARcl1t -h+NYeIyOG1hAC8+i4vqAad3ETKad1HEAVOkcN93Ojc8h8mdiR7tXyE84trigbo4E -yObCs5PJOnNFliUBTKqbj4ty+3Xz3waKrcxEUss+25ba3wcNVMl6vN/YRUQb4Qzv -9kG7WzsRe8r8aWYvrv0VO+BWLvtquMMW8fRM9luYjaRe3zW+IYHqL1x01kBqpMVL -gFD0qHh+NlIU7w01T9Z3aAifTLkgZJRMP9rNyxDZZIKVTIoHEP3dGuQYNc0uOCZY -F5RgacFtVWKtg35PX4eYBi66V8S6x7urDYCV8y9n8sxFdJbzWmpJFQWnvkTzb1t+ -2jUG8U8owO1TMki80hR7Ml+QC/V8aFRsQpwzsUa8AxONAgMBAAGgADANBgkqhkiG -9w0BAQsFAAOCAgEAcyqLbxtz0MCFgXXlT4kFdb3s9cFLSkLFodC+sbys4PL3l6cy -FGzWiBlaunNMBqFpBdYJ+DYQA0PCWCRjXJmlqXIigkKgnuMIl+73waDswkrRQDmS -HdU9/L393dwVpD1kBDXxhEHR5BphGUYGoOqteZzJcLIlr7DypmjawmzrwTHM+nOG -Ze67RkqC6rzQJPUxYNiHvEEjpEr4dJ0qmdQX9fbk/vCHnaXribjBHaxzmi6CxmOu -GSLdm+g17gUdzz79w0dZR7F6mC0J/MF84fM3EFFVuEk57T3MdvCGGs0BCsbWpP9o -S8ki39b6gDjRvdkljMgbfhVUdnrfbzxAn1tTl9dx//iSV5ABz+cOmHIOOZxYHadT -gu/GMSGWgxnmTYq+EIUbIo+jN8E7evchVLQrQXrcFo5QiXzToxoxPhuuppN2dO58 -z+DlHfFz3fPsBXc12WpnKbpjisrRPPqtG+CaW/HBVZBqHU3iuXN7yXobIkF8hGAN -WibyHVXAXsG+PekCObW0dXYfTnvV17MA897CgRsVOI1HKVfSi5LF7mthfbvBy7sI -hZ5Giqjilh7FZcAltQBl9+1aduKjGwcjXrMKFFqK3dn0KfxLJQ9wILBTcxf1Lc3N -FzP5bAgBBqhKHU5cCGldqj78OTJ/Lt4kflPBLPQTPG+z2GNfyMyBUR81bUE= ------END CERTIFICATE REQUEST----- diff --git a/internal/slave/service_test.go b/internal/slave/service_test.go index bf007517..d34e138f 100644 --- a/internal/slave/service_test.go +++ b/internal/slave/service_test.go @@ -37,7 +37,7 @@ import ( const ( masterDBFolder = "/tmp/dkv_test_db_master" slaveDBFolder = "/tmp/dkv_test_db_slave" - masterSvcPort = 8185 + masterSvcPort = 8181 slaveSvcPort = 8282 dkvSvcHost = "127.0.0.1" cacheSize = 3 << 30 @@ -173,23 +173,16 @@ func testSlaveAutoConnect(commType string, t *testing.T) { //start discovery server startDiscoveryServer() - t.Log("Started disc server") sleepInSecs(3) startDiscoveryCli() - t.Log("Started disc client") defer discoverydkvSvc.Close() defer discoveryCli.Close() - t.Log("closed") - //start dkvservers initDKVServers() sleepInSecs(3) - t.Log("DKV servers initisalised") registerDkvServerWithDiscovery() - t.Log("register complete") initDKVClients() - t.Log("dkv clients init") defer stopClients() defer stopServers() @@ -202,7 +195,6 @@ func testSlaveAutoConnect(commType string, t *testing.T) { t.Fatalf("Cannot connect to new master. Error: %v", err) } - t.Log("present here") lastClosedMasterId := -1 for cnt := 0; cnt < clusterSize+1; cnt++ { masterId := getCurrentMasterIdFromSlave(t) @@ -246,7 +238,6 @@ func testSlaveAutoConnect(commType string, t *testing.T) { t.Log("New master port:", dkvPorts[masterId]) } - t.Log("hola here") stopDiscoveryServer() } From 4c58f8110ccdf56729c683b1e6786742ebb9fc17 Mon Sep 17 00:00:00 2001 From: Viren Date: Thu, 16 Jun 2022 14:32:43 +0530 Subject: [PATCH 5/5] check issue --- internal/slave/service_test.go | 4 ++-- internal/utils.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/slave/service_test.go b/internal/slave/service_test.go index d34e138f..2eb3f653 100644 --- a/internal/slave/service_test.go +++ b/internal/slave/service_test.go @@ -104,7 +104,7 @@ func TestMasterRocksDBSlaveRocksDB(t *testing.T) { func TestMasterRocksDBSlaveRocksDB_Secured(t *testing.T) { - utils.GenerateTlsFiles() + utils.GenerateTlsFiles(t) opts.AppConfig = getConfigSecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newRocksDBStore(slaveDBFolder) @@ -112,7 +112,7 @@ func TestMasterRocksDBSlaveRocksDB_Secured(t *testing.T) { } func TestMasterRocksDBSlaveBadger_Secure(t *testing.T) { - utils.GenerateTlsFiles() + utils.GenerateTlsFiles(t) opts.AppConfig = getConfigSecure() masterRDB := newRocksDBStore(masterDBFolder) slaveRDB := newBadgerDBStore(slaveDBFolder) diff --git a/internal/utils.go b/internal/utils.go index 2dca18f1..957960b0 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -24,6 +24,7 @@ import ( "os" "os/exec" "path/filepath" + "testing" "time" ) @@ -152,11 +153,12 @@ func loadTLSCredentials(clientConfig DKVConfig) (credentials.TransportCredential return credentials.NewTLS(config), nil } -func GenerateTlsFiles() { +func GenerateTlsFiles(t *testing.T) { var err error homeDir := helper.HomeDir + t.Log(homeDir) generatedCertDir := homeDir + "/cert" tlsFile := generatedCertDir + "/tlsFiles.sh" serverConfFile := generatedCertDir + "/server-ext.cnf"