Skip to content

Commit

Permalink
Added integration test for snapshots metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
psmitj committed Aug 15, 2024
1 parent 2ac18e9 commit a000692
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 31 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ To disable exporting cluster settings use:
prometheus.cluster.settings: false
```

#### Snapshot metrics

To enable exporting snapshot metrics use:
```
prometheus.snapshots: true
```

#### Nodes filter

Metrics include statistics about individual OpenSearch nodes.
Expand Down
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.opensearch.gradle.PropertyNormalization
import org.opensearch.gradle.test.RestIntegTestTask

import java.util.regex.Matcher
Expand Down Expand Up @@ -124,8 +125,13 @@ tasks.named("check").configure { dependsOn(integTest) }
// Temporary disable task :testingConventions
testingConventions.enabled = false

// Directory for snapshot repository
File repositoryDir = new File(project.buildDir, "shared-repository")

testClusters.all {
numberOfNodes = 2
// Configuring repo path for 'fs' type snapshot repository
setting 'path.repo', "${repositoryDir.absolutePath}", PropertyNormalization.IGNORE_VALUE

// It seems the cluster name cannot be customized here. It gives an error:
// Testclusters does not allow the following settings to be changed:[cluster.name] for node{::yamlRestTest-0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ public enum INDEX_FILTER_OPTIONS {
static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option";

/**
* This setting is used configure weather to expose cluster settings metrics or not. The default value is true.
* This setting is used to configure whether to expose cluster settings metrics or not. The default value is true.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_CLUSTER_SETTINGS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_CLUSTER_SETTINGS =
Setting.boolSetting(PROMETHEUS_CLUSTER_SETTINGS_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure weather to expose low level index metrics or not. The default value is true.
* This setting is used to configure whether to expose low level index metrics or not. The default value is true.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_INDICES_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_INDICES =
Setting.boolSetting(PROMETHEUS_INDICES_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure weather to expose snapshot metrics or not. The default value is false.
* This setting is used to configure whether to expose snapshot metrics or not. The default value is false.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_SNAPSHOTS =
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/org/opensearch/action/SnapshotsResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -39,16 +40,14 @@ public class SnapshotsResponse extends ActionResponse {
*/
public SnapshotsResponse(StreamInput in) throws IOException {
super(in);
snapshotInfos = in.readList(SnapshotInfo::new);
snapshotInfos = Collections.synchronizedList(in.readList(SnapshotInfo::new));
}

/**
* A constructor.
*
* @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with.
*/
public SnapshotsResponse(List<SnapshotInfo> snapshotInfos) {
this.snapshotInfos = Collections.unmodifiableList(snapshotInfos);
public SnapshotsResponse() {
this.snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}

/**
Expand All @@ -64,11 +63,17 @@ public void writeTo(StreamOutput out) throws IOException {

/**
* Getter for {@code snapshotInfos} list.
* The returned list is unmodifiable to ensure immutability.
*
* @return the list of {@link SnapshotInfo} objects
*/
public List<SnapshotInfo> getSnapshotInfos() {
    // Hands back the backing list itself; no defensive copy is made.
    return this.snapshotInfos;
}
}

/**
 * Appends every element of {@code snapshotInfosToAdd} to the {@code snapshotInfos} list
 * held by this response.
 *
 * @param snapshotInfosToAdd the {@link SnapshotInfo} objects to append
 */
public void addSnapshotInfos(List<SnapshotInfo> snapshotInfosToAdd) {
    // Single bulk call so the whole batch is appended in one operation on the backing list.
    this.snapshotInfos.addAll(snapshotInfosToAdd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,17 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -111,7 +109,10 @@ private class AsyncAction {
private NodesStatsResponse nodesStatsResponse = null;
private IndicesStatsResponse indicesStatsResponse = null;
private ClusterStateResponse clusterStateResponse = null;
private SnapshotsResponse snapshotsResponse = null;
private final Queue<String> snapshotRepositories = new ConcurrentLinkedQueue<>();
private final int maxSnapshotRepositoriesToBeFetched = 5;
private volatile int snapshotRepositoriesFetchedCount = 0;
private final SnapshotsResponse snapshotsResponse = new SnapshotsResponse();

// read the state of prometheus dynamic settings only once at the beginning of the async request
private final boolean isPrometheusIndices = prometheusSettings.getPrometheusIndices();
Expand Down Expand Up @@ -176,22 +177,14 @@ private void gatherRequests() {
new ActionListener<GetRepositoriesResponse>() {
@Override
public void onResponse(GetRepositoriesResponse response) {
List<ActionFuture<GetSnapshotsResponse>> snapshotsResponseFutures = response.repositories().stream()
.map(metadata -> new GetSnapshotsRequest(metadata.name()))
.map(snapshotsRequest -> client.admin().cluster().getSnapshots(snapshotsRequest))
.collect(Collectors.toList());
List<SnapshotInfo> snapshotInfos = new ArrayList<>();
for (ActionFuture<GetSnapshotsResponse> snapshotsResponseFuture : snapshotsResponseFutures) {
try {
GetSnapshotsResponse getSnapshotsResponse = snapshotsResponseFuture.get();
snapshotInfos.addAll(getSnapshotsResponse.getSnapshots());
} catch (InterruptedException | ExecutionException e) {
listener.onFailure(new OpenSearchException("Get snapshots request failed", e));
return;
}
if (response.repositories().isEmpty()) {
gatherRequests();
return;
}
snapshotsResponse = new SnapshotsResponse(snapshotInfos);
gatherRequests();
snapshotRepositories.addAll(response.repositories().stream()
.map(RepositoryMetadata::name).collect(Collectors.toList()));
String snapshotRepository = snapshotRepositories.poll();
client.admin().cluster().getSnapshots(new GetSnapshotsRequest(snapshotRepository), snapshotsResponseActionListener);
}

@Override
Expand All @@ -200,6 +193,27 @@ public void onFailure(Exception e) {
}
};

// Listener for a single repository's "get snapshots" call. Each response is folded into
// snapshotsResponse, and the next queued repository (if any) is requested from inside the
// callback, so repositories are fetched one after another rather than all at once.
private final ActionListener<GetSnapshotsResponse> snapshotsResponseActionListener =
    new ActionListener<GetSnapshotsResponse>() {
        @Override
        public void onResponse(GetSnapshotsResponse response) {
            snapshotsResponse.addSnapshotInfos(response.getSnapshots());
            // NOTE(review): ++ on a volatile int is not atomic; this looks safe only because
            // the requests are chained one at a time from this callback — confirm.
            snapshotRepositoriesFetchedCount++;

            boolean moreToFetch = !snapshotRepositories.isEmpty()
                && snapshotRepositoriesFetchedCount < maxSnapshotRepositoriesToBeFetched;
            if (moreToFetch) {
                // Ask for the snapshots of the next repository waiting in the queue,
                // re-using this same listener for the reply.
                String nextRepository = snapshotRepositories.poll();
                client.admin().cluster().getSnapshots(new GetSnapshotsRequest(nextRepository), this);
                return;
            }

            // Queue drained or the per-request repository cap was reached:
            // hand control back to gatherRequests().
            gatherRequests();
        }

        @Override
        public void onFailure(Exception e) {
            // Report the failure to the outer listener, keeping the original cause attached.
            listener.onFailure(new OpenSearchException("Get snapshots request failed", e));
        }
    };

private final ActionListener<ClusterStateResponse> clusterStateResponseActionListener =
new ActionListener<ClusterStateResponse>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
"Verify snapshots metrics enabled":

# Create 'fs' snapshot repository
- do:
snapshot.create_repository:
repository: test_repo_get_1
body:
type: fs
settings:
location: "test_repo_get_1_loc"

- do:
snapshot.get_repository: { }

- is_true: test_repo_get_1

# Enable snapshots metrics
- do:
cluster.put_settings:
body:
persistent:
prometheus.snapshots: "true"
flat_settings: true

- match: { persistent: { prometheus.snapshots: "true" } }

# Create snapshot
- do:
snapshot.create:
repository: test_repo_get_1
snapshot: test_snapshot_1
wait_for_completion: true

- match: { snapshot.snapshot: test_snapshot_1 }
- match: { snapshot.state : SUCCESS }

# Fetch and verify metrics
- do:
prometheus.metrics: {}

- match:
$body: |
/.*
\# \s* HELP \s+ opensearch_min_snapshot_age \s+.*\n
\# \s* TYPE \s+ opensearch_min_snapshot_age \s+ gauge\n
opensearch_min_snapshot_age\{
cluster="yamlRestTest",sm_policy="adhoc",
\}\s+\d+\.\d+\n
.*/

0 comments on commit a000692

Please sign in to comment.