Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication #30

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions distributed/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"bufio"
"distributed/pkg"
"distributed/pkg/client"
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"fmt"
"log"
"net"
Expand All @@ -29,7 +30,11 @@ func main() {
os.Exit(1)
}(sigChan)

conn, err := net.Dial(networking.Unix, networking.SocketPath)
socketPath, err := networking.SocketPath(pkg.RolePrimary)
if err != nil {
log.Fatalf("error getting primary socket path: %v", err)
}
conn, err := net.Dial(networking.Unix, socketPath)
if err != nil {
log.Fatalf("error establishing connection: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion distributed/cmd/client/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"distributed/pkg/client"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"errors"
"fmt"
"regexp"
Expand Down
92 changes: 77 additions & 15 deletions distributed/cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,57 @@
package main

import (
"distributed/pkg"
"distributed/pkg/networking"
"distributed/pkg/server"
"distributed/pkg/storage"
storage2 "distributed/pkg/server/storage"
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
)

const defaultDbFile = "/tmp/distkv"
const defaultDbDirectory = "/tmp/distkv"

var dbFile string
var dbDirectory string
var isReplica bool

type roleBasedConfig struct {
dbFileName string
socketPath string
replicaSocketPaths []string
}

func main() {
flag.StringVar(&dbFile, "db", defaultDbFile, "path for database file")
flag.StringVar(
&dbDirectory,
"db",
defaultDbDirectory,
"directory for database files (primary and replicas)",
)
flag.BoolVar(
&isReplica,
"r",
false,
"specify whether server should run as replica (affects which socket it connects to)",
)
flag.Parse()

var sigChan = make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go cleanupSocketOnExit(sigChan, networking.SocketPath)
listener, err := net.Listen(networking.Unix, networking.SocketPath)
var role pkg.Role
if isReplica {
role = pkg.RoleReplica
} else {
role = pkg.RolePrimary
}
roleBasedCfg, err := getRoleBasedConfig(role)
if err != nil {
log.Fatalf("error getting config: %v", err)
}

listener, err := net.Listen(networking.Unix, roleBasedCfg.socketPath)
if err != nil {
log.Fatal("error opening unix socket:", err)
}
Expand All @@ -32,29 +60,63 @@ func main() {
log.Printf("error closing listener: %v", err)
}
}(listener)

log.Println("listening at address", listener.Addr())
if err != nil {
log.Fatal("error opening unix socket:", err)
}
db, err := storage.NewPersistentStorage(dbFile)
// cleanup based stuff now that we know which socket to remove
var sigChan = make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go cleanupSocketOnExit(sigChan, roleBasedCfg.socketPath)

db, err := storage2.NewPersistentStorage(dbDirectory, roleBasedCfg.dbFileName)
if err != nil {
log.Fatalf("error initializing storage: %v", err)
}
defer func(db storage.Storage) {
defer func(db storage2.Storage) {
err := db.Close()
if err != nil {
log.Printf("error closing storage: %v\n", err)
}
}(db)

s, err := server.New(listener, db)
s, err := server.New(listener, db, roleBasedCfg.replicaSocketPaths...)
if err != nil {
log.Fatalf("error initializing server: %v", err)
}
s.Run()
}

func getRoleBasedConfig(role pkg.Role) (*roleBasedConfig, error) {
var (
replicaSocketPaths []string
dbFileName string
socketPath string
err error
)
socketPath, err = networking.SocketPath(role)
if err != nil {
return nil, fmt.Errorf("error getting socket path: %w", err)
}
dbFileName, err = storage2.FileName(role)
if err != nil {
return nil, fmt.Errorf("error getting db file path: %w", err)
}

if isReplica {
replicaSocketPaths = make([]string, 0)
} else {
replicaSocketPath, err := networking.SocketPath(pkg.RoleReplica)
if err != nil {
return nil, fmt.Errorf("error getting socket path for replica: %w", err)
}
replicaSocketPaths = []string{replicaSocketPath}
}

return &roleBasedConfig{
dbFileName: dbFileName,
socketPath: socketPath,
replicaSocketPaths: replicaSocketPaths,
}, nil
}

func cleanupSocketOnExit(sigChan <-chan os.Signal, socketPath string) {
<-sigChan
log.Println("exiting")
Expand Down
62 changes: 37 additions & 25 deletions distributed/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

import (
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -20,6 +20,40 @@ func New(conn net.Conn) (*Client, error) {
}, nil
}

// SendRequest is exported so primary servers can relay requests to replicas without having to deal with
// re-serialization
func (c *Client) SendRequest(requestPtr *networking.ExecuteCommandRequest) (*networking.ExecuteCommandResponse, error) {
encoded, err := proto.Marshal(requestPtr)
if err != nil {
return nil, fmt.Errorf("SendRequest: error marshalling message: %w", err)
}
if err := binary.Write(c.conn, binary.LittleEndian, int32(len(encoded))); err != nil {
return nil, fmt.Errorf("SendRequest: error writing message length to connection: %w", err)
}
if _, err := c.conn.Write(encoded); err != nil {
return nil, fmt.Errorf("SendRequest: error writing message to connection: %w", err)
}

var (
responsePtr = new(networking.ExecuteCommandResponse)
messageLength int32
)
if err := binary.Read(c.conn, binary.LittleEndian, &messageLength); err != nil {
return nil, fmt.Errorf("SendRequest: error reading message length from connection: %w", err)
}
var responseBuf = make([]byte, messageLength)
if _, err := c.conn.Read(responseBuf); err != nil {
return nil, fmt.Errorf("SendRequest: error reading message from connection: %w", err)
}
if err := proto.Unmarshal(responseBuf, responsePtr); err != nil {
return nil, fmt.Errorf("SendRequest: error unmarshalling message: %w", err)
}
return responsePtr, nil

}

// ExecuteCommand is a wrapper for clients to send "domain objects" through the wire; more of a "facade" but will think
// of the best way to organize this later.
func (c *Client) ExecuteCommand(command Command) (storage.Value, error) {
var requestPtr *networking.ExecuteCommandRequest
switch command.Operation {
Expand All @@ -41,32 +75,10 @@ func (c *Client) ExecuteCommand(command Command) (storage.Value, error) {
default:
return "", fmt.Errorf("unrecognized operation %s", command.Operation)
}
encoded, err := proto.Marshal(requestPtr)
responsePtr, err := c.SendRequest(requestPtr)
if err != nil {
return "", fmt.Errorf("ExecuteCommand: error marshalling message: %w", err)
}
if err := binary.Write(c.conn, binary.LittleEndian, int32(len(encoded))); err != nil {
return "", fmt.Errorf("ExecuteCommand: error writing message length to connection: %w", err)
}
if _, err := c.conn.Write(encoded); err != nil {
return "", fmt.Errorf("ExecuteCommand: error writing message to connection: %w", err)
}

var (
responsePtr = new(networking.ExecuteCommandResponse)
messageLength int32
)
if err := binary.Read(c.conn, binary.LittleEndian, &messageLength); err != nil {
return "", fmt.Errorf("ExecuteCommand: error reading message length from connection: %w", err)
}
var responseBuf = make([]byte, messageLength)
if _, err := c.conn.Read(responseBuf); err != nil {
return "", fmt.Errorf("ExecuteCommand: error reading message from connection: %w", err)
return "", err
}
if err := proto.Unmarshal(responseBuf, responsePtr); err != nil {
return "", fmt.Errorf("ExecuteCommand: error unmarshalling message: %w", err)
}

switch result := responsePtr.Result.(type) {
case *networking.ExecuteCommandResponse_Value:
return storage.Value(result.Value), nil
Expand Down
4 changes: 3 additions & 1 deletion distributed/pkg/client/format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import "distributed/pkg/storage"
import (
"distributed/pkg/server/storage"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to make client key/values.

)

type Operation int

Expand Down
22 changes: 21 additions & 1 deletion distributed/pkg/networking/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
package networking

import (
"distributed/pkg"
"fmt"
)

const Unix = "unix"
const Port = 8907
const SocketPath = "/tmp/distkv.sock"

const (
primarySocketPath = "/tmp/distkv.sock"
replicaSocketPath = "/tmp/distkv-replica.sock"
)

func SocketPath(role pkg.Role) (string, error) {
switch role {
case pkg.RolePrimary:
return primarySocketPath, nil
case pkg.RoleReplica:
return replicaSocketPath, nil
default:
return "", fmt.Errorf("unrecognized Role %v", role)
}
}
10 changes: 10 additions & 0 deletions distributed/pkg/role.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pkg

// Role is a first pass at making the main functions dumber, might be better located somewhere else, and might break
// down once there are many replicas etc.
type Role int

const (
RolePrimary = iota + 1
RoleReplica
)
47 changes: 41 additions & 6 deletions distributed/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package server

import (
"distributed/pkg/client"
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"encoding/binary"
"fmt"
"google.golang.org/protobuf/proto"
Expand All @@ -11,14 +12,42 @@ import (
)

type Server struct {
listener net.Listener
db storage.Storage
listener net.Listener
db storage.Storage
replicaClients []*client.Client
}

func New(listener net.Listener, db storage.Storage) (*Server, error) {
func New(listener net.Listener, db storage.Storage, replicaSocketPaths ...string) (*Server, error) {
var replicaClients = make([]*client.Client, len(replicaSocketPaths))
if len(replicaSocketPaths) > 0 {
for j, replicaSocketPath := range replicaSocketPaths {
replicaConn, err := net.Dial(networking.Unix, replicaSocketPath)
if err != nil {
return nil, fmt.Errorf(
"server.New: error connecting to replica %s: %w",
replicaSocketPath,
err,
)
}
log.Printf("connected to replica at %s", replicaConn.RemoteAddr())

replicaClient, err := client.New(replicaConn)
if err != nil {
return nil, fmt.Errorf(
"server.New: error building replica client for %s: %w",
replicaSocketPath,
err,
)

}
replicaClients[j] = replicaClient
}
}

return &Server{
listener: listener,
db: db,
listener: listener,
db: db,
replicaClients: replicaClients,
}, nil
}

Expand All @@ -34,6 +63,7 @@ func (s *Server) Run() {
}

func (s *Server) handleConnection(conn net.Conn) {
log.Printf("handling connection from %s", conn.RemoteAddr())
for {
var (
requestPtr = new(networking.ExecuteCommandRequest)
Expand Down Expand Up @@ -64,6 +94,11 @@ func (s *Server) handleConnection(conn net.Conn) {
value, err = s.db.Get(storage.Key(req.Get.Key))
case *networking.ExecuteCommandRequest_Put:
value, err = s.db.Put(storage.Key(req.Put.Key), storage.Value(req.Put.Value))
for _, replica := range s.replicaClients {
if _, err := replica.SendRequest(requestPtr); err != nil {
log.Printf("error sending request to replica: %v", err)
}
}
default:
err = fmt.Errorf("unrecognized operation: %v", req)
}
Expand Down
Loading