diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 92bde07d9..bb0cc9746 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -83,7 +83,7 @@ func NewAgreementBotWorker(name string, cfg *config.HorizonConfig, db persistenc newMessagesToProcess: false, nodeSearch: NewNodeSearch(), secretProvider: s, - secretUpdateManager: NewSecretUpdateManager(), + secretUpdateManager: NewSecretUpdateManager(cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckMaxInterval, cfg.AgreementBot.SecretsUpdateCheckIncrement), } patternManager = NewPatternManager() @@ -1576,7 +1576,7 @@ func (w *AgreementBotWorker) secretsProviderMaintenance() int { // This function is called by the secrets update sub worker to learn about secrets that have been updated. func (w *AgreementBotWorker) secretsUpdate() int { - + nextRunWait := w.secretUpdateManager.PollInterval secretUpdates, err := w.secretUpdateManager.CheckForUpdates(w.secretProvider, w.db) if err != nil { glog.Errorf(AWlogString(err)) @@ -1585,10 +1585,13 @@ func (w *AgreementBotWorker) secretsUpdate() int { // Send out an event with the changed secrets and affected policies in it. if secretUpdates != nil && secretUpdates.Length() != 0 { w.Messages() <- events.NewSecretUpdatesMessage(events.UPDATED_SECRETS, secretUpdates) + nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(secretUpdates.Length()) + } else { + nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(0) } } - return 0 + return nextRunWait } func (w *AgreementBotWorker) monitorHAGroupNMPUpdates() int { diff --git a/agreementbot/secret_updater.go b/agreementbot/secret_updater.go index 086f5f694..e2c4e62fa 100644 --- a/agreementbot/secret_updater.go +++ b/agreementbot/secret_updater.go @@ -17,12 +17,57 @@ import ( // The main component which holds secret updates for the governance functions. type SecretUpdateManager struct { - PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed - PULock sync.Mutex // The lock that protects the list of pending secret updates. + PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed + PollInterval int // Number of seconds to pull secret update + PollMinInterval int + PollMaxInterval int + PollIntervalIncrement int + PULock sync.Mutex // The lock that protects the list of pending secret updates. } -func NewSecretUpdateManager() *SecretUpdateManager { - return new(SecretUpdateManager) +func NewSecretUpdateManager(pollInterval int, pollMinInterval int, pollMaxInterval int, pollIntervalIncrement int) *SecretUpdateManager { + sum := &SecretUpdateManager{ + PendingUpdates: make([]*events.SecretUpdates, 0), + PollInterval: pollInterval, // 60s + PollMinInterval: pollMinInterval, // 60s + PollMaxInterval: pollMaxInterval, // 300s + PollIntervalIncrement: pollIntervalIncrement, // 30s + } + return sum +} + +func (sm *SecretUpdateManager) GetPollInterval() int { + return sm.PollInterval +} + +func (sm *SecretUpdateManager) SetPollInterval(interval int) { + sm.PULock.Lock() + defer sm.PULock.Unlock() + sm.PollInterval = interval +} + +func (sm *SecretUpdateManager) AdjustSecretsPollingInterval(numOfSecretUpdate int) int { + if numOfSecretUpdate == 0 { + // no update, increase the poll interval + sm.PollInterval += sm.PollIntervalIncrement + if sm.PollInterval > sm.PollMaxInterval { + sm.PollInterval = sm.PollMaxInterval + } + } else if numOfSecretUpdate < 100 { + // small number of updates, use the minInterval + sm.PollInterval = sm.PollMinInterval + } else { + // get some updates, set the poll interval according to the number of updates, starting from (min interval + increment) if number of 100 < updates < 500 + factor := numOfSecretUpdate/500 + 1 + pInterval := sm.PollMinInterval + sm.PollIntervalIncrement*factor // +30s + if pInterval > sm.PollMaxInterval { + sm.PollInterval = sm.PollMaxInterval + } + } + + glog.V(5).Infof(smlogString(fmt.Sprintf("AdjustSecretsPollingInterval to %v, numOfSecretUpdate is: %v", sm.PollInterval, numOfSecretUpdate))) + + return sm.PollInterval } func (sm *SecretUpdateManager) GetNextUpdateEvent() (su *events.SecretUpdates) { diff --git a/config/config.go b/config/config.go index af8b9f8bb..da12cc639 100644 --- a/config/config.go +++ b/config/config.go @@ -132,7 +132,9 @@ type AGConfig struct { RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried. PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed. Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from. - SecretsUpdateCheck int // The number of seconds between checks for updated secrets. + SecretsUpdateCheckInterval int // The number of seconds between checks for updated secrets. Default is 60 + SecretsUpdateCheckMaxInterval int // As the runtime increases the ExchangeMessagePollInterval, this value is the maximum that value can attain. + SecretsUpdateCheckIncrement int // The number of seconds to increment the ExchangeMessagePollInterval when its time to increase the poll interval. CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update. } @@ -188,7 +190,15 @@ func (c *HorizonConfig) GetSecretsManagerFilePath() string { } func (c *HorizonConfig) GetSecretsUpdateCheck() int { - return c.AgreementBot.SecretsUpdateCheck + return c.AgreementBot.SecretsUpdateCheckInterval +} + +func (c *HorizonConfig) GetSecretsUpdateCheckMaxInterval() int { + return c.AgreementBot.SecretsUpdateCheckMaxInterval +} + +func (c *HorizonConfig) GetSecretsUpdateCheckIncrement() int { + return c.AgreementBot.SecretsUpdateCheckIncrement } func (c *HorizonConfig) GetAgbotCSSURL() string { @@ -397,18 +407,20 @@ func Read(file string) (*HorizonConfig, error) { K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT, }, AgreementBot: AGConfig{ - MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, - AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, - AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, - MessageQueueScale: AgbotMessageQueueScale_DEFAULT, - QueueHistorySize: AgbotQueueHistorySize_DEFAULT, - ErrRescanS: AgbotErrRescan_DEFAULT, - FullRescanS: AgbotFullRescan_DEFAULT, - MaxExchangeChanges: AgbotMaxChanges_DEFAULT, - RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, - PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, - SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT, - CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT, + MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, + AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, + AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, + MessageQueueScale: AgbotMessageQueueScale_DEFAULT, + QueueHistorySize: AgbotQueueHistorySize_DEFAULT, + ErrRescanS: AgbotErrRescan_DEFAULT, + FullRescanS: AgbotFullRescan_DEFAULT, + MaxExchangeChanges: AgbotMaxChanges_DEFAULT, + RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, + PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, + SecretsUpdateCheckInterval: SecretsUpdateCheck_DEFAULT, + SecretsUpdateCheckMaxInterval: SecretsUpdateCheckMaxInterval_DEFAULT, + SecretsUpdateCheckIncrement: SecretsUpdateCheckIncrement_DEFAULT, + CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT, }, } @@ -595,7 +607,10 @@ func (agc *AGConfig) String() string { ", MaxExchangeChanges: %v"+ ", RetryLookBackWindow: %v"+ ", PolicySearchOrder: %v"+ - ", Vault: {%v}", + ", Vault: {%v}"+ + ", SecretsUpdateCheckInterval: %v"+ + ", SecretsUpdateCheckMaxInterval: %v"+ + ", SecretsUpdateCheckIncrement: %v", agc.TxLostDelayTolerationSeconds, agc.AgreementWorkers, agc.DBPath, agc.Postgresql.String(), agc.PartitionStale, agc.ProtocolTimeoutS, agc.AgreementTimeoutS, agc.NoDataIntervalS, agc.ActiveAgreementsURL, agc.ActiveAgreementsUser, mask, agc.PolicyPath, agc.NewContractIntervalS, agc.ProcessGovernanceIntervalS, @@ -604,7 +619,7 @@ func (agc *AGConfig) String() string { agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey, agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize, agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges, - agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault) + agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault, agc.SecretsUpdateCheckInterval, agc.SecretsUpdateCheckMaxInterval, agc.SecretsUpdateCheckIncrement) } func (c *VaultConfig) String() string { diff --git a/config/constants.go b/config/constants.go index eee3c5f2e..ea587377f 100644 --- a/config/constants.go +++ b/config/constants.go @@ -138,5 +138,11 @@ const K8sCRInstallTimeoutS_DEFAULT = 180 // Time between secret update checks const SecretsUpdateCheck_DEFAULT = 60 +// Max interval between secret update checks +const SecretsUpdateCheckMaxInterval_DEFAULT = 600 + +// The Default secrets check increment size +const SecretsUpdateCheckIncrement_DEFAULT = 30 + // Batch destination size to send to CSS const AgbotCSSDestinationBatchSize_DEFAULT = 200