Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secure communication via tls #147

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions clients/java/dkv-client/src/test/resources/Procfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ dkv_slave1c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-add
dkv_slave1d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7094 --http-listen-addr 127.0.0.1:7098 --node-name s1d --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_slave2a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8091 --http-listen-addr 127.0.0.1:8095 --node-name s2a --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --http-listen-addr 127.0.0.1:8096 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --http-listen-addr 127.0.0.1:8096 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8093 --http-listen-addr 127.0.0.1:8097 --node-name s2c --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --http-listen-addr 127.0.0.1:8098 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --http-listen-addr 127.0.0.1:8098 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --http-listen-addr 127.0.0.1:9095 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --http-listen-addr 127.0.0.1:9096 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --http-listen-addr 127.0.0.1:9097 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --http-listen-addr 127.0.0.1:9098 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --http-listen-addr 127.0.0.1:9095 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --http-listen-addr 127.0.0.1:9096 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --http-listen-addr 127.0.0.1:9097 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --http-listen-addr 127.0.0.1:9098 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --http-listen-addr 127.0.0.1:6085 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --http-listen-addr 127.0.0.1:6086 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --http-listen-addr 127.0.0.1:6087 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --http-listen-addr 127.0.0.1:6085 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --http-listen-addr 127.0.0.1:6086 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --http-listen-addr 127.0.0.1:6087 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
10 changes: 8 additions & 2 deletions cmd/dkvctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sort"
"strings"

utils "github.com/flipkart-incubator/dkv/internal"
"github.com/flipkart-incubator/dkv/pkg/ctl"
"github.com/flipkart-incubator/dkv/pkg/serverpb"

Expand Down Expand Up @@ -228,11 +229,13 @@ func (c *cmd) clusterInfo(client *ctl.DKVClient, args ...string) {
}
}

var dkvAddr, dkvAuthority string
var dkvAddr, dkvAuthority, caCertPath string

