Skip to content

Commit

Permalink
feat(backup): delete backup in the backupstore asynchronously
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn 8746

Signed-off-by: Jack Lin <[email protected]>
  • Loading branch information
ChanYiLin committed Oct 21, 2024
1 parent 56903ba commit 40de0f8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 27 deletions.
180 changes: 159 additions & 21 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -15,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/controller"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,21 +35,35 @@ import (
)

const (
MessageTypeReconcileInfo = "info"
MessageTypeReconcileInfo = "info"
MessageTypeReconcileError = "error"
)

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
NoDeletionInProgressRecordMessage = "No deletion in progress record, retry the deletion command"
)

const (
DeletionMinInterval = time.Minute * 1
DeletionMaxInterval = time.Hour * 24
)

type DeletingStatus struct {
State longhorn.BackupState
ErrorMessage string
}

type BackupController struct {
*baseController

// which namespace controller is running with
// Which namespace controller is running with
namespace string
// use as the OwnerID of the controller
// Use as the OwnerID of the controller
controllerID string

kubeClient clientset.Interface
Expand All @@ -63,6 +77,13 @@ type BackupController struct {
cacheSyncs []cache.InformerSynced

proxyConnCounter util.Counter

// Use to track the result of the deletion command.
// Also used to track if controller crashes after the deletion command is triggered.
deletingMapLock sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
}

func NewBackupController(
Expand Down Expand Up @@ -96,6 +117,11 @@ func NewBackupController(
eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-backup-controller"}),

proxyConnCounter: proxyConnCounter,

deletingMapLock: sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(DeletionMinInterval, DeletionMaxInterval),
}

var err error
Expand Down Expand Up @@ -269,6 +295,10 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return errors.Wrap(err, "failed to get backup volume name")
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

// Examine DeletionTimestamp to determine if object is under deletion
if !backup.DeletionTimestamp.IsZero() {
if err := bc.handleAttachmentTicketDeletion(backup, backupVolumeName); err != nil {
Expand All @@ -291,16 +321,12 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}
backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete remote backup")
backupDeleted := bc.handleBackupDeletionInBackupStore(backup, backupVolumeName, backupURL, backupTargetClient)
if !backupDeleted {
return nil
}

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
if err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential); err != nil {
return errors.Wrap(err, "failed to delete remote backup")
}
}

// Request backup_volume_controller to reconcile BackupVolume immediately if it's the last backup
Expand Down Expand Up @@ -400,10 +426,6 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

monitor, err := bc.checkMonitor(backup, volume, backupTarget)
if err != nil {
if backup.Status.State == longhorn.BackupStateError {
Expand Down Expand Up @@ -448,8 +470,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil {
if !strings.Contains(err.Error(), "in progress") {
if err != nil && !types.ErrorIsNotFound(err) {
if !types.ErrorIsInProgress(err) {
log.WithError(err).Error("Error inspecting backup config")
}
return nil // Ignore error to prevent enqueue
Expand Down Expand Up @@ -759,6 +781,18 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
}
}

clusterBackups, err := bc.ds.ListBackupsWithBackupVolumeName(volume.Name)
if err != nil {
return nil, errors.Wrapf(err, "failed to list backups in the cluster")
}
for _, b := range clusterBackups {
if b.Status.State == longhorn.BackupStateDeleting {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForBackupDeletionIsCompleteMessage, b.Name)
return nil, fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
}
}

// Enable the backup monitor
monitor, err := bc.enableBackupMonitor(backup, volume, backupTargetClient, biChecksum,
volume.Spec.BackupCompressionMethod, int(concurrentLimit), storageClassName, engineClientProxy)
Expand Down Expand Up @@ -908,5 +942,109 @@ func (bc *BackupController) syncBackupStatusWithSnapshotCreationTimeAndVolumeSiz
func (bc *BackupController) backupInFinalState(backup *longhorn.Backup) bool {
return backup.Status.State == longhorn.BackupStateCompleted ||
backup.Status.State == longhorn.BackupStateError ||
backup.Status.State == longhorn.BackupStateUnknown
backup.Status.State == longhorn.BackupStateUnknown ||
backup.Status.State == longhorn.BackupStateDeleting
}

func (bc *BackupController) startDeletingBackupInBackupStore(backupURL string, backupTargetClient *engineapi.BackupTargetClient) {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL] = &DeletingStatus{
State: longhorn.BackupStateDeleting,
ErrorMessage: "",
}
bc.deletingMapLock.Unlock()

// The backup deletion will be executed asynchronously.
// After triggering the backup deletion, update the state to deleting.
// Then keep checking if the backup is deleted in the backupstore before removing the finalizer.
go func() {
defer func() {
if r := recover(); r != nil {
bc.setInprogressDeletionMap(backupURL, longhorn.BackupStateError, fmt.Sprintf("Recovered from panic: %v", r))
}
}()
err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential)
// If the deletion command fails, update the in-memory map to inform the controller.
if err != nil {
bc.setInprogressDeletionMap(backupURL, longhorn.BackupStateError, fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, err.Error()))
}
}()
}

func (bc *BackupController) handleBackupDeletionInBackupStore(backup *longhorn.Backup, backupVolumeName string, backupURL string, backupTargetClient *engineapi.BackupTargetClient) bool {
log := getLoggerForBackup(bc.logger, backup)
existingBackup := backup.DeepCopy()

if backup.Status.State != longhorn.BackupStateDeleting {
if bc.deletingBackoff.IsInBackOffSinceUpdate(backup.Name, time.Now()) {
return false
}

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete backup in backupstore")
return false
}
bc.startDeletingBackupInBackupStore(backupURL, backupTargetClient)

// After triggering the asynchronous deletion,
// update the status to Deleting and requeue the backup to check the backupstore in the following reconciliations.
// Controller won't execute this code block if the status is deleting.
backup.Status.State = longhorn.BackupStateDeleting
backup.Status.Messages = map[string]string{}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
}
bc.deletingBackoff.Next(backup.Name, time.Now())
bc.enqueueBackup(backup)
return false
}

// For the in progress backup, the inspect command returns nil backupInfo with in progress error.
// We should consider the backup exists even the backupInfo is nil when it is in progress.
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil && !types.ErrorIsNotFound(err) && !types.ErrorIsInProgress(err) {
log.WithError(err).Debugf("Failed to check backup %v in the backupstore", backup.Name)
bc.enqueueBackup(backup)
return false
}
if backupInfo != nil || types.ErrorIsInProgress(err) {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()
// Controller could have crashed if there is no record in the map and the backup still exists in the backupstore.
// We update the status to Error so the controller can retry the deletion command again.
if bc.inProgressDeletingMap[backupURL] == nil {
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = NoDeletionInProgressRecordMessage
} else {
// If the state is Deleting, we early return to check the file in the backupstore in the next reconciliation
// If the state is Error, we update the state and message to inform users.
// With the state being Error, other pending backups can start without considering the deletion lock in the backupstore.
// Controller can also retry the deletion command after the status is Error.
if bc.inProgressDeletingMap[backupURL].State == longhorn.BackupStateDeleting {
return false
}
log.Warnf("Failed to delete backup in the backupstore, %v", bc.inProgressDeletingMap[backupURL].ErrorMessage)
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, bc.inProgressDeletingMap[backupURL].ErrorMessage)
}
if reflect.DeepEqual(existingBackup.Status, backup.Status) {
return false
}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Errorf("Backup %v update status error", backup.Name)
}
return false
}

// Clean up the deleting backoff
bc.deletingBackoff.DeleteEntry(backup.Name)
return true
}

func (bc *BackupController) setInprogressDeletionMap(backupURL string, state longhorn.BackupState, errMsg string) {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()

bc.inProgressDeletingMap[backupURL].State = state
bc.inProgressDeletingMap[backupURL].ErrorMessage = errMsg
}
2 changes: 1 addition & 1 deletion controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupLabelMap := map[string]string{}

backupURL := backupstore.EncodeBackupURL(backupName, backupVolumeName, backupTargetClient.URL)
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil {
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil && !types.ErrorIsNotFound(err) {
log.WithError(err).WithFields(logrus.Fields{
"backup": backupName,
"backupvolume": backupVolumeName,
Expand Down
4 changes: 1 addition & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ func parseBackupConfig(output string) (*Backup, error) {
func (btc *BackupTargetClient) BackupGet(backupConfigURL string, credential map[string]string) (*Backup, error) {
output, err := btc.ExecuteEngineBinary("backup", "inspect", backupConfigURL)
if err != nil {
if types.ErrorIsNotFound(err) {
return nil, nil
}
return nil, errors.Wrapf(err, "error getting backup config %s", backupConfigURL)
}
return parseBackupConfig(output)
Expand Down Expand Up @@ -302,6 +299,7 @@ func (btc *BackupTargetClient) BackupConfigMetaGet(url string, credential map[st

// BackupDelete deletes the backup from the remote backup target
func (btc *BackupTargetClient) BackupDelete(backupURL string, credential map[string]string) error {
logrus.Infof("Start deleting backup %s", backupURL)
_, err := btc.ExecuteEngineBinaryWithoutTimeout("backup", "rm", backupURL)
if err != nil {
if types.ErrorIsNotFound(err) {
Expand Down
7 changes: 5 additions & 2 deletions k8s/pkg/apis/longhorn/v1beta2/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
type BackupState string

const (
// non-final state
// Non-final state
BackupStateNew = BackupState("")
BackupStatePending = BackupState("Pending")
BackupStateInProgress = BackupState("InProgress")
// final state
// Final state
BackupStateCompleted = BackupState("Completed")
BackupStateError = BackupState("Error")
BackupStateUnknown = BackupState("Unknown")
// Deleting is also considered as final state
// as it only happens when the backup is being deleting and has deletion timestamp
BackupStateDeleting = BackupState("Deleting")
)

type BackupCompressionMethod string
Expand Down
4 changes: 4 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,10 @@ func ErrorIsNotFound(err error) bool {
return strings.Contains(err.Error(), "cannot find")
}

func ErrorIsInProgress(err error) bool {
return strings.Contains(err.Error(), "in progress")
}

func ErrorIsStopped(err error) bool {
return strings.Contains(err.Error(), "is stopped")
}
Expand Down

0 comments on commit 40de0f8

Please sign in to comment.