Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for snapshot metrics #295

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ To disable exporting cluster settings use:
prometheus.cluster.settings: false
```

#### Snapshot metrics

By default, snapshot metrics are disabled. To enable exporting snapshot metrics use:
```
prometheus.snapshots: true
```

#### Nodes filter

Metrics include statistics about individual OpenSearch nodes.
Expand Down
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.opensearch.gradle.PropertyNormalization
import org.opensearch.gradle.test.RestIntegTestTask

import java.util.regex.Matcher
Expand Down Expand Up @@ -101,7 +102,7 @@ dependencies {

restResources {
restApi {
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index'
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index', 'snapshot'
}
}

Expand All @@ -124,8 +125,13 @@ tasks.named("check").configure { dependsOn(integTest) }
// Temporary disable task :testingConventions
testingConventions.enabled = false

// Directory for snapshot repository
File repositoryDir = new File(project.layout.buildDirectory.get().asFile, "shared-repository")

testClusters.all {
numberOfNodes = 2
// Configuring repo path for 'fs' type snapshot repository
setting 'path.repo', "${repositoryDir.absolutePath}", PropertyNormalization.IGNORE_VALUE

// It seems cluster name can not be customized here. It gives an error:
// Testclusters does not allow the following settings to be changed:[cluster.name] for node{::yamlRestTest-0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.compuscene.metrics.prometheus;

import org.opensearch.action.ClusterStatsData;
import org.opensearch.action.SnapshotsResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand All @@ -37,6 +38,8 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand All @@ -54,19 +57,23 @@ public class PrometheusMetricsCollector {

private boolean isPrometheusClusterSettings;
private boolean isPrometheusIndices;
private boolean isPrometheusSnapshots;
private PrometheusMetricsCatalog catalog;

/**
* A constructor.
* @param catalog {@link PrometheusMetricsCatalog}
* @param isPrometheusIndices boolean flag for index level metric
* @param isPrometheusSnapshots boolean flag for snapshots related metrics
* @param isPrometheusClusterSettings boolean flag cluster settings metrics
*/
public PrometheusMetricsCollector(PrometheusMetricsCatalog catalog,
boolean isPrometheusIndices,
boolean isPrometheusSnapshots,
boolean isPrometheusClusterSettings) {
this.isPrometheusClusterSettings = isPrometheusClusterSettings;
this.isPrometheusIndices = isPrometheusIndices;
this.isPrometheusSnapshots = isPrometheusSnapshots;
this.catalog = catalog;
}

Expand All @@ -80,6 +87,7 @@ public void registerMetrics() {
registerNodeMetrics();
registerIndicesMetrics();
registerPerIndexMetrics();
registerSnapshotMetrics();
registerTransportMetrics();
registerHTTPMetrics();
registerThreadPoolMetrics();
Expand Down Expand Up @@ -465,6 +473,30 @@ private void updatePerIndexMetrics(@Nullable ClusterHealthResponse chr, @Nullabl
}
}

@SuppressWarnings("checkstyle:LineLength")
private void registerSnapshotMetrics() {
catalog.registerClusterGauge("min_snapshot_age", "Time elapsed in milliseconds since the most recent successful snapshot's start time", "sm_policy");
}

private void updateSnapshotsMetrics(@Nullable SnapshotsResponse snapshotsResponse) {
if (snapshotsResponse == null) {
return;
}
Map<String, Long> smPolicyMinSnapshotAge = new HashMap<>();
for (SnapshotInfo snapshotInfo : snapshotsResponse.getSnapshotInfos()) {
// emit min_snapshot_age metric only for successful snapshots
if (snapshotInfo.state() != SnapshotState.SUCCESS) {
continue;
}
String smPolicy = snapshotInfo.userMetadata() == null ? "adhoc" : snapshotInfo.userMetadata().getOrDefault("sm_policy", "adhoc").toString();
long snapshotAge = System.currentTimeMillis() - snapshotInfo.startTime();
smPolicyMinSnapshotAge.compute(smPolicy, (key, oldValue) -> oldValue == null ? snapshotAge : Math.min(oldValue, snapshotAge));
}
for(Map.Entry<String, Long> entry : smPolicyMinSnapshotAge.entrySet()) {
catalog.setClusterGauge("min_snapshot_age", entry.getValue(), entry.getKey());
}
}

@SuppressWarnings("checkstyle:LineLength")
private void updatePerIndexContextMetrics(String indexName, String context, CommonStats idx) {
catalog.setClusterGauge("index_doc_number", idx.getDocs().getCount(), indexName, context);
Expand Down Expand Up @@ -920,12 +952,14 @@ private void updateESSettings(@Nullable ClusterStatsData stats) {
* @param nodeStats NodeStats filtered using nodes filter
* @param indicesStats IndicesStatsResponse
* @param clusterStatsData ClusterStatsData
* @param snapshotsResponse SnapshotsResponse
*/
public void updateMetrics(String originNodeName, String originNodeId,
@Nullable ClusterHealthResponse clusterHealthResponse,
NodeStats[] nodeStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStatsData clusterStatsData) {
@Nullable ClusterStatsData clusterStatsData,
@Nullable SnapshotsResponse snapshotsResponse) {
Summary.Timer timer = catalog.startSummaryTimer(
new Tuple<>(originNodeName, originNodeId),
"metrics_generate_time_seconds");
Expand Down Expand Up @@ -956,7 +990,9 @@ public void updateMetrics(String originNodeName, String originNodeId,
if (isPrometheusClusterSettings) {
updateESSettings(clusterStatsData);
}

if (isPrometheusSnapshots) {
updateSnapshotsMetrics(snapshotsResponse);
}
timer.observeDuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,35 @@ public enum INDEX_FILTER_OPTIONS {

static String PROMETHEUS_CLUSTER_SETTINGS_KEY = "prometheus.cluster.settings";
static String PROMETHEUS_INDICES_KEY = "prometheus.indices";
static String PROMETHEUS_SNAPSHOTS_KEY = "prometheus.snapshots";
static String PROMETHEUS_NODES_FILTER_KEY = "prometheus.nodes.filter";
static String PROMETHEUS_SELECTED_INDICES_KEY = "prometheus.indices_filter.selected_indices";
static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option";

/**
* This setting is used configure weather to expose cluster settings metrics or not. The default value is true.
* This setting is used configure whether to expose cluster settings metrics or not. The default value is true.
patelsmit32123 marked this conversation as resolved.
Show resolved Hide resolved
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_CLUSTER_SETTINGS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_CLUSTER_SETTINGS =
Setting.boolSetting(PROMETHEUS_CLUSTER_SETTINGS_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure weather to expose low level index metrics or not. The default value is true.
* This setting is used configure whether to expose low level index metrics or not. The default value is true.
patelsmit32123 marked this conversation as resolved.
Show resolved Hide resolved
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_INDICES_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_INDICES =
Setting.boolSetting(PROMETHEUS_INDICES_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure whether to expose snapshot metrics or not. The default value is false.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_SNAPSHOTS =
Setting.boolSetting(PROMETHEUS_SNAPSHOTS_KEY, false,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure which cluster nodes to gather metrics from. The default value is _local.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_NODES_FILTER_KEY}.
Expand Down Expand Up @@ -97,6 +106,7 @@ public enum INDEX_FILTER_OPTIONS {

private volatile boolean clusterSettings;
private volatile boolean indices;
private volatile boolean snapshots;
private volatile String nodesFilter;
private volatile String selectedIndices;
private volatile INDEX_FILTER_OPTIONS selectedOption;
Expand All @@ -109,11 +119,13 @@ public enum INDEX_FILTER_OPTIONS {
public PrometheusSettings(Settings settings, ClusterSettings clusterSettings) {
setPrometheusClusterSettings(PROMETHEUS_CLUSTER_SETTINGS.get(settings));
setPrometheusIndices(PROMETHEUS_INDICES.get(settings));
setPrometheusSnapshots(PROMETHEUS_SNAPSHOTS.get(settings));
setPrometheusNodesFilter(PROMETHEUS_NODES_FILTER.get(settings));
setPrometheusSelectedIndices(PROMETHEUS_SELECTED_INDICES.get(settings));
setPrometheusSelectedOption(PROMETHEUS_SELECTED_OPTION.get(settings));
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_CLUSTER_SETTINGS, this::setPrometheusClusterSettings);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_INDICES, this::setPrometheusIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SNAPSHOTS, this::setPrometheusSnapshots);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_NODES_FILTER, this::setPrometheusNodesFilter);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_INDICES, this::setPrometheusSelectedIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_OPTION, this::setPrometheusSelectedOption);
Expand All @@ -127,6 +139,10 @@ private void setPrometheusIndices(boolean flag) {
this.indices = flag;
}

private void setPrometheusSnapshots(boolean flag) {
this.snapshots = flag;
}

private void setPrometheusNodesFilter(String filter) { this.nodesFilter = filter; }

private void setPrometheusSelectedIndices(String selectedIndices) {
Expand All @@ -153,6 +169,14 @@ public boolean getPrometheusIndices() {
return this.indices;
}

/**
* Get value of settings key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
* @return boolean value of the key
*/
public boolean getPrometheusSnapshots() {
return this.snapshots;
}

/**
* Get value of settings key {@link #PROMETHEUS_NODES_FILTER_KEY}.
* @return boolean value of the key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.opensearch.action;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.pipeline.SearchPipelineStats;

import java.io.IOException;

Expand All @@ -43,6 +45,7 @@ public class NodePrometheusMetricsResponse extends ActionResponse {
private final NodeStats[] nodeStats;
@Nullable private final IndicesStatsResponse indicesStats;
private ClusterStatsData clusterStatsData = null;
@Nullable private final SnapshotsResponse snapshotsResponse;

/**
* A constructor that materialize the instance from inputStream.
Expand All @@ -56,6 +59,11 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
nodeStats = in.readArray(NodeStats::new, NodeStats[]::new);
indicesStats = PackageAccessHelper.createIndicesStatsResponse(in);
clusterStatsData = new ClusterStatsData(in);
if (in.getVersion().onOrAfter(Version.V_2_17_1)) {
snapshotsResponse = new SnapshotsResponse(in);
} else {
snapshotsResponse = null;
}
}

/**
Expand All @@ -65,6 +73,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
* @param nodesStats NodesStats
* @param indicesStats IndicesStats
* @param clusterStateResponse ClusterStateResponse
* @param snapshotsResponse SnapshotsResponse
* @param settings Settings
* @param clusterSettings ClusterSettings
*/
Expand All @@ -73,6 +82,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
NodeStats[] nodesStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStateResponse clusterStateResponse,
@Nullable SnapshotsResponse snapshotsResponse,
Settings settings,
ClusterSettings clusterSettings) {
this.clusterHealth = clusterHealth;
Expand All @@ -82,6 +92,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
if (clusterStateResponse != null) {
this.clusterStatsData = new ClusterStatsData(clusterStateResponse, settings, clusterSettings);
}
this.snapshotsResponse = snapshotsResponse;
}

/**
Expand All @@ -106,6 +117,15 @@ public NodeStats[] getNodeStats() {
return this.nodeStats;
}

/**
* Get internal {@link SnapshotsResponse} object.
* @return SnapshotsResponse object
*/
@Nullable
public SnapshotsResponse getSnapshotsResponse() {
return this.snapshotsResponse;
}

/**
* Get internal {@link IndicesStatsResponse} object.
* @return IndicesStatsResponse object
Expand All @@ -131,5 +151,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeArray(nodeStats);
out.writeOptionalWriteable(indicesStats);
clusterStatsData.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_17_1)) {
snapshotsResponse.writeTo(out);
}
}
}
79 changes: 79 additions & 0 deletions src/main/java/org/opensearch/action/SnapshotsResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright [2018] [Vincent VAN HOLLEBEKE]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.opensearch.action;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Represents a container class for holding response data related to snapshots.
*/
public class SnapshotsResponse extends ActionResponse {
private final List<SnapshotInfo> snapshotInfos;

/**
* A constructor.
* @param in A streamInput to materialize the instance from
* @throws IOException if there is an exception reading from inputStream
*/
public SnapshotsResponse(StreamInput in) throws IOException {
super(in);
snapshotInfos = Collections.synchronizedList(in.readList(SnapshotInfo::new));
}

/**
* A constructor.
*/
public SnapshotsResponse() {
this.snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}

/**
* Writes the instance into {@link StreamOutput}.
*
* @param out the output stream to which the instance is to be written
* @throws IOException if there is an exception writing to the output stream
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(snapshotInfos);
}

/**
* Getter for {@code snapshotInfos} list.
*
* @return the list of {@link SnapshotInfo} objects
*/
public List<SnapshotInfo> getSnapshotInfos() {
return snapshotInfos;
}

/**
* Adds {@code snapshotInfosToAdd} to the {@code snapshotInfos} list.
*/
public void addSnapshotInfos(List<SnapshotInfo> snapshotInfosToAdd) {
snapshotInfos.addAll(snapshotInfosToAdd);
}
}
Loading
Loading