diff --git a/pom.xml b/pom.xml index bf7c01df..90353653 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,16 @@ https://d2jrmugq4soldf.cloudfront.net/snapshots + + + true + + + false + + central + https://repo1.maven.org/maven2 + diff --git a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java index 8881d172..04243f47 100644 --- a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java +++ b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java @@ -62,6 +62,7 @@ import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY; import static com.aws.greengrass.logmanager.LogManagerService.LOGS_UPLOADER_SERVICE_TOPICS; +import static com.aws.greengrass.logmanager.util.ConfigUtil.updateFromMapWhenChanged; @ImplementsService(name = LOGS_UPLOADER_SERVICE_TOPICS, version = "2.0.0") @@ -404,7 +405,8 @@ private void loadProcessingFilesConfigDeprecated(String componentName) { .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, componentName); - if (currentProcessingComponentTopicsDeprecated != null + if (isDeprecatedVersionSupported() + && currentProcessingComponentTopicsDeprecated != null && !currentProcessingComponentTopicsDeprecated.isEmpty()) { CurrentProcessingFileInformation processingFileInformation = CurrentProcessingFileInformation.builder().build(); @@ -520,15 +522,18 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt) // Update the runtime configuration and store the last processed file information context.runOnPublishQueueAndWait(() -> { processingFilesInformation.forEach((componentName, processingFiles) -> { - // Update old config value to handle downgrade from v 2.3.1 to older ones - Topics componentTopicsDeprecated = - getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, - componentName); - - if (processingFiles.getMostRecentlyUsed() != null) { - componentTopicsDeprecated.updateFromMap( - processingFiles.getMostRecentlyUsed().convertToMapOfObjects(), - new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis())); + if (isDeprecatedVersionSupported()) { + // Update old config value to handle downgrade from v 2.3.1 to older ones + Topics componentTopicsDeprecated = + getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, + componentName); + + if (processingFiles.getMostRecentlyUsed() != null) { + updateFromMapWhenChanged(componentTopicsDeprecated, + processingFiles.getMostRecentlyUsed().convertToMapOfObjects(), + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, + System.currentTimeMillis())); + } } // Handle version 2.3.1 and above @@ -537,7 +542,7 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt) getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, componentName); - componentTopics.updateFromMap(processingFiles.toMap(), + updateFromMapWhenChanged(componentTopics, processingFiles.toMap(), new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis())); }); }); @@ -552,6 +557,10 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt) isCurrentlyUploading.set(false); } + private boolean isDeprecatedVersionSupported() { + return Coerce.toBoolean(getConfig().findOrDefault(true, CONFIGURATION_CONFIG_KEY, "deprecatedVersionSupport")); + } + /** * Remove the processing files information for a given component from memory and disk (runtime config). * @param componentName - the name of a component diff --git a/src/main/java/com/aws/greengrass/logmanager/util/ConfigUtil.java b/src/main/java/com/aws/greengrass/logmanager/util/ConfigUtil.java new file mode 100644 index 00000000..f2a119ec --- /dev/null +++ b/src/main/java/com/aws/greengrass/logmanager/util/ConfigUtil.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.logmanager.util; + +import com.aws.greengrass.config.CaseInsensitiveString; +import com.aws.greengrass.config.Node; +import com.aws.greengrass.config.Topic; +import com.aws.greengrass.config.Topics; +import com.aws.greengrass.config.UnsupportedInputTypeException; +import com.aws.greengrass.config.UpdateBehaviorTree; +import com.aws.greengrass.logging.api.Logger; +import com.aws.greengrass.logging.impl.LogManager; +import edu.umd.cs.findbugs.annotations.NonNull; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public final class ConfigUtil { + private static final Logger logger = LogManager.getLogger(ConfigUtil.class); + + private ConfigUtil() { + } + + /** + * Same as topics.updateFromMap, but only makes the update when the value actually changes, skipping any unnecessary + * timestampUpdated events. Ideally this code would exist in Topics, but it isn't, so we need to do this in order to + * maintain compatibility. + * + * @param topics Topics to update with values from the map + * @param newValues the new value to apply + * @param ubt update behavior tree + */ + public static void updateFromMapWhenChanged(Topics topics, Map newValues, UpdateBehaviorTree ubt) { + Set childrenToRemove = new HashSet<>(topics.children.keySet()); + + newValues.forEach((okey, value) -> { + CaseInsensitiveString key = new CaseInsensitiveString(okey); + childrenToRemove.remove(key); + updateChild(topics, key, value, ubt); + }); + + childrenToRemove.forEach(childName -> { + UpdateBehaviorTree childMergeBehavior = ubt.getChildBehavior(childName.toString()); + + // remove the existing child if its merge behavior is REPLACE + if (childMergeBehavior.getBehavior() == UpdateBehaviorTree.UpdateBehavior.REPLACE) { + topics.remove(topics.children.get(childName)); + } + }); + } + + private static void updateChild(Topics t, CaseInsensitiveString key, Object value, + @NonNull UpdateBehaviorTree mergeBehavior) { + UpdateBehaviorTree childMergeBehavior = mergeBehavior.getChildBehavior(key.toString()); + + Node existingChild = t.children.get(key); + // if new node is a container node + if (value instanceof Map) { + // if existing child is a container node + if (existingChild == null || existingChild instanceof Topics) { + updateFromMapWhenChanged(t.createInteriorChild(key.toString()), (Map) value, childMergeBehavior); + } else { + t.remove(existingChild); + Topics newNode = t.createInteriorChild(key.toString(), mergeBehavior.getTimestampToUse()); + updateFromMapWhenChanged(newNode, (Map) value, childMergeBehavior); + } + // if new node is a leaf node + } else { + try { + if (existingChild == null || existingChild instanceof Topic) { + Topic node = t.createLeafChild(key.toString()); + if (!Objects.equals(node.getOnce(), value)) { + node.withValueChecked(childMergeBehavior.getTimestampToUse(), value); + } + } else { + t.remove(existingChild); + Topic newNode = t.createLeafChild(key.toString()); + newNode.withValueChecked(childMergeBehavior.getTimestampToUse(), value); + } + } catch (UnsupportedInputTypeException e) { + logger.error("Should never fail in updateChild", e); + } + } + } +} diff --git a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java index 3a2c49df..ddd8e00a 100644 --- a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java +++ b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java @@ -5,20 +5,27 @@ package com.aws.greengrass.logmanager; +import com.aws.greengrass.config.Configuration; import com.aws.greengrass.config.Topic; import com.aws.greengrass.config.Topics; import com.aws.greengrass.config.UnsupportedInputTypeException; import com.aws.greengrass.config.UpdateBehaviorTree; import com.aws.greengrass.dependency.Context; -import com.aws.greengrass.dependency.Crashable; import com.aws.greengrass.logging.impl.LogManager; import com.aws.greengrass.logging.impl.config.LogConfig; import com.aws.greengrass.logging.impl.config.LogStore; import com.aws.greengrass.logging.impl.config.model.LogConfigUpdate; import com.aws.greengrass.logmanager.exceptions.InvalidLogGroupException; -import com.aws.greengrass.logmanager.model.*; +import com.aws.greengrass.logmanager.model.CloudWatchAttempt; +import com.aws.greengrass.logmanager.model.CloudWatchAttemptLogFileInformation; +import com.aws.greengrass.logmanager.model.CloudWatchAttemptLogInformation; +import com.aws.greengrass.logmanager.model.ComponentLogConfiguration; +import com.aws.greengrass.logmanager.model.ComponentLogFileInformation; +import com.aws.greengrass.logmanager.model.ComponentType; +import com.aws.greengrass.logmanager.model.LogFile; +import com.aws.greengrass.logmanager.model.LogFileGroup; +import com.aws.greengrass.logmanager.model.ProcessingFiles; import com.aws.greengrass.testcommons.testutilities.GGExtension; -import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil; import com.aws.greengrass.util.Coerce; import com.aws.greengrass.util.NucleusPaths; import com.fasterxml.jackson.databind.exc.MismatchedInputException; @@ -27,7 +34,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; @@ -91,7 +97,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; @@ -101,7 +106,7 @@ @ExtendWith({MockitoExtension.class, GGExtension.class}) @SuppressWarnings("PMD.ExcessiveClassLength") -class LogManagerServiceTest extends GGServiceTestUtil { +class LogManagerServiceTest { @Mock private CloudWatchLogsUploader mockUploader; @Mock @@ -110,10 +115,6 @@ class LogManagerServiceTest extends GGServiceTestUtil { private ArgumentCaptor componentLogsInformationCaptor; @Captor private ArgumentCaptor> callbackCaptor; - @Captor - private ArgumentCaptor> updateFromMapCaptor; - @Captor - private ArgumentCaptor numberObjectCaptor; private static NucleusPaths nucleusPaths; @TempDir @@ -123,6 +124,9 @@ class LogManagerServiceTest extends GGServiceTestUtil { private LogManagerService logsUploaderService; private final ExecutorService executor = Executors.newCachedThreadPool(); private final Instant mockInstant = Instant.EPOCH; + private final Context context = new Context(); + private final Configuration configuration = new Configuration(context); + private final Topics config = configuration.lookupTopics("a"); @BeforeAll static void setupBefore() throws IOException, InterruptedException { @@ -200,70 +204,32 @@ static void cleanUpAfter() { } } - @BeforeEach - public void setup() { - serviceFullName = "aws.greengrass.LogManager"; - initializeMockedConfig(); - lenient().when(context.runOnPublishQueueAndWait(any())).thenAnswer((s) -> { - ((Crashable) s.getArgument(0)).run(); - return null; - }); - } - private void mockDefaultPersistedState() { - Topics allCurrentProcessingComponentTopics1 = - Topics.of(context, PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, null); - Topics currentProcessingComponentTopics1 = - Topics.of(context, SYSTEM_LOGS_COMPONENT_NAME, allCurrentProcessingComponentTopics1); - Topics currentProcessingComponentTopics2 = - Topics.of(context, "UserComponentA", allCurrentProcessingComponentTopics1); - Topics currentProcessingComponentTopics3 = - Topics.of(context, "UserComponentB", allCurrentProcessingComponentTopics1); - // 2.3.0 and prior - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, - SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(currentProcessingComponentTopics1); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, + SYSTEM_LOGS_COMPONENT_NAME); // 2.3.1 and after - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, - SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(currentProcessingComponentTopics1); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, + SYSTEM_LOGS_COMPONENT_NAME); + // 2.3.0 and prior - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, "UserComponentA")) - .thenReturn(currentProcessingComponentTopics2); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC,PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, + "UserComponentA"); // 2.3.1 and after - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, "UserComponentA")) - .thenReturn(currentProcessingComponentTopics2); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, "UserComponentA"); // 2.3.0 and prior - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, "UserComponentB")) - .thenReturn(currentProcessingComponentTopics3); - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, "UserComponentB")) - .thenReturn(currentProcessingComponentTopics3); - - Topics allLastFileProcessedComponentTopics = - Topics.of(context, PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, null); - Topics lastFileProcessedComponentTopics1 = - Topics.of(context, SYSTEM_LOGS_COMPONENT_NAME, allLastFileProcessedComponentTopics); - Topics lastFileProcessedComponentTopics2 = - Topics.of(context, "UserComponentA", allLastFileProcessedComponentTopics); - Topics lastFileProcessedComponentTopics3 = - Topics.of(context, "UserComponentB", allLastFileProcessedComponentTopics); - - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(lastFileProcessedComponentTopics1); - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "UserComponentA")) - .thenReturn(lastFileProcessedComponentTopics2); - lenient().when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "UserComponentB")) - .thenReturn(lastFileProcessedComponentTopics3); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, "UserComponentB"); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, + "UserComponentB"); + + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, SYSTEM_LOGS_COMPONENT_NAME); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "UserComponentA"); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "UserComponentB"); } @AfterEach @@ -275,23 +241,19 @@ public void cleanup() throws InterruptedException { } executor.shutdownNow(); + context.shutdown(); } @Test void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics componentConfigTopics = - logsUploaderConfigTopics.createInteriorChild(COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); + Topics componentConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); Topics componentATopic = componentConfigTopics.createInteriorChild("UserComponentA"); componentATopic.createLeafChild(FILE_REGEX_CONFIG_TOPIC_NAME).withValue("^log.txt\\\\w*"); componentATopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); @@ -300,13 +262,13 @@ void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_lo componentATopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentATopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("true"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); + doNothing().when(mockUploader).registerAttemptStatus(anyString(), callbackCaptor.capture()); logsUploaderService = new LogManagerService(config, mockUploader, mockMerger, nucleusPaths); @@ -335,15 +297,10 @@ void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_lo void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, UnsupportedInputTypeException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - List> componentLogsInformationList = new ArrayList<>(); Map componentAConfig = new HashMap<>(); componentAConfig.put(COMPONENT_NAME_CONFIG_TOPIC_NAME, "UserComponentA"); @@ -352,15 +309,15 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array componentAConfig.put(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME, "GB"); componentAConfig.put(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME, "false"); componentLogsInformationList.add(componentAConfig); - logsUploaderConfigTopics.createLeafChild(COMPONENT_LOGS_CONFIG_TOPIC_NAME).withValueChecked(componentLogsInformationList); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_TOPIC_NAME).withValueChecked(componentLogsInformationList); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("false"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); doNothing().when(mockUploader).registerAttemptStatus(anyString(), callbackCaptor.capture()); @@ -388,31 +345,24 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics componentConfigTopics = - logsUploaderConfigTopics.createInteriorChild(COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); - Topics componentATopic = componentConfigTopics.createInteriorChild("UserComponentA"); + Topics componentATopic = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME, "UserComponentA"); componentATopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); componentATopic.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("DEBUG"); componentATopic.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("10"); componentATopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentATopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("false"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); doNothing().when(mockUploader).registerAttemptStatus(anyString(), callbackCaptor.capture()); @@ -440,18 +390,12 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_mer void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics componentConfigTopics = - logsUploaderConfigTopics.createInteriorChild(COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); - Topics componentATopic = componentConfigTopics.createInteriorChild("UserComponentA"); + Topics componentATopic = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME, "UserComponentA"); componentATopic.createLeafChild(FILE_REGEX_CONFIG_TOPIC_NAME).withValue("^UserComponentA\\w*\\.log"); componentATopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); componentATopic.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("DEBUG"); @@ -459,13 +403,12 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_m componentATopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentATopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("false"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); doNothing().when(mockUploader).registerAttemptStatus(anyString(), callbackCaptor.capture()); @@ -493,18 +436,12 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_m void GIVEN_multiple_user_components_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics componentConfigTopics = - logsUploaderConfigTopics.createInteriorChild(COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); - Topics componentATopic = componentConfigTopics.createInteriorChild("UserComponentA"); + Topics componentATopic = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME, "UserComponentA"); componentATopic.createLeafChild(FILE_REGEX_CONFIG_TOPIC_NAME).withValue("^log.txt\\\\w*"); componentATopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); componentATopic.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("DEBUG"); @@ -512,7 +449,8 @@ void GIVEN_multiple_user_components_log_files_to_be_uploaded_with_all_config_WHE componentATopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentATopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics componentBTopic = componentConfigTopics.createInteriorChild("UserComponentB"); + Topics componentBTopic = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME, "UserComponentB"); componentBTopic.createLeafChild(FILE_REGEX_CONFIG_TOPIC_NAME).withValue("^UserComponentB\\w*\\.log"); componentBTopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); componentBTopic.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("DEBUG"); @@ -520,13 +458,12 @@ void GIVEN_multiple_user_components_log_files_to_be_uploaded_with_all_config_WHE componentBTopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentBTopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("false"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); doNothing().when(mockUploader).registerAttemptStatus(anyString(), callbackCaptor.capture()); @@ -563,15 +500,9 @@ private void startServiceOnAnotherThread() { void GIVEN_null_config_WHEN_config_is_processed_THEN_no_component_config_is_added( ExtensionContext context1) throws IOException { ignoreExceptionOfType(context1, MismatchedInputException.class); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); - Topics logsUploaderConfigTopic = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopic); - + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("3"); + config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC); logsUploaderService = new LogManagerService(config, mockUploader, mockMerger, nucleusPaths); startServiceOnAnotherThread(); @@ -584,62 +515,15 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_ mockDefaultPersistedState(); LogFile processingFile = createLogFileWithSize(directoryPath.resolve("testlogs1.log").toUri(), 1061); LogFile lastProcessedFile = createLogFileWithSize(directoryPath.resolve("testlogs2.log").toUri(), 2943); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1000"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1000"); + + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("false"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); - - Topics component1ProcessedTopics = mock(Topics.class); - Topic component1LastFileProcessedTimeStampTopics = mock(Topic.class); - when(component1ProcessedTopics.createLeafChild(any())).thenReturn(component1LastFileProcessedTimeStampTopics); - when(component1LastFileProcessedTimeStampTopics.withValue(numberObjectCaptor.capture())) - .thenReturn(component1LastFileProcessedTimeStampTopics); - - Topics component2ProcessedTopics = mock(Topics.class); - Topic component2lastFileProcessedTimeStampTopics = mock(Topic.class); - lenient().when(component2ProcessedTopics.createLeafChild(any())).thenReturn(component2lastFileProcessedTimeStampTopics); - lenient().when(component2lastFileProcessedTimeStampTopics.withValue(anyInt())).thenReturn(component2lastFileProcessedTimeStampTopics); - - Topics component2ProcessingTopics = mock(Topics.class); - doNothing().when(component2ProcessingTopics).updateFromMap(updateFromMapCaptor.capture(), any()); - - Topics component1ProcessingTopics = mock(Topics.class); - lenient().doNothing().when(component1ProcessingTopics).updateFromMap(any(), any()); - - - Topics runtimeConfig = mock(Topics.class); - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC)) - .thenReturn(runtimeConfig); - - // 2.3.0 and below - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, - "TestComponent2")).thenReturn(component2ProcessingTopics); - // After 2.3.1 - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, - "TestComponent2")).thenReturn(component2ProcessingTopics); - - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "TestComponent")) - .thenReturn(component1ProcessedTopics); - - // 2.3.0 and below - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, - "TestComponent")).thenReturn(component1ProcessingTopics); - // After 2.3.1 - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, - "TestComponent")).thenReturn(component1ProcessingTopics); - - lenient().when(runtimeConfig.lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "TestComponent2")) - .thenReturn(component2ProcessedTopics); - CloudWatchAttempt attempt = new CloudWatchAttempt(); Map logStreamsToLogInformationMap = new HashMap<>(); @@ -709,18 +593,15 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_ startServiceOnAnotherThread(); callbackCaptor.getValue().accept(attempt); - assertThat(updateFromMapCaptor.getAllValues(), IsNot.not(IsEmptyCollection.empty())); - assertThat(numberObjectCaptor.getAllValues(), IsNot.not(IsEmptyCollection.empty())); - List completedComponentLastProcessedFileInformation = numberObjectCaptor.getAllValues(); - List> partiallyReadComponentLogFileInformation = updateFromMapCaptor.getAllValues(); - assertEquals(1, completedComponentLastProcessedFileInformation.size()); - assertEquals(2, partiallyReadComponentLogFileInformation.size()); - assertEquals(lastProcessedFile.lastModified(), - Coerce.toLong(completedComponentLastProcessedFileInformation.get(0))); + assertEquals(lastProcessedFile.lastModified(), Coerce.toLong(config.find(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "TestComponent", PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP))); LogManagerService.CurrentProcessingFileInformation currentProcessingFileInformation = LogManagerService.CurrentProcessingFileInformation.convertFromMapOfObjects( - (Map) partiallyReadComponentLogFileInformation.get(1).get(processingFile.hashString())); + (Map) config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC, + PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, "TestComponent2") + .toPOJO() + .get(processingFile.hashString())); assertEquals(processingFile.hashString(), currentProcessingFileInformation.getFileHash()); assertEquals(1061, currentProcessingFileInformation.getStartPosition()); assertEquals(processingFile.lastModified(), currentProcessingFileInformation.getLastModifiedTime()); @@ -744,22 +625,16 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_ void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger_merges_THEN_sets_the_start_position_correctly() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("3"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("true"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); logsUploaderService = new LogManagerService(config, mockUploader, mockMerger, nucleusPaths); // These two files are existed, and the active file is greengrass.log @@ -801,22 +676,16 @@ void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_the_start_position_correctly() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("3"); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); - - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("true"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); logsUploaderService = new LogManagerService(config, mockUploader, mockMerger, nucleusPaths); startServiceOnAnotherThread(); @@ -852,15 +721,12 @@ void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_th @Test void GIVEN_persisted_data_WHEN_log_uploader_initialises_THEN_correctly_sets_the_persisted_data() throws IOException { - Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) - .thenReturn(periodicUpdateIntervalSecTopic); - Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); + config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) + .withValue("1"); Topics componentConfigTopics = - logsUploaderConfigTopics.createInteriorChild(COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); + config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + COMPONENT_LOGS_CONFIG_MAP_TOPIC_NAME); Topics componentATopic = componentConfigTopics.createInteriorChild("UserComponentA"); componentATopic.createLeafChild(FILE_REGEX_CONFIG_TOPIC_NAME).withValue("^log.txt\\w*"); componentATopic.createLeafChild(FILE_DIRECTORY_PATH_CONFIG_TOPIC_NAME).withValue(directoryPath.toAbsolutePath().toString()); @@ -869,22 +735,15 @@ void GIVEN_persisted_data_WHEN_log_uploader_initialises_THEN_correctly_sets_the_ componentATopic.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("GB"); componentATopic.createLeafChild(DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME).withValue("false"); - Topics systemConfigTopics = logsUploaderConfigTopics.createInteriorChild(SYSTEM_LOGS_CONFIG_TOPIC_NAME); + Topics systemConfigTopics = config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC, + SYSTEM_LOGS_CONFIG_TOPIC_NAME); systemConfigTopics.createLeafChild(UPLOAD_TO_CW_CONFIG_TOPIC_NAME).withValue("true"); systemConfigTopics.createLeafChild(MIN_LOG_LEVEL_CONFIG_TOPIC_NAME).withValue("INFO"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_CONFIG_TOPIC_NAME).withValue("25"); systemConfigTopics.createLeafChild(DISK_SPACE_LIMIT_UNIT_CONFIG_TOPIC_NAME).withValue("MB"); - when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) - .thenReturn(logsUploaderConfigTopics); Instant now = Instant.now(); Instant tenSecondsAgo = Instant.now().minusSeconds(10); - Topics allCurrentProcessingComponentTopics1 = - Topics.of(context, PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, null); - Topics currentProcessingComponentTopics1 = - Topics.of(context, SYSTEM_LOGS_COMPONENT_NAME, allCurrentProcessingComponentTopics1); - Topics currentProcessingComponentTopics2 = - Topics.of(context, "UserComponentA", allCurrentProcessingComponentTopics1); LogManagerService.CurrentProcessingFileInformation currentProcessingFileInformation1 = LogManagerService.CurrentProcessingFileInformation.builder() .fileHash("TestFileHash") @@ -897,40 +756,19 @@ void GIVEN_persisted_data_WHEN_log_uploader_initialises_THEN_correctly_sets_the_ .lastModifiedTime(now.toEpochMilli()) .startPosition(10000) .build(); - currentProcessingComponentTopics1.updateFromMap(currentProcessingFileInformation1.convertToMapOfObjects(), - new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now.toEpochMilli())); - currentProcessingComponentTopics2.updateFromMap(currentProcessingFileInformation2.convertToMapOfObjects(), - new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now.toEpochMilli())); - - Topics allLastFileProcessedComponentTopics = - Topics.of(context, PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, null); - Topics lastFileProcessedComponentTopics1 = - Topics.of(context, SYSTEM_LOGS_COMPONENT_NAME, allLastFileProcessedComponentTopics); - Topics lastFileProcessedComponentTopics2 = - Topics.of(context, "UserComponentA", allLastFileProcessedComponentTopics); - Topic leafChild1 = lastFileProcessedComponentTopics1.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP); - leafChild1.withValue(tenSecondsAgo.toEpochMilli()); - - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(lastFileProcessedComponentTopics1); - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, "UserComponentA")) - .thenReturn(lastFileProcessedComponentTopics2); - // mock deprecated topic version 2.3.0 and below - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(currentProcessingComponentTopics1); - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, "UserComponentA")) - .thenReturn(currentProcessingComponentTopics2); - // mock topic for version 2.3.1 and above - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, SYSTEM_LOGS_COMPONENT_NAME)) - .thenReturn(currentProcessingComponentTopics1); - when(config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) - .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, "UserComponentA")) - .thenReturn(currentProcessingComponentTopics2); + + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) + .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, SYSTEM_LOGS_COMPONENT_NAME) + .updateFromMap(currentProcessingFileInformation1.convertToMapOfObjects(), + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now.toEpochMilli())); + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) + .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, "UserComponentA") + .updateFromMap(currentProcessingFileInformation2.convertToMapOfObjects(), + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now.toEpochMilli())); + + config.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC) + .lookup(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, SYSTEM_LOGS_COMPONENT_NAME, + PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP).withValue(tenSecondsAgo.toEpochMilli()); logsUploaderService = new LogManagerService(config, mockUploader, mockMerger, nucleusPaths); diff --git a/src/test/java/com/aws/greengrass/logmanager/util/ConfigUtilTest.java b/src/test/java/com/aws/greengrass/logmanager/util/ConfigUtilTest.java new file mode 100644 index 00000000..75b49199 --- /dev/null +++ b/src/test/java/com/aws/greengrass/logmanager/util/ConfigUtilTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.logmanager.util; + +import com.aws.greengrass.config.Topics; +import com.aws.greengrass.config.UpdateBehaviorTree; +import com.aws.greengrass.dependency.Context; +import com.aws.greengrass.util.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ConfigUtilTest { + private final Context context = new Context(); + + @AfterEach() + void after() { + context.shutdown(); + } + + @Test + public void update_from_map_updates_when_changes_exist() { + Topics root = Topics.of(context, "a", null); + AtomicInteger callbackCount = new AtomicInteger(); + root.subscribe((w, n) -> { + callbackCount.incrementAndGet(); + }); + + Map map1 = Utils.immutableMap("B", 1, "C", 2); + long now = System.currentTimeMillis(); + root.updateFromMap(map1, new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now)); + context.waitForPublishQueueToClear(); + + assertEquals(5, callbackCount.get()); + + ConfigUtil.updateFromMapWhenChanged(root, map1, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now)); + + // Nothing should have changed + context.waitForPublishQueueToClear(); + assertEquals(5, callbackCount.get()); + + Map map2 = Utils.immutableMap("C", 1); + ConfigUtil.updateFromMapWhenChanged(root, map2, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now)); + + // 2 events to remove B and update the value of C + context.waitForPublishQueueToClear(); + assertEquals(7, callbackCount.get()); + + // Try pushing timestamp forward + ConfigUtil.updateFromMapWhenChanged(root, map2, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now+10)); + + // Nothing should have changed + context.waitForPublishQueueToClear(); + assertEquals(7, callbackCount.get()); + + // Add in some nesting + Map map3 = Utils.immutableMap("C", Utils.immutableMap("A", 2)); + ConfigUtil.updateFromMapWhenChanged(root, map3, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now+10)); + + // 4 events more. remove C, add C as Topics, add A, update A + context.waitForPublishQueueToClear(); + assertEquals(11, callbackCount.get()); + + ConfigUtil.updateFromMapWhenChanged(root, map3, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now+20)); + + // Nothing should have changed + context.waitForPublishQueueToClear(); + assertEquals(11, callbackCount.get()); + + Map map4 = Utils.immutableMap("C", Utils.immutableMap("A", 1)); + ConfigUtil.updateFromMapWhenChanged(root, map4, + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, now+20)); + + // A changed + context.waitForPublishQueueToClear(); + assertEquals(12, callbackCount.get()); + } +}