func init() {
flag.StringVarP(&dkvAddr, "dkvAddr", "a", "127.0.0.1:8080", "<host>:<port> - DKV server address")
flag.StringVar(&dkvAuthority, "authority", "", "Override :authority pseudo header for routing purposes. Useful while accessing DKV via service mesh.")
flag.StringVar(&caCertPath, "caCertPath", "", "Path for root certificate of the chain, i.e. CA certificate")

for _, c := range cmds {
if c.argDesc == "" {
flag.BoolVar(&c.emptyValue, c.name, c.emptyValue, c.cmdDesc)
Expand Down Expand Up @@ -269,7 +272,10 @@ func main() {
fmt.Printf(" (:authority = %s)", dkvAuthority)
}
fmt.Printf("...")
client, err := ctl.NewInSecureDKVClient(dkvAddr, dkvAuthority, ctl.DefaultConnectOpts)
dkvConfig := utils.DKVConfig{
SrvrAddr:dkvAddr,
CaCertPath:caCertPath}
client, err := utils.NewDKVClient(dkvConfig, dkvAuthority, ctl.DefaultConnectOpts)
if err != nil {
fmt.Printf("\nUnable to create DKV client. Error: %v\n", err)
return
Expand Down
82 changes: 44 additions & 38 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,7 @@ package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"

utils "github.com/flipkart-incubator/dkv/internal"
"github.com/flipkart-incubator/dkv/internal/discovery"
"github.com/flipkart-incubator/dkv/internal/master"
"github.com/flipkart-incubator/dkv/internal/opts"
Expand All @@ -32,14 +21,22 @@ import (
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"

grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
flag "github.com/spf13/pflag"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"net/http/pprof"
)

Expand Down Expand Up @@ -98,15 +95,27 @@ func main() {
flag.Parse()
config.Init(cfgFile)
config.Print()
opts.AppConfig = config

setupDKVLogger()
setupAccessLogger()
setFlagsForNexusDirs()
setupStats()
go setupHttpServer()

var secure = config.CaCertPath != "" && config.KeyPath != "" && config.CertPath != ""

var srvrMode utils.ConnectionMode
if secure {
srvrMode = utils.ServerTLS
} else {
srvrMode = utils.Insecure
}

kvs, cp, ca, br := newKVStore()
grpcSrvr, lstnr := newGrpcServerListener()
grpcSrvr, lstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode,
SrvrAddr: config.ListenAddr, KeyPath: config.KeyPath, CertPath: config.CertPath,
CaCertPath: config.CaCertPath}, accessLogger)
defer grpcSrvr.GracefulStop()
srvrRole := toDKVSrvrRole(config.DbRole)
//srvrRole.printFlags()
Expand Down Expand Up @@ -143,7 +152,7 @@ func main() {

if srvrRole != noRole && srvrRole != discoveryRole {
var err error
discoveryClient, err = newDiscoveryClient()
discoveryClient, err = newDiscoveryClient(utils.DKVConfig{CaCertPath:config.CaCertPath})
if err != nil {
log.Panicf("Failed to start Discovery Client %v.", err)
}
Expand Down Expand Up @@ -173,7 +182,22 @@ func main() {
}
defer dkvSvc.Close()
serverpb.RegisterDKVServer(grpcSrvr, dkvSvc)
serverpb.RegisterDKVReplicationServer(grpcSrvr, dkvSvc)

var replSrvrMode utils.ConnectionMode
if secure {
replSrvrMode = utils.MutualTLS
} else {
replSrvrMode = utils.Insecure
}

replGrpcSrvr, replLstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: replSrvrMode,
SrvrAddr: config.PeerListenAddr, KeyPath: config.KeyPath, CertPath: config.CertPath,
CaCertPath: config.CaCertPath}, accessLogger)
defer replGrpcSrvr.GracefulStop()

serverpb.RegisterDKVReplicationServer(replGrpcSrvr, dkvSvc)
go replGrpcSrvr.Serve(replLstnr)
//TODO do we need to register below with replGrpcSrvr as well
health.RegisterHealthServer(grpcSrvr, dkvSvc)

// Discovery servers can be only configured if node started as master.
Expand Down Expand Up @@ -284,24 +308,6 @@ func setupDKVLogger() {
}
}

func newGrpcServerListener() (*grpc.Server, net.Listener) {
grpcSrvr := grpc.NewServer(
grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(accessLogger)),
grpc.UnaryInterceptor(grpc_zap.UnaryServerInterceptor(accessLogger)),
)
reflection.Register(grpcSrvr)
return grpcSrvr, newListener()
}

func newListener() (lis net.Listener) {
var err error
if lis, err = net.Listen("tcp", config.ListenAddr); err != nil {
log.Panicf("failed to listen: %v", err)
return
}
return
}

func setupSignalHandler() <-chan os.Signal {
signals := []os.Signal{syscall.SIGINT, syscall.SIGQUIT, syscall.SIGSTOP, syscall.SIGTERM}
stopChan := make(chan os.Signal, len(signals))
Expand Down Expand Up @@ -436,8 +442,8 @@ func registerDiscoveryServer(grpcSrvr *grpc.Server, dkvService master.DKVService
return nil
}

func newDiscoveryClient() (discovery.Client, error) {
client, err := discovery.NewDiscoveryClient(&config.DiscoveryConfig.ClientConfig, dkvLogger)
func newDiscoveryClient(dkvConfig utils.DKVConfig) (discovery.Client, error) {
client, err := discovery.NewDiscoveryClient(&config.DiscoveryConfig.ClientConfig, dkvConfig, dkvLogger)
if err != nil {
return nil, err
}
Expand Down
31 changes: 15 additions & 16 deletions internal/discovery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This class contains the behaviour of propagating a nodes status updates to disco
import (
"context"
"github.com/flipkart-incubator/dkv/internal/opts"
utils "github.com/flipkart-incubator/dkv/internal"
"github.com/flipkart-incubator/dkv/pkg/ctl"
"time"

_ "github.com/Jille/grpc-multi-resolver"
Expand Down Expand Up @@ -40,12 +42,23 @@ const (
connectTimeout = 10 * time.Second
)

func NewDiscoveryClient(config *opts.DiscoveryClientConfig, logger *zap.Logger) (Client, error) {
conn, err := getDiscoveryClient(config.DiscoveryServiceAddr)
var DiscoveryClientConnectOpts = ctl.ConnectOpts{
ReadBufSize: readBufSize,
WriteBufSize: writeBufSize,
MaxMsgSize: maxMsgSize,
Timeout: timeout,
ConnectTimeout: connectTimeout,
}


func NewDiscoveryClient(config *opts.DiscoveryClientConfig, dkvConfig utils.DKVConfig, logger *zap.Logger) (Client, error) {
dkvConfig.SrvrAddr = config.DiscoveryServiceAddr
client, err := utils.NewDKVClient(dkvConfig, "", DiscoveryClientConnectOpts)
if err != nil {
logger.Error("Unable to create DKV client to connect to discovery server", zap.Error(err))
return nil, err
}
conn := client.CliConn

dkvCli := serverpb.NewDKVDiscoveryClient(conn)
storePropagator := &discoveryClient{regions: []serverpb.DKVDiscoveryNodeServer{},
Expand All @@ -54,20 +67,6 @@ func NewDiscoveryClient(config *opts.DiscoveryClientConfig, logger *zap.Logger)
return storePropagator, nil
}

func getDiscoveryClient(discoveryServiceAddr string) (*grpc.ClientConn, error) {
// TODO - check if authority is required
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()
return grpc.DialContext(ctx, discoveryServiceAddr,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithReadBufferSize(readBufSize),
grpc.WithWriteBufferSize(writeBufSize),
grpc.WithAuthority(""),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
}

func (m *discoveryClient) RegisterRegion(server serverpb.DKVDiscoveryNodeServer) {
m.regions = append(m.regions, server)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/discovery/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"testing"
"time"

utils "github.com/flipkart-incubator/dkv/internal"
"github.com/flipkart-incubator/dkv/internal/master"
"github.com/flipkart-incubator/dkv/internal/opts"
"github.com/flipkart-incubator/dkv/internal/stats"
Expand Down Expand Up @@ -40,7 +40,7 @@ func TestDiscoveryClient(t *testing.T) {
PollClusterInfoInterval: pollClusterInfoInterval,
}

dClient, _ := NewDiscoveryClient(&discoveryClientConfig, zap.NewNop())
dClient, _ := NewDiscoveryClient(&discoveryClientConfig, utils.DKVConfig{} , zap.NewNop())
defer dClient.Close()

// stop the poller so as to avoid race with these poller
Expand Down
2 changes: 1 addition & 1 deletion internal/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestDKVDiscoveryService(t *testing.T) {
defer grpcSvc.GracefulStop()

svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort)
if dkvCli, err = ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil {
if dkvCli, err = ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
panic(err)
}
defer dkvCli.Close()
Expand Down
15 changes: 15 additions & 0 deletions internal/helper/tlsHelper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package helper

import "os"

var HomeDir = os.Getenv("HOME")

var TlsFileContent = "#!/bin/sh \n" +
"cd "+ HomeDir + "/cert/ \n"+
"openssl req -x509 -newkey rsa:4096 -days 365 -nodes -keyout ca-key.pem -out ca-cert.pem -subj \"/C=FR/ST=Occitanie/L=Toulouse/O=Tech School/OU=Education/CN=*.techschool.guru/[email protected]\" \n" +
"openssl x509 -in ca-cert.pem -noout -text \n" +
"openssl req -newkey rsa:4096 -nodes -keyout server-key.pem -out server-req.pem -subj \"/C=FR/ST=Ile de France/L=Paris/O=PC Book/OU=Computer/CN=*.pcbook.com/[email protected]\" \n" +
"openssl x509 -req -in server-req.pem -days 60 -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile /Users/viren.gupta/cert/server-ext.cnf\n" +
"openssl x509 -in server-cert.pem -noout -text \n"

var ServerConf = "subjectAltName = DNS:docker.local, DNS:localhost, IP:127.0.0.1"
6 changes: 3 additions & 3 deletions internal/master/ds_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func testNewDKVNodeJoiningAndLeaving(t *testing.T) {

// Create the client for the new DKV node
svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[newNodeID])
if dkvCli, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil {
if dkvCli, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
t.Fatal(err)
} else {
defer dkvCli.Close()
Expand Down Expand Up @@ -453,7 +453,7 @@ func testNewDKVNodeJoiningAndLeaving(t *testing.T) {
func initDKVClients(ids ...int) {
for id := 1; id <= clusterSize; id++ {
svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvPorts[id])
if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil {
if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
panic(err)
} else {
dkvClis[id] = client
Expand All @@ -464,7 +464,7 @@ func initDKVClients(ids ...int) {
func initStreamingDKVClients(ids ...int) {
for id := 1; id <= clusterSize; id++ {
svcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvStreamingPorts[id])
if client, err := ctl.NewInSecureDKVClient(svcAddr, "", ctl.DefaultConnectOpts); err != nil {
if client, err := ctl.NewDKVClient(svcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
panic(err)
} else {
dkvStreamingClis[id] = client
Expand Down
4 changes: 2 additions & 2 deletions internal/master/ss_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestStandaloneService(t *testing.T) {
go serveStandaloneDKV()
sleepInSecs(3)
dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort)
if client, err := ctl.NewInSecureDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts); err != nil {
if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
t.Fatalf("Unable to connect to DKV service at %s. Error: %v", dkvSvcAddr, err)
} else {
dkvCli = client
Expand All @@ -82,7 +82,7 @@ func TestStreamingHealthCheck(t *testing.T) {
go serveStandaloneDKV()
sleepInSecs(3)
dkvSvcAddr := fmt.Sprintf("%s:%d", dkvSvcHost, dkvSvcPort)
if client, err := ctl.NewInSecureDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts); err != nil {
if client, err := ctl.NewDKVClient(dkvSvcAddr, "", ctl.DefaultConnectOpts, grpc.WithInsecure()); err != nil {
t.Fatalf("Unable to connect to DKV service at %s. Error: %v", dkvSvcAddr, err)
} else {
dkvCli = client
Expand Down
Loading