Skip to content

Commit

Permalink
fix(networking): cleanup service/endpoint if needed
Browse files Browse the repository at this point in the history
    We hit a corner case in which the service/endpoint is not cleaned
    up. This causes the service to keep the ClusterIP `None`.
    With this config, the endpoint of the share manager is not correct,
    so the CSI driver cannot mount the volume properly.

    We would like to have a checking mechanism that detects when the
    service/endpoint was not cleaned up. In that case we clean up the
    service/endpoint to ensure the endpoint is correct.

    Remove the cleanup function from the setting controller; the cleanup
    can be done in the share manager (sm) controller instead.

Signed-off-by: Vicente Cheng <[email protected]>
Co-authored-by: Chin-Ya Huang <[email protected]>
Co-authored-by: Derek Su <[email protected]>
  • Loading branch information
3 people committed Aug 23, 2024
1 parent 555af1e commit abb2235
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 55 deletions.
40 changes: 0 additions & 40 deletions controller/setting_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,6 @@ func (sc *SettingController) syncDangerZoneSettingsForManagedComponents(settingN
return &types.ErrorInvalidState{Reason: fmt.Sprintf("failed to apply %v setting to Longhorn components when there are attached volumes. It will be eventually applied", types.SettingNameStorageNetworkForRWXVolumeEnabled)}
}

// Perform cleanup of the share manager Service
// This is to allow the creation of the correct Service
// and Endpoint when switching between cluster network
// and storage network.
if err := sc.cleanupShareManagerServiceAndEndpoints(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -943,38 +935,6 @@ func (sc *SettingController) updateKubernetesClusterAutoscalerEnabled() error {
return nil
}

// cleanupShareManagerServiceAndEndpoints deletes the Service and the Endpoint
// that share a name with every share manager listed by the datastore.
//
// NotFound errors from either delete call are ignored. Any other error stops
// the iteration and is returned, wrapped with the name of the setting whose
// update triggered the cleanup.
func (sc *SettingController) cleanupShareManagerServiceAndEndpoints() (err error) {
	// err is a named result on purpose: a deferred function can only alter
	// the value seen by the caller when it assigns to the result parameter.
	defer func() {
		if err != nil {
			err = errors.Wrapf(err, "failed to cleanup share manager service and endpoints for %s setting update", types.SettingNameStorageNetworkForRWXVolumeEnabled)
		}
	}()

	shareManagers, err := sc.ds.ListShareManagers()
	if err != nil {
		return err
	}

	for _, shareManager := range shareManagers {
		log := sc.logger.WithField("shareManager", shareManager.Name)

		log.WithField("service", shareManager.Name).Infof("Deleting Service for %v setting update", types.SettingNameStorageNetworkForRWXVolumeEnabled)
		if delErr := sc.ds.DeleteService(shareManager.Namespace, shareManager.Name); delErr != nil && !apierrors.IsNotFound(delErr) {
			return delErr
		}

		log.WithField("endpoint", shareManager.Name).Infof("Deleting Endpoint for %v setting update", types.SettingNameStorageNetworkForRWXVolumeEnabled)
		if delErr := sc.ds.DeleteKubernetesEndpoint(shareManager.Namespace, shareManager.Name); delErr != nil && !apierrors.IsNotFound(delErr) {
			return delErr
		}
	}

	return nil
}

// updateCNI deletes all system-managed data plane components immediately with the updated CNI annotation.
func (sc *SettingController) updateCNI(funcPreupdate func() error) error {
storageNetwork, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
Expand Down
111 changes: 96 additions & 15 deletions controller/share_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,12 @@ func (c *ShareManagerController) syncShareManagerEndpoint(sm *longhorn.ShareMana
return nil
}

storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
if err != nil {
return err
}

storageNetworkForRWXVolumeEnabled, err := c.ds.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
storageNetworkForRWXVolume, err := c.isStorageNetworkForRWXVolume()
if err != nil {
return err
}

if types.IsStorageNetworkForRWXVolume(storageNetwork, storageNetworkForRWXVolumeEnabled) {
if storageNetworkForRWXVolume {
serviceFqdn := fmt.Sprintf("%v.%v.svc.cluster.local", sm.Name, sm.Namespace)
sm.Status.Endpoint = fmt.Sprintf("nfs://%v/%v", serviceFqdn, sm.Name)
} else {
Expand Down Expand Up @@ -1050,6 +1045,92 @@ func (c *ShareManagerController) getShareManagerTolerationsFromStorageClass(sc *
return tolerations
}

// isStorageNetworkForRWXVolume reads the storage network setting and the
// "storage network for RWX volume enabled" setting from the datastore and
// returns the result of types.IsStorageNetworkForRWXVolume for that pair.
// A failure to read either setting is returned wrapped with the setting name.
func (c *ShareManagerController) isStorageNetworkForRWXVolume() (bool, error) {
	networkSetting, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
	if err != nil {
		return false, errors.Wrapf(err, "failed to get setting value %v", types.SettingNameStorageNetwork)
	}

	rwxEnabled, err := c.ds.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
	if err != nil {
		return false, errors.Wrapf(err, "failed to get setting value %v", types.SettingNameStorageNetworkForRWXVolumeEnabled)
	}

	inUse := types.IsStorageNetworkForRWXVolume(networkSetting, rwxEnabled)
	return inUse, nil
}

// checkStorageNetworkApplied reports whether both the storage network setting
// and the "storage network for RWX volume enabled" setting are marked as
// applied in the datastore. It stops at the first setting that is not applied
// or whose lookup fails, returning that lookup's result unchanged.
func (c *ShareManagerController) checkStorageNetworkApplied() (bool, error) {
	for _, name := range []types.SettingName{
		types.SettingNameStorageNetwork,
		types.SettingNameStorageNetworkForRWXVolumeEnabled,
	} {
		applied, err := c.ds.GetSettingApplied(name)
		if err != nil {
			return applied, err
		}
		if !applied {
			return false, nil
		}
	}
	return true, nil
}

// canCleanupService decides whether the Service named shareManagerName in the
// controller's namespace should be deleted so that it can be recreated.
//
// It returns false when the Service does not exist, when the storage network
// settings are not yet applied, or when the Service's ClusterIP already matches
// the current settings (ClusterIP "None" when the storage network is used for
// RWX volumes, anything else otherwise). It returns true only on a mismatch.
func (c *ShareManagerController) canCleanupService(shareManagerName string) (bool, error) {
	svc, err := c.ds.GetService(c.namespace, shareManagerName)
	switch {
	case err == nil:
		// Service exists; continue with the checks below.
	case apierrors.IsNotFound(err):
		// NotFound means the service/endpoint is already cleaned up.
		// The service and endpoint are tied together by the kubernetes
		// endpoint controller: once the service is deleted, the
		// corresponding endpoint is deleted automatically.
		return false, nil
	default:
		return false, errors.Wrap(err, "failed to get service")
	}

	// Only act once both storage network settings report as applied.
	applied, err := c.checkStorageNetworkApplied()
	if err != nil {
		return false, errors.Wrap(err, "failed to check if the storage network settings are applied")
	}
	if !applied {
		c.logger.Warn("Storage network settings are not applied, do nothing")
		return false, nil
	}

	wantClusterIPNone, err := c.isStorageNetworkForRWXVolume()
	if err != nil {
		return false, err
	}

	// Cleanup is needed exactly when the existing Service disagrees with the
	// settings about whether its ClusterIP should be "None".
	hasClusterIPNone := svc.Spec.ClusterIP == core.ClusterIPNone
	return wantClusterIPNone != hasClusterIPNone, nil
}

// cleanupService deletes the Service named after shareManager in the
// controller's namespace when canCleanupService says it should be removed.
// It is a no-op when no cleanup is needed, and a NotFound error from the
// delete call is treated as success.
func (c *ShareManagerController) cleanupService(shareManager *longhorn.ShareManager) error {
	name := shareManager.Name

	needed, err := c.canCleanupService(name)
	if err != nil {
		return errors.Wrapf(err, "failed to check if we can cleanup service and endpoint for share manager %v", name)
	}
	if !needed {
		return nil
	}

	c.logger.Infof("Deleting Service for share manager %v", name)
	if err := c.ds.DeleteService(c.namespace, name); err != nil && !apierrors.IsNotFound(err) {
		return errors.Wrapf(err, "failed to delete service for share manager %v", name)
	}

	// The endpoint is not deleted here: the kubernetes endpoints_controller
	// syncs the service and then cleans up the corresponding endpoint.
	// https://github.com/kubernetes/kubernetes/blob/v1.31.0/pkg/controller/endpoint/endpoints_controller.go#L374-L392
	return nil
}

func (c *ShareManagerController) createServiceAndEndpoint(shareManager *longhorn.ShareManager) error {
// check if we need to create the service
_, err := c.ds.GetService(c.namespace, shareManager.Name)
Expand Down Expand Up @@ -1122,6 +1203,11 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager
}
priorityClass := setting.Value

err = c.cleanupService(sm)
if err != nil {
return nil, errors.Wrapf(err, "failed to cleanup service for share manager %v", sm.Name)
}

err = c.createServiceAndEndpoint(sm)
if err != nil {
return nil, errors.Wrapf(err, "failed to create service and endpoint for share manager %v", sm.Name)
Expand Down Expand Up @@ -1276,17 +1362,12 @@ func (c *ShareManagerController) createServiceManifest(sm *longhorn.ShareManager

log := getLoggerForShareManager(c.logger, sm)

storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
if err != nil {
log.WithError(err).Warnf("Failed to get %v setting, fallback to cluster IP", types.SettingNameStorageNetwork)
}

storageNetworkForRWXVolumeEnabled, err := c.ds.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
storageNetworkForRWXVolume, err := c.isStorageNetworkForRWXVolume()
if err != nil {
log.WithError(err).Warnf("Failed to get %v setting, fallback to cluster IP", types.SettingNameStorageNetworkForRWXVolumeEnabled)
log.WithError(err).Warnf("Failed to check storage network for RWX volume")
}

if types.IsStorageNetworkForRWXVolume(storageNetwork, storageNetworkForRWXVolumeEnabled) {
if storageNetworkForRWXVolume {
// Create a headless service so it doesn't use a cluster IP. This allows
// directly reaching the share manager pods using their individual
// IP address.
Expand Down
8 changes: 8 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,14 @@ func (s *DataStore) GetSettingExactRO(sName types.SettingName) (*longhorn.Settin
return resultRO, nil
}

// GetSettingApplied returns the Status.Applied value of the setting named
// sName, looked up through getSettingRO. When the lookup fails it returns
// false together with the lookup error.
func (s *DataStore) GetSettingApplied(sName types.SettingName) (bool, error) {
	setting, err := s.getSettingRO(string(sName))
	if err == nil {
		return setting.Status.Applied, nil
	}
	return false, err
}

// GetSetting will automatically fill the non-existing setting if it's a valid
// setting name.
// The function will not return nil for *longhorn.Setting when error is nil
Expand Down

0 comments on commit abb2235

Please sign in to comment.