Skip to content

Commit

Permalink
Report primitives (#540)
Browse files Browse the repository at this point in the history
* reports RU for volume, zdb and container

* added RU report for k8s

* fixed typo

* split logic to new package 'resource_units.go'

* WIP report used resources to BCDB

* review pair programming

* units test resource units parsing code

* address linting errors

* rename UpdateUsedResources to UpdateReservedResources

Co-authored-by: Christophe de Carvalho <[email protected]>
  • Loading branch information
maximevanhees and zaibon authored Feb 27, 2020
1 parent ee4ff00 commit b904f34
Show file tree
Hide file tree
Showing 10 changed files with 595 additions and 96 deletions.
2 changes: 1 addition & 1 deletion cmds/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {
provision.NewDecommissionSource(localStore),
)

engine := provision.New(source, localStore, remoteStore)
engine := provision.New(nodeID.Identity(), source, localStore, remoteStore)
server.Register(zbus.ObjectID{Name: module, Version: "0.0.1"}, pkg.ProvisionMonitor(engine))

log.Info().
Expand Down
114 changes: 65 additions & 49 deletions pkg/gedis/commands_provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

"github.com/threefoldtech/zos/pkg/schema"

types "github.com/threefoldtech/zos/pkg/gedis/types/provision"
dtypes "github.com/threefoldtech/zos/pkg/gedis/types/directory"
ptypes "github.com/threefoldtech/zos/pkg/gedis/types/provision"
"github.com/threefoldtech/zos/pkg/provision"

"github.com/threefoldtech/zos/pkg"
Expand All @@ -28,8 +29,8 @@ var provisionOrder = map[provision.ReservationType]int{

// Reserve provision.Reserver
func (g *Gedis) Reserve(r *provision.Reservation) (string, error) {
res := types.TfgridReservation1{
DataReservation: types.TfgridReservationData1{},
res := ptypes.TfgridReservation1{
DataReservation: ptypes.TfgridReservationData1{},
// CustomerTid: r.User, //TODO: wrong type.
}

Expand All @@ -40,23 +41,23 @@ func (g *Gedis) Reserve(r *provision.Reservation) (string, error) {

switch r.Type {
case provision.ContainerReservation:
res.DataReservation.Containers = []types.TfgridReservationContainer1{
res.DataReservation.Containers = []ptypes.TfgridReservationContainer1{
containerReservation(w, r.NodeID),
}
case provision.VolumeReservation:
res.DataReservation.Volumes = []types.TfgridReservationVolume1{
res.DataReservation.Volumes = []ptypes.TfgridReservationVolume1{
volumeReservation(w, r.NodeID),
}
case provision.ZDBReservation:
res.DataReservation.Zdbs = []types.TfgridReservationZdb1{
res.DataReservation.Zdbs = []ptypes.TfgridReservationZdb1{
zdbReservation(w, r.NodeID),
}
case provision.NetworkReservation:
res.DataReservation.Networks = []types.TfgridReservationNetwork1{
res.DataReservation.Networks = []ptypes.TfgridReservationNetwork1{
networkReservation(w),
}
case provision.KubernetesReservation:
res.DataReservation.Kubernetes = []types.TfgridWorkloadsReservationK8S1{
res.DataReservation.Kubernetes = []ptypes.TfgridWorkloadsReservationK8S1{
k8sReservation(w, r.NodeID),
}
}
Expand All @@ -69,7 +70,7 @@ func (g *Gedis) Reserve(r *provision.Reservation) (string, error) {
return "", err
}

res = types.TfgridReservation1{}
res = ptypes.TfgridReservation1{}
if err = json.Unmarshal(result, &res); err != nil {
return "", err
}
Expand All @@ -87,7 +88,7 @@ func (g *Gedis) Get(id string) (*provision.Reservation, error) {
return nil, err
}

var workload types.TfgridReservationWorkload1
var workload ptypes.TfgridReservationWorkload1

if err = json.Unmarshal(result, &workload); err != nil {
return nil, err
Expand All @@ -112,7 +113,7 @@ func (g *Gedis) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservati
}

var out struct {
Workloads []types.TfgridReservationWorkload1 `json:"workloads"`
Workloads []ptypes.TfgridReservationWorkload1 `json:"workloads"`
}

if err = json.Unmarshal(result, &out); err != nil {
Expand Down Expand Up @@ -140,27 +141,27 @@ func (g *Gedis) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservati
// Feedback implements provision.Feedbacker
func (g *Gedis) Feedback(id string, r *provision.Result) error {

var rType types.TfgridReservationResult1CategoryEnum
var rType ptypes.TfgridReservationResult1CategoryEnum
switch r.Type {
case provision.VolumeReservation:
rType = types.TfgridReservationResult1CategoryVolume
rType = ptypes.TfgridReservationResult1CategoryVolume
case provision.ContainerReservation:
rType = types.TfgridReservationResult1CategoryContainer
rType = ptypes.TfgridReservationResult1CategoryContainer
case provision.ZDBReservation:
rType = types.TfgridReservationResult1CategoryZdb
rType = ptypes.TfgridReservationResult1CategoryZdb
case provision.NetworkReservation:
rType = types.TfgridReservationResult1CategoryNetwork
rType = ptypes.TfgridReservationResult1CategoryNetwork
}

var rState types.TfgridReservationResult1StateEnum
var rState ptypes.TfgridReservationResult1StateEnum
switch r.State {
case "ok":
rState = types.TfgridReservationResult1StateOk
rState = ptypes.TfgridReservationResult1StateOk
case "error":
rState = types.TfgridReservationResult1StateError
rState = ptypes.TfgridReservationResult1StateError
}

result := types.TfgridReservationResult1{
result := ptypes.TfgridReservationResult1{
Category: rType,
WorkloadID: id,
DataJSON: string(r.Data),
Expand Down Expand Up @@ -191,7 +192,22 @@ func (g *Gedis) Delete(id string) error {
return err
}

func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reservation, error) {
// UpdateReservedResources send the amount of resource units reserved to BCDB
func (g *Gedis) UpdateReservedResources(nodeID string, c provision.Counters) error {
r := dtypes.TfgridNodeResourceAmount1{
Cru: c.CRU.Current(),
Mru: c.MRU.Current(),
Hru: c.HRU.Current(),
Sru: c.SRU.Current(),
}
_, err := g.Send("tfgrid.directory.nodes", "update_reserved_capacity", Args{
"node_id": nodeID,
"resources": r,
})
return err
}

func reservationFromSchema(w ptypes.TfgridReservationWorkload1) (*provision.Reservation, error) {
reservation := &provision.Reservation{
ID: w.WorkloadID,
User: w.User,
Expand All @@ -211,7 +227,7 @@ func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reser
// convert the workload description from jsx schema to zos types
switch reservation.Type {
case provision.ZDBReservation:
tmp := types.TfgridReservationZdb1{}
tmp := ptypes.TfgridReservationZdb1{}
if err := json.Unmarshal(reservation.Data, &tmp); err != nil {
return nil, err
}
Expand All @@ -222,7 +238,7 @@ func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reser
}

case provision.VolumeReservation:
tmp := types.TfgridReservationVolume1{}
tmp := ptypes.TfgridReservationVolume1{}
if err := json.Unmarshal(reservation.Data, &tmp); err != nil {
return nil, err
}
Expand All @@ -233,7 +249,7 @@ func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reser
}

case provision.NetworkReservation:
tmp := types.TfgridReservationNetwork1{}
tmp := ptypes.TfgridReservationNetwork1{}
if err := json.Unmarshal(reservation.Data, &tmp); err != nil {
return nil, err
}
Expand All @@ -244,7 +260,7 @@ func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reser
}

case provision.ContainerReservation:
tmp := types.TfgridReservationContainer1{}
tmp := ptypes.TfgridReservationContainer1{}
if err := json.Unmarshal(reservation.Data, &tmp); err != nil {
return nil, err
}
Expand All @@ -255,7 +271,7 @@ func reservationFromSchema(w types.TfgridReservationWorkload1) (*provision.Reser
}

case provision.KubernetesReservation:
tmp := types.TfgridWorkloadsReservationK8S1{}
tmp := ptypes.TfgridWorkloadsReservationK8S1{}
if err := json.Unmarshal(reservation.Data, &tmp); err != nil {
return nil, err
}
Expand Down Expand Up @@ -306,26 +322,26 @@ func workloadFromRaw(s json.RawMessage, t provision.ReservationType) (interface{
return nil, fmt.Errorf("unsupported reservation type %v", t)
}

func networkReservation(i interface{}) types.TfgridReservationNetwork1 {
func networkReservation(i interface{}) ptypes.TfgridReservationNetwork1 {
n := i.(pkg.Network)
network := types.TfgridReservationNetwork1{
network := ptypes.TfgridReservationNetwork1{
Name: n.Name,
Iprange: n.IPRange.ToSchema(),
WorkloadID: 1,
NetworkResources: make([]types.TfgridNetworkNetResource1, len(n.NetResources)),
NetworkResources: make([]ptypes.TfgridNetworkNetResource1, len(n.NetResources)),
}

for i, nr := range n.NetResources {
network.NetworkResources[i] = types.TfgridNetworkNetResource1{
network.NetworkResources[i] = ptypes.TfgridNetworkNetResource1{
NodeID: nr.NodeID,
IPRange: nr.Subnet.ToSchema(),
WireguardPrivateKeyEncrypted: nr.WGPrivateKey,
WireguardPublicKey: nr.WGPublicKey,
Peers: make([]types.WireguardPeer1, len(nr.Peers)),
Peers: make([]ptypes.WireguardPeer1, len(nr.Peers)),
}

for y, peer := range nr.Peers {
network.NetworkResources[i].Peers[y] = types.WireguardPeer1{
network.NetworkResources[i].Peers[y] = ptypes.WireguardPeer1{
Endpoint: peer.Endpoint,
PublicKey: peer.WGPublicKey,
AllowedIPs: make([]string, len(peer.AllowedIPs)),
Expand All @@ -339,17 +355,17 @@ func networkReservation(i interface{}) types.TfgridReservationNetwork1 {
return network
}

func containerReservation(i interface{}, nodeID string) types.TfgridReservationContainer1 {
func containerReservation(i interface{}, nodeID string) ptypes.TfgridReservationContainer1 {
c := i.(provision.Container)
container := types.TfgridReservationContainer1{
container := ptypes.TfgridReservationContainer1{
// NodeID: nodeID,
Flist: c.FList,
HubURL: c.FlistStorage,
Environment: c.Env,
Entrypoint: c.Entrypoint,
Interactive: c.Interactive,
Volumes: make([]types.TfgridReservationContainerMount1, len(c.Mounts)),
NetworkConnection: []types.TfgridReservationNetworkConnection1{
Volumes: make([]ptypes.TfgridReservationContainerMount1, len(c.Mounts)),
NetworkConnection: []ptypes.TfgridReservationNetworkConnection1{
{
NetworkID: string(c.Network.NetworkID),
Ipaddress: c.Network.IPs[0],
Expand All @@ -361,18 +377,18 @@ func containerReservation(i interface{}, nodeID string) types.TfgridReservationC
}

for i, v := range c.Mounts {
container.Volumes[i] = types.TfgridReservationContainerMount1{
container.Volumes[i] = ptypes.TfgridReservationContainerMount1{
VolumeID: v.VolumeID,
Mountpoint: v.Mountpoint,
}
}
return container
}

func volumeReservation(i interface{}, nodeID string) types.TfgridReservationVolume1 {
func volumeReservation(i interface{}, nodeID string) ptypes.TfgridReservationVolume1 {
v := i.(provision.Volume)

volume := types.TfgridReservationVolume1{
volume := ptypes.TfgridReservationVolume1{
// WorkloadID:
// NodeID:
// ReservationID:
Expand All @@ -381,18 +397,18 @@ func volumeReservation(i interface{}, nodeID string) types.TfgridReservationVolu
// FarmerTid:
}
if v.Type == provision.HDDDiskType {
volume.Type = types.TfgridReservationVolume1TypeHDD
volume.Type = ptypes.TfgridReservationVolume1TypeHDD
} else if v.Type == provision.SSDDiskType {
volume.Type = types.TfgridReservationVolume1TypeSSD
volume.Type = ptypes.TfgridReservationVolume1TypeSSD
}

return volume
}

func zdbReservation(i interface{}, nodeID string) types.TfgridReservationZdb1 {
func zdbReservation(i interface{}, nodeID string) ptypes.TfgridReservationZdb1 {
z := i.(provision.ZDB)

zdb := types.TfgridReservationZdb1{
zdb := ptypes.TfgridReservationZdb1{
// WorkloadID:
// NodeID:
// ReservationID:
Expand All @@ -403,24 +419,24 @@ func zdbReservation(i interface{}, nodeID string) types.TfgridReservationZdb1 {
// FarmerTid:
}
if z.DiskType == pkg.SSDDevice {
zdb.DiskType = types.TfgridReservationZdb1DiskTypeHdd
zdb.DiskType = ptypes.TfgridReservationZdb1DiskTypeHdd
} else if z.DiskType == pkg.HDDDevice {
zdb.DiskType = types.TfgridReservationZdb1DiskTypeSsd
zdb.DiskType = ptypes.TfgridReservationZdb1DiskTypeSsd
}

if z.Mode == pkg.ZDBModeUser {
zdb.Mode = types.TfgridReservationZdb1ModeUser
zdb.Mode = ptypes.TfgridReservationZdb1ModeUser
} else if z.Mode == pkg.ZDBModeSeq {
zdb.Mode = types.TfgridReservationZdb1ModeSeq
zdb.Mode = ptypes.TfgridReservationZdb1ModeSeq
}

return zdb
}

func k8sReservation(i interface{}, nodeID string) types.TfgridWorkloadsReservationK8S1 {
func k8sReservation(i interface{}, nodeID string) ptypes.TfgridWorkloadsReservationK8S1 {
k := i.(provision.Kubernetes)

k8s := types.TfgridWorkloadsReservationK8S1{
k8s := ptypes.TfgridWorkloadsReservationK8S1{
// WorkloadID int64
NodeID: nodeID,
Size: k.Size,
Expand Down
21 changes: 18 additions & 3 deletions pkg/provision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ type ReservationCache interface {
Get(id string) (*Reservation, error)
Remove(id string) error
Exists(id string) (bool, error)
Counters() pkg.ProvisionCounters
Counters() Counters
}

// Feedbacker defines the method that needs to be implemented
// to send the provision result to BCDB
type Feedbacker interface {
Feedback(id string, r *Result) error
Deleted(id string) error
UpdateReservedResources(nodeID string, c Counters) error
}

type defaultEngine struct {
nodeID string
source ReservationSource
store ReservationCache
fb Feedbacker
Expand All @@ -42,8 +44,9 @@ type defaultEngine struct {
// the default implementation is a single threaded worker. so it process
// one reservation at a time. On error, the engine will log the error. and
// continue to next reservation.
func New(source ReservationSource, rw ReservationCache, fb Feedbacker) Engine {
func New(nodeID string, source ReservationSource, rw ReservationCache, fb Feedbacker) Engine {
return &defaultEngine{
nodeID: nodeID,
source: source,
store: rw,
fb: fb,
Expand Down Expand Up @@ -91,6 +94,9 @@ func (e *defaultEngine) Run(ctx context.Context) error {
continue
}
}
if err := e.fb.UpdateReservedResources(e.nodeID, e.store.Counters()); err != nil {
log.Error().Err(err).Msg("failed to updated the used resources")
}
}
}
}
Expand Down Expand Up @@ -217,9 +223,18 @@ func (e *defaultEngine) Counters(ctx context.Context) <-chan pkg.ProvisionCounte
case <-ctx.Done():
}

c := e.store.Counters()
pc := pkg.ProvisionCounters{
Container: int64(c.containers),
Network: int64(c.networks),
ZDB: int64(c.zdbs),
Volume: int64(c.volumes),
VM: int64(c.vms),
}

select {
case <-ctx.Done():
case ch <- e.store.Counters():
case ch <- pc:
}
}
}()
Expand Down
Loading

0 comments on commit b904f34

Please sign in to comment.