Skip to content

Commit

Permalink
DAOS-15549 control: Pause group update if provider is changed (#14182)
Browse files Browse the repository at this point in the history
If the system fabric provider is updated, it is possible that group
updates could be sent with stale information (a mix of old and new
URIs). Engines may attempt to begin communicating with peers whose URIs
have not been updated to the new provider. We need to prevent this.

This patch temporarily pauses group updates when it detects a fabric
provider change in order to allow all known ranks to rejoin. Afterward
it triggers the group update to the engines.

Signed-off-by: Kris Jacque <[email protected]>
  • Loading branch information
kjacque authored Apr 30, 2024
1 parent c1225e4 commit 2850e97
Show file tree
Hide file tree
Showing 4 changed files with 447 additions and 18 deletions.
67 changes: 67 additions & 0 deletions src/control/server/mgmt_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -41,6 +42,7 @@ import (
)

// Keys of the management properties read and written through
// system.GetMgmtProperty / system.SetMgmtProperty in this file.
const (
	// fabricProviderProp is the key under which the fabric provider string is stored.
	fabricProviderProp = "fabric_providers"
	// groupUpdatePauseProp is the key of the boolean-valued flag that marks
	// group updates as paused.
	groupUpdatePauseProp = "group_update_paused"
)

// GetAttachInfo handles a request to retrieve a map of ranks to fabric URIs, in addition
// to client network autoconfiguration hints.
Expand Down Expand Up @@ -215,6 +217,13 @@ func (svc *mgmtSvc) join(ctx context.Context, req *mgmtpb.JoinReq, peerAddr *net
MapVersion: joinResponse.MapVersion,
}

if svc.isGroupUpdatePaused() && svc.allRanksJoined() {
if err := svc.resumeGroupUpdate(); err != nil {
svc.log.Errorf("failed to resume group update: %s", err.Error())
}
// join loop will trigger a new group update after this
}

// If the rank is local to the MS leader, then we need to wire up at least
// one in order to perform a CaRT group update.
if common.IsLocalAddr(peerAddr) && req.Idx == 0 {
Expand All @@ -234,6 +243,29 @@ func (svc *mgmtSvc) join(ctx context.Context, req *mgmtpb.JoinReq, peerAddr *net
return resp, nil
}

// allRanksJoined reports whether every member known to the system database is
// either joined or admin-excluded. It does so by comparing the unfiltered member
// count against the count of members in the Joined or AdminExcluded states.
//
// If either count cannot be retrieved, the error is logged and false is returned.
//
// NB: The decision is based purely on recorded member state. If the system was
// killed while ranks were still recorded as joined (instead of the ranks being
// stopped first), the counts may match before the ranks have actually rejoined,
// and this may report true too early.
func (svc *mgmtSvc) allRanksJoined() bool {
	knownCount, err := svc.sysdb.MemberCount()
	if err != nil {
		svc.log.Errorf("failed to get total member count: %s", err)
		return false
	}

	// Admin-excluded ranks are not expected to join, so they count as accounted for.
	readyCount, err := svc.sysdb.MemberCount(
		system.MemberStateJoined,
		system.MemberStateAdminExcluded,
	)
	if err != nil {
		svc.log.Errorf("failed to get joined member count: %s", err)
		return false
	}

	return readyCount == knownCount
}

func (svc *mgmtSvc) checkReqFabricProvider(req *mgmtpb.JoinReq, peerAddr *net.TCPAddr, publisher events.Publisher) error {
joinProv, err := getProviderFromURI(req.Uri)
if err != nil {
Expand Down Expand Up @@ -272,6 +304,27 @@ func (svc *mgmtSvc) setFabricProviders(val string) error {
return system.SetMgmtProperty(svc.sysdb, fabricProviderProp, val)
}

// isGroupUpdatePaused reports whether the group update pause property is
// currently set to a true value.
//
// A property that cannot be read is treated as "not paused". A value that
// cannot be parsed as a boolean is logged as an error and also treated as
// "not paused".
func (svc *mgmtSvc) isGroupUpdatePaused() bool {
	rawVal, getErr := system.GetMgmtProperty(svc.sysdb, groupUpdatePauseProp)
	if getErr != nil {
		// Lookup failure is not logged; fall back to the unpaused default.
		return false
	}

	paused, parseErr := strconv.ParseBool(rawVal)
	if parseErr == nil {
		return paused
	}

	svc.log.Errorf("invalid value for mgmt prop %q: %s", groupUpdatePauseProp, parseErr.Error())
	return false
}

// pauseGroupUpdate marks group updates as paused by storing "true" under the
// group update pause management property. It returns any error from the
// property write.
func (svc *mgmtSvc) pauseGroupUpdate() error {
	const pausedVal = "true"

	return system.SetMgmtProperty(svc.sysdb, groupUpdatePauseProp, pausedVal)
}

// resumeGroupUpdate clears the paused state by storing "false" under the
// group update pause management property. It returns any error from the
// property write.
func (svc *mgmtSvc) resumeGroupUpdate() error {
	const resumedVal = "false"

	return system.SetMgmtProperty(svc.sysdb, groupUpdatePauseProp, resumedVal)
}

func (svc *mgmtSvc) updateFabricProviders(provList []string, publisher events.Publisher) error {
provStr := strings.Join(provList, ",")

Expand All @@ -298,7 +351,16 @@ func (svc *mgmtSvc) updateFabricProviders(provList []string, publisher events.Pu
curProv, provStr, numJoined)
}

if err := svc.pauseGroupUpdate(); err != nil {
return errors.Wrapf(err, "unable to pause group update before provider change")
}

if err := svc.setFabricProviders(provStr); err != nil {
if guErr := svc.resumeGroupUpdate(); guErr != nil {
// something is very wrong if this happens
svc.log.Errorf("unable to resume group update after provider change failed: %s", guErr.Error())
}

return errors.Wrapf(err, "changing fabric provider prop")
}
publisher.Publish(newFabricProvChangedEvent(curProv, provStr))
Expand Down Expand Up @@ -326,6 +388,11 @@ func (svc *mgmtSvc) reqGroupUpdate(ctx context.Context, sync bool) {
// NB: This method must not be called concurrently, as out-of-order
// group updates may trigger engine assertions.
func (svc *mgmtSvc) doGroupUpdate(ctx context.Context, forced bool) error {
if svc.isGroupUpdatePaused() {
svc.log.Debugf("group update requested (force: %v), but temporarily paused", forced)
return nil
}

if forced {
if err := svc.sysdb.IncMapVer(); err != nil {
return err
Expand Down
Loading

0 comments on commit 2850e97

Please sign in to comment.