Skip to content

Commit

Permalink
naïve replication implementation
Browse files Browse the repository at this point in the history
Want to adjust how this is orchestrated but right now I see data
going into both files
  • Loading branch information
gmkohler committed May 8, 2024
1 parent 984fd76 commit d817b48
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 55 deletions.
9 changes: 7 additions & 2 deletions distributed/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"bufio"
"distributed/pkg"
"distributed/pkg/client"
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"fmt"
"log"
"net"
Expand All @@ -29,7 +30,11 @@ func main() {
os.Exit(1)
}(sigChan)

conn, err := net.Dial(networking.Unix, networking.SocketPath)
socketPath, err := networking.SocketPath(pkg.RolePrimary)
if err != nil {
log.Fatalf("error getting primary socket path: %v", err)
}
conn, err := net.Dial(networking.Unix, socketPath)
if err != nil {
log.Fatalf("error establishing connection: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion distributed/cmd/client/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"distributed/pkg/client"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"errors"
"fmt"
"regexp"
Expand Down
85 changes: 71 additions & 14 deletions distributed/cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"distributed/pkg"
"distributed/pkg/networking"
"distributed/pkg/server"
"distributed/pkg/storage"
storage2 "distributed/pkg/server/storage"
"flag"
"fmt"
"log"
"net"
"os"
Expand All @@ -14,21 +16,42 @@ import (

const defaultDbDirectory = "/tmp/distkv"

var dbFile string
var dbDirectory string
var isReplica bool

type roleBasedConfig struct {
dbFileName string
socketPath string
replicaSocketPaths []string
}

func main() {
flag.StringVar(
&dbFile,
&dbDirectory,
"db",
defaultDbDirectory,
"directory for database files (primary and replicas)",
)
flag.BoolVar(
&isReplica,
"r",
false,
"specify whether server should run as replica (affects which socket it connects to)",
)
flag.Parse()

var sigChan = make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go cleanupSocketOnExit(sigChan, networking.SocketPath)
listener, err := net.Listen(networking.Unix, networking.SocketPath)
var role pkg.Role
if isReplica {
role = pkg.RoleReplica
} else {
role = pkg.RolePrimary
}
roleBasedCfg, err := getRoleBasedConfig(role)
if err != nil {
log.Fatalf("error getting config: %v", err)
}

listener, err := net.Listen(networking.Unix, roleBasedCfg.socketPath)
if err != nil {
log.Fatal("error opening unix socket:", err)
}
Expand All @@ -37,29 +60,63 @@ func main() {
log.Printf("error closing listener: %v", err)
}
}(listener)

log.Println("listening at address", listener.Addr())
if err != nil {
log.Fatal("error opening unix socket:", err)
}
db, err := storage.NewPersistentStorage(dbFile)
// cleanup based stuff now that we know which socket to remove
var sigChan = make(chan os.Signal)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go cleanupSocketOnExit(sigChan, roleBasedCfg.socketPath)

db, err := storage2.NewPersistentStorage(dbDirectory, roleBasedCfg.dbFileName)
if err != nil {
log.Fatalf("error initializing storage: %v", err)
}
defer func(db storage.Storage) {
defer func(db storage2.Storage) {
err := db.Close()
if err != nil {
log.Printf("error closing storage: %v\n", err)
}
}(db)

s, err := server.New(listener, db)
s, err := server.New(listener, db, roleBasedCfg.replicaSocketPaths...)
if err != nil {
log.Fatalf("error initializing server: %v", err)
}
s.Run()
}

func getRoleBasedConfig(role pkg.Role) (*roleBasedConfig, error) {
var (
replicaSocketPaths []string
dbFileName string
socketPath string
err error
)
socketPath, err = networking.SocketPath(role)
if err != nil {
return nil, fmt.Errorf("error getting socket path: %w", err)
}
dbFileName, err = storage2.FileName(role)
if err != nil {
return nil, fmt.Errorf("error getting db file path: %w", err)
}

if isReplica {
replicaSocketPaths = make([]string, 0)
} else {
replicaSocketPath, err := networking.SocketPath(pkg.RoleReplica)
if err != nil {
return nil, fmt.Errorf("error getting socket path for replica: %w", err)
}
replicaSocketPaths = []string{replicaSocketPath}
}

return &roleBasedConfig{
dbFileName: dbFileName,
socketPath: socketPath,
replicaSocketPaths: replicaSocketPaths,
}, nil
}

func cleanupSocketOnExit(sigChan <-chan os.Signal, socketPath string) {
<-sigChan
log.Println("exiting")
Expand Down
62 changes: 37 additions & 25 deletions distributed/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

import (
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -20,6 +20,40 @@ func New(conn net.Conn) (*Client, error) {
}, nil
}

// SendRequest is exported so primary servers can relay requests to replicas without having to deal with
// re-serialization
func (c *Client) SendRequest(requestPtr *networking.ExecuteCommandRequest) (*networking.ExecuteCommandResponse, error) {
encoded, err := proto.Marshal(requestPtr)
if err != nil {
return nil, fmt.Errorf("SendRequest: error marshalling message: %w", err)
}
if err := binary.Write(c.conn, binary.LittleEndian, int32(len(encoded))); err != nil {
return nil, fmt.Errorf("SendRequest: error writing message length to connection: %w", err)
}
if _, err := c.conn.Write(encoded); err != nil {
return nil, fmt.Errorf("SendRequest: error writing message to connection: %w", err)
}

var (
responsePtr = new(networking.ExecuteCommandResponse)
messageLength int32
)
if err := binary.Read(c.conn, binary.LittleEndian, &messageLength); err != nil {
return nil, fmt.Errorf("SendRequest: error reading message length from connection: %w", err)
}
var responseBuf = make([]byte, messageLength)
if _, err := c.conn.Read(responseBuf); err != nil {
return nil, fmt.Errorf("SendRequest: error reading message from connection: %w", err)
}
if err := proto.Unmarshal(responseBuf, responsePtr); err != nil {
return nil, fmt.Errorf("SendRequest: error unmarshalling message: %w", err)
}
return responsePtr, nil

}

// ExecuteCommand is a wrapper for clients to send "domain objects" through the wire; more of a "facade" but will think
// of the best way to organize this later.
func (c *Client) ExecuteCommand(command Command) (storage.Value, error) {
var requestPtr *networking.ExecuteCommandRequest
switch command.Operation {
Expand All @@ -41,32 +75,10 @@ func (c *Client) ExecuteCommand(command Command) (storage.Value, error) {
default:
return "", fmt.Errorf("unrecognized operation %s", command.Operation)
}
encoded, err := proto.Marshal(requestPtr)
responsePtr, err := c.SendRequest(requestPtr)
if err != nil {
return "", fmt.Errorf("ExecuteCommand: error marshalling message: %w", err)
}
if err := binary.Write(c.conn, binary.LittleEndian, int32(len(encoded))); err != nil {
return "", fmt.Errorf("ExecuteCommand: error writing message length to connection: %w", err)
}
if _, err := c.conn.Write(encoded); err != nil {
return "", fmt.Errorf("ExecuteCommand: error writing message to connection: %w", err)
}

var (
responsePtr = new(networking.ExecuteCommandResponse)
messageLength int32
)
if err := binary.Read(c.conn, binary.LittleEndian, &messageLength); err != nil {
return "", fmt.Errorf("ExecuteCommand: error reading message length from connection: %w", err)
}
var responseBuf = make([]byte, messageLength)
if _, err := c.conn.Read(responseBuf); err != nil {
return "", fmt.Errorf("ExecuteCommand: error reading message from connection: %w", err)
return "", err
}
if err := proto.Unmarshal(responseBuf, responsePtr); err != nil {
return "", fmt.Errorf("ExecuteCommand: error unmarshalling message: %w", err)
}

switch result := responsePtr.Result.(type) {
case *networking.ExecuteCommandResponse_Value:
return storage.Value(result.Value), nil
Expand Down
4 changes: 3 additions & 1 deletion distributed/pkg/client/format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import "distributed/pkg/storage"
import (
"distributed/pkg/server/storage"
)

type Operation int

Expand Down
22 changes: 21 additions & 1 deletion distributed/pkg/networking/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
package networking

import (
"distributed/pkg"
"fmt"
)

const Unix = "unix"
const Port = 8907
const SocketPath = "/tmp/distkv.sock"

const (
primarySocketPath = "/tmp/distkv.sock"
replicaSocketPath = "/tmp/distkv-replica.sock"
)

func SocketPath(role pkg.Role) (string, error) {
switch role {
case pkg.RolePrimary:
return primarySocketPath, nil
case pkg.RoleReplica:
return replicaSocketPath, nil
default:
return "", fmt.Errorf("unrecognized Role %v", role)
}
}
10 changes: 10 additions & 0 deletions distributed/pkg/role.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pkg

// Role is a first pass at making the main functions dumber, might be better located somewhere else, and might break
// down once there are many replicas etc.
type Role int

const (
RolePrimary = iota + 1
RoleReplica
)
47 changes: 41 additions & 6 deletions distributed/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package server

import (
"distributed/pkg/client"
"distributed/pkg/networking"
"distributed/pkg/storage"
"distributed/pkg/server/storage"
"encoding/binary"
"fmt"
"google.golang.org/protobuf/proto"
Expand All @@ -11,14 +12,42 @@ import (
)

type Server struct {
listener net.Listener
db storage.Storage
listener net.Listener
db storage.Storage
replicaClients []*client.Client
}

func New(listener net.Listener, db storage.Storage) (*Server, error) {
func New(listener net.Listener, db storage.Storage, replicaSocketPaths ...string) (*Server, error) {
var replicaClients = make([]*client.Client, len(replicaSocketPaths))
if len(replicaSocketPaths) > 0 {
for j, replicaSocketPath := range replicaSocketPaths {
replicaConn, err := net.Dial(networking.Unix, replicaSocketPath)
if err != nil {
return nil, fmt.Errorf(
"server.New: error connecting to replica %s: %w",
replicaSocketPath,
err,
)
}
log.Printf("connected to replica at %s", replicaConn.RemoteAddr())

replicaClient, err := client.New(replicaConn)
if err != nil {
return nil, fmt.Errorf(
"server.New: error building replica client for %s: %w",
replicaSocketPath,
err,
)

}
replicaClients[j] = replicaClient
}
}

return &Server{
listener: listener,
db: db,
listener: listener,
db: db,
replicaClients: replicaClients,
}, nil
}

Expand All @@ -34,6 +63,7 @@ func (s *Server) Run() {
}

func (s *Server) handleConnection(conn net.Conn) {
log.Printf("handling connection from %s", conn.RemoteAddr())
for {
var (
requestPtr = new(networking.ExecuteCommandRequest)
Expand Down Expand Up @@ -64,6 +94,11 @@ func (s *Server) handleConnection(conn net.Conn) {
value, err = s.db.Get(storage.Key(req.Get.Key))
case *networking.ExecuteCommandRequest_Put:
value, err = s.db.Put(storage.Key(req.Put.Key), storage.Value(req.Put.Value))
for _, replica := range s.replicaClients {
if _, err := replica.SendRequest(requestPtr); err != nil {
log.Printf("error sending request to replica: %v", err)
}
}
default:
err = fmt.Errorf("unrecognized operation: %v", req)
}
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit d817b48

Please sign in to comment.