Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secure communication #146

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions cmd/dkvctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
utils "github.com/flipkart-incubator/dkv/internal"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -228,11 +229,21 @@ func (c *cmd) clusterInfo(client *ctl.DKVClient, args ...string) {
}
}

var dkvAddr, dkvAuthority string
var (
dkvAddr string
caCertPath string
dkvAuthority string
)

func init() {
<<<<<<< HEAD
flag.StringVarP(&dkvAddr, "dkvAddr", "a", "127.0.0.1:8080", "<host>:<port> - DKV server address")
=======
flag.StringVar(&dkvAddr, "dkvAddr", "127.0.0.1:8080", "<host>:<port> - DKV server address")
flag.StringVar(&caCertPath, "caCertPath", "", "Path for root certificate of the chain, i.e. CA certificate")
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
flag.StringVar(&dkvAuthority, "authority", "", "Override :authority pseudo header for routing purposes. Useful while accessing DKV via service mesh.")

for _, c := range cmds {
if c.argDesc == "" {
flag.BoolVar(&c.emptyValue, c.name, c.emptyValue, c.cmdDesc)
Expand All @@ -247,6 +258,7 @@ func init() {
flag.CommandLine.SortFlags = false
}

<<<<<<< HEAD
func isFlagPassed(name string) bool {
found := false
flag.Visit(func(f *flag.Flag) {
Expand All @@ -255,6 +267,27 @@ func isFlagPassed(name string) bool {
}
})
return found
=======
func usage() {
fmt.Printf("Usage of %s:\n", os.Args[0])
printUsage([]string{"dkvAddr", "authority", "caCertPath"})
for _, cmd := range cmds {
cmd.usage()
}
}

func printUsage(flags []string) {
for _, flagName := range flags {
dkvFlag := flag.Lookup(flagName)
if dkvFlag != nil {
fmt.Printf(" -%s %s (default: %s)\n", dkvFlag.Name, dkvFlag.Usage, dkvFlag.DefValue)
}
}
}

func trimLower(str string) string {
return strings.ToLower(strings.TrimSpace(str))
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
}

func main() {
Expand All @@ -269,7 +302,12 @@ func main() {
fmt.Printf(" (:authority = %s)", dkvAuthority)
}
fmt.Printf("...")
<<<<<<< HEAD
client, err := ctl.NewInSecureDKVClient(dkvAddr, dkvAuthority, ctl.DefaultConnectOpts)
=======
client, err := utils.NewDKVClient(utils.DKVConfig{
SrvrAddr: dkvAddr, CaCertPath: caCertPath}, dkvAuthority)
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
if err != nil {
fmt.Printf("\nUnable to create DKV client. Error: %v\n", err)
return
Expand All @@ -293,4 +331,4 @@ func main() {
if !validCmd {
flag.Usage()
}
}
}
164 changes: 163 additions & 1 deletion cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@ import (
"io"
"io/ioutil"
"log"
<<<<<<< HEAD
"net"
"net/http"
"net/url"
=======
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
"os"
"os/signal"
"path"
"strings"
"syscall"

<<<<<<< HEAD
"github.com/flipkart-incubator/dkv/internal/discovery"
=======
utils "github.com/flipkart-incubator/dkv/internal"
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
"github.com/flipkart-incubator/dkv/internal/master"
"github.com/flipkart-incubator/dkv/internal/opts"
"github.com/flipkart-incubator/dkv/internal/slave"
Expand All @@ -25,6 +32,7 @@ import (
"github.com/flipkart-incubator/dkv/internal/storage/badger"
"github.com/flipkart-incubator/dkv/internal/storage/rocksdb"
"github.com/flipkart-incubator/dkv/internal/sync"
<<<<<<< HEAD
"github.com/flipkart-incubator/dkv/pkg/health"
"github.com/flipkart-incubator/dkv/pkg/serverpb"
nexus_api "github.com/flipkart-incubator/nexus/pkg/api"
Expand Down Expand Up @@ -65,6 +73,30 @@ var (

//nexus flags
nexusLogDirFlag, nexusSnapDirFlag *flag.Flag
=======
"github.com/flipkart-incubator/dkv/pkg/serverpb"
nexus_api "github.com/flipkart-incubator/nexus/pkg/api"
nexus "github.com/flipkart-incubator/nexus/pkg/raft"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
disklessMode bool
dbEngine string
dbEngineIni string
dbFolder string
dbListenAddr string
peerListenAddr string
dbRole string
statsdAddr string
replMasterAddr string
replPollInterval time.Duration
certPath string
keyPath string
caCertPath string
blockCacheSize uint64
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108

// Logging vars
verboseLogging bool
Expand All @@ -82,7 +114,27 @@ var (
)

func init() {
<<<<<<< HEAD
initializeFlags()
=======
flag.BoolVar(&disklessMode, "dbDiskless", false, fmt.Sprintf("Enables diskless mode where data is stored entirely in memory.\nAvailable on Badger for standalone and slave roles. (default %v)", disklessMode))
flag.StringVar(&dbFolder, "dbFolder", "/tmp/dkvsrv", "DB folder path for storing data files")
flag.StringVar(&dbListenAddr, "dbListenAddr", "127.0.0.1:8080", "Address on which the DKV service binds")
flag.StringVar(&peerListenAddr, "peerListenAddr", "127.0.0.1:8083", "Address on which the DKV replication service binds")
flag.StringVar(&dbEngine, "dbEngine", "rocksdb", "Underlying DB engine for storing data - badger|rocksdb")
flag.StringVar(&dbEngineIni, "dbEngineIni", "", "An .ini file for configuring the underlying storage engine. Refer badger.ini or rocks.ini for more details.")
flag.StringVar(&dbRole, "dbRole", "none", "DB role of this node - none|master|slave")
flag.StringVar(&statsdAddr, "statsdAddr", "", "StatsD service address in host:port format")
flag.StringVar(&replMasterAddr, "replMasterAddr", "", "Service address of DKV master node for replication")
flag.DurationVar(&replPollInterval, "replPollInterval", 5*time.Second, "Interval used for polling changes from master. Eg., 10s, 5ms, 2h, etc.")
flag.StringVar(&dbAccessLog, "dbAccessLog", "", "File for logging DKV accesses eg., stdout, stderr, /tmp/access.log")
flag.StringVar(&certPath, "certPath", "", "Path for certificate file of this node")
flag.StringVar(&keyPath, "keyPath", "", "Path for key file of this node")
flag.StringVar(&caCertPath, "caCertPath", "", "Path for root certificate of the chain, i.e. CA certificate")
flag.BoolVar(&verboseLogging, "verbose", false, fmt.Sprintf("Enable verbose logging.\nBy default, only warnings and errors are logged. (default %v)", verboseLogging))
flag.Uint64Var(&blockCacheSize, "block-cache-size", defBlockCacheSize, "Amount of cache (in bytes) to set aside for data blocks. A value of 0 disables block caching altogether.")
setDKVDefaultsForNexusDirs()
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
}

func initializeFlags() {
Expand All @@ -105,8 +157,19 @@ func main() {
setupStats()
go setupHttpServer()

var secure = caCertPath != "" && keyPath != "" && certPath != ""

var srvrMode utils.ConnectionMode
if secure {
srvrMode = utils.ServerTLS
} else {
srvrMode = utils.Insecure
}

kvs, cp, ca, br := newKVStore()
grpcSrvr, lstnr := newGrpcServerListener()
grpcSrvr, lstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: srvrMode,
SrvrAddr: dbListenAddr, KeyPath: keyPath, CertPath: certPath,
CaCertPath: caCertPath}, accessLogger)
defer grpcSrvr.GracefulStop()
srvrRole := toDKVSrvrRole(config.DbRole)
//srvrRole.printFlags()
Expand Down Expand Up @@ -173,6 +236,7 @@ func main() {
}
defer dkvSvc.Close()
serverpb.RegisterDKVServer(grpcSrvr, dkvSvc)
<<<<<<< HEAD
serverpb.RegisterDKVReplicationServer(grpcSrvr, dkvSvc)
health.RegisterHealthServer(grpcSrvr, dkvSvc)

Expand All @@ -182,6 +246,35 @@ func main() {
if err != nil {
log.Panicf("Failed to start Discovery Service %v.", err)
}
=======
var replSrvrMode utils.ConnectionMode
if secure {
replSrvrMode = utils.MutualTLS
} else {
replSrvrMode = utils.Insecure
}
replGrpcSrvr, replLstnr := utils.NewGrpcServerListener(utils.DKVConfig{ConnectionMode: replSrvrMode,
SrvrAddr: peerListenAddr, KeyPath: keyPath, CertPath: certPath,
CaCertPath: caCertPath}, accessLogger)
defer replGrpcSrvr.GracefulStop()
serverpb.RegisterDKVReplicationServer(replGrpcSrvr, dkvSvc)
go replGrpcSrvr.Serve(replLstnr)
case slaveRole:
var replClientMode utils.ConnectionMode
if secure {
replClientMode = utils.MutualTLS
} else {
replClientMode = utils.Insecure
}

// TODO: Check if authority override option is needed for slaves
// while they connect with masters
if replCli, err := utils.NewDKVClient(utils.DKVConfig{ConnectionMode: replClientMode,
SrvrAddr: replMasterAddr, KeyPath: keyPath, CertPath: certPath,
CaCertPath: caCertPath}, "");
err != nil {
panic(err)
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
} else {
// Currently nodes can be either discovery server or client. This will change when a node supports multiple regions
discoveryClient.RegisterRegion(dkvSvc)
Expand Down Expand Up @@ -209,6 +302,42 @@ func main() {
log.Printf("[WARN] Caught signal: %v. Shutting down...\n", sig)
}

<<<<<<< HEAD
=======
func validateFlags() {
if dbListenAddr != "" && strings.IndexRune(dbListenAddr, ':') < 0 {
log.Panicf("given listen address: %s is invalid, must be in host:port format", dbListenAddr)
}
if peerListenAddr != "" && strings.IndexRune(peerListenAddr, ':') < 0 {
log.Panicf("given listen address: %s is invalid, must be in host:port format", dbListenAddr)
}
if replMasterAddr != "" && strings.IndexRune(replMasterAddr, ':') < 0 {
log.Panicf("given master address: %s for replication is invalid, must be in host:port format", replMasterAddr)
}
if statsdAddr != "" && strings.IndexRune(statsdAddr, ':') < 0 {
log.Panicf("given StatsD address: %s is invalid, must be in host:port format", statsdAddr)
}

if disklessMode && strings.ToLower(dbEngine) == "rocksdb" {
log.Panicf("diskless is available only on Badger storage")
}

if strings.ToLower(dbRole) == slaveRole && replMasterAddr == "" {
log.Panicf("repl-master-addr must be given in slave mode")
}
nonNullAuthFlags := btou(certPath != "", keyPath != "", caCertPath != "")
if nonNullAuthFlags > 0 && nonNullAuthFlags < 3 {
log.Panicf("Missing TLS attributes, set all flags (caCertPath, keyPath, certPath) to run DKV in secure mode")
}

if dbEngineIni != "" {
if _, err := os.Stat(dbEngineIni); err != nil && os.IsNotExist(err) {
log.Panicf("given storage configuration file: %s does not exist", dbEngineIni)
}
}
}

>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
func setupAccessLogger() {
accessLogger = zap.NewNop()
if config.AccessLog != "" {
Expand Down Expand Up @@ -284,6 +413,7 @@ func setupDKVLogger() {
}
}

<<<<<<< HEAD
func newGrpcServerListener() (*grpc.Server, net.Listener) {
grpcSrvr := grpc.NewServer(
grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(accessLogger)),
Expand All @@ -302,6 +432,8 @@ func newListener() (lis net.Listener) {
return
}

=======
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
func setupSignalHandler() <-chan os.Signal {
signals := []os.Signal{syscall.SIGINT, syscall.SIGQUIT, syscall.SIGSTOP, syscall.SIGTERM}
stopChan := make(chan os.Signal, len(signals))
Expand All @@ -323,7 +455,26 @@ func toDKVSrvrRole(role string) dkvSrvrRole {
return dkvSrvrRole(strings.TrimSpace(strings.ToLower(role)))
}

<<<<<<< HEAD
func setFlagsForNexusDirs() {
=======
func (role dkvSrvrRole) printFlags() {
log.Println("Launching DKV server with following flags:")
switch role {
case noRole:
printFlagsWithPrefix("db")
case masterRole:
if haveFlagsWithPrefix("nexus") {
printFlagsWithPrefix("db", "nexus", "peer")
} else {
printFlagsWithPrefix("db", "peer")
}
case slaveRole:
printFlagsWithPrefix("db", "repl")
}
printFlagsWithoutPrefix("db", "repl", "nexus")
}
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108

nexusLogDirFlag, nexusSnapDirFlag = flag.Lookup("nexus-log-dir"), flag.Lookup("nexus-snap-dir")
if nexusLogDirFlag.Value.String() == "" {
Expand Down Expand Up @@ -426,6 +577,7 @@ func newDKVReplicator(kvs storage.KVStore) nexus_api.RaftReplicator {
}
}

<<<<<<< HEAD
func registerDiscoveryServer(grpcSrvr *grpc.Server, dkvService master.DKVService) error {
discoveryService, err := discovery.NewDiscoveryService(dkvService, dkvLogger, &config.DiscoveryConfig.ServerConfig)
if err != nil {
Expand Down Expand Up @@ -585,4 +737,14 @@ func clusterMetricsHandler(w http.ResponseWriter, r *http.Request) {
}
}
}
=======
func btou(conditions... bool) int {
cnt := 0
for _, cond := range conditions {
if cond {
cnt += 1
}
}
return cnt
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
}
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.0.10 // indirect
github.com/smira/go-statsd v1.3.1
<<<<<<< HEAD
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/vmihailenco/msgpack/v5 v5.3.4
Expand All @@ -29,6 +30,18 @@ require (
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
gopkg.in/ini.v1 v1.66.2
=======
go.uber.org/zap v1.14.1
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c // indirect
golang.org/x/sys v0.0.0-20201005065044-765f4ea38db3 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.28.0
google.golang.org/protobuf v1.26.0
gopkg.in/ini.v1 v1.62.0
>>>>>>> 5cda07c6b9de1771e120145aad6d107a9eb3f108
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
)

Expand Down
Loading