Skip to content

Commit

Permalink
Add Zipkin support of OpenSearch storage
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed May 1, 2024
1 parent 3377f35 commit 6c909b5
Show file tree
Hide file tree
Showing 15 changed files with 670 additions and 255 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.elasticsearch;

import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

import java.io.IOException;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.HttpMethod;

import zipkin2.elasticsearch.internal.client.HttpCall;

/**
* Base version for both Elasticsearch and OpenSearch distributions.
*/
public abstract class BaseVersion {
final int major, minor;

BaseVersion(int major, int minor) {
this.major = major;
this.minor = minor;
}

/**
* Gets the version for particular distribution, returns either {@link ElasticsearchVersion}
* or {@link OpensearchVersion} instance.
* @param http the HTTP client
* @return either {@link ElasticsearchVersion} or {@link OpensearchVersion} instance
* @throws IOException in case of I/O errors
*/
static BaseVersion get(HttpCall.Factory http) throws IOException {
return Parser.INSTANCE.get(http);
}

/**
* Does this version of Elasticsearch / OpenSearch still support mapping types?
* @return "true" if mapping types are supported, "false" otherwise
*/
public abstract boolean supportsTypes();

enum Parser implements HttpCall.BodyConverter<BaseVersion> {
INSTANCE;

final Pattern REGEX = Pattern.compile("(\\d+)\\.(\\d+).*");

BaseVersion get(HttpCall.Factory callFactory) throws IOException {
AggregatedHttpRequest getNode = AggregatedHttpRequest.of(HttpMethod.GET, "/");
BaseVersion version = callFactory.newCall(getNode, this, "get-node").execute();
if (version == null) {
throw new IllegalArgumentException("No content reading Elasticsearch version");
}
return version;
}

@Override
public BaseVersion convert(JsonParser parser, Supplier<String> contentString) {
String version = null;
String distribution = null;
try {
if (enterPath(parser, "version") != null) {
while (parser.nextToken() != null) {
if (parser.currentToken() == JsonToken.VALUE_STRING) {
if (parser.currentName() == "distribution") {
distribution = parser.getText();
} else if (parser.currentName() == "number") {
version = parser.getText();
}
}
}
}
} catch (RuntimeException | IOException possiblyParseException) {
// EmptyCatch ignored
}

if (version == null) {
throw new IllegalArgumentException(
".version.number not found in response: " + contentString.get());
}

Matcher matcher = REGEX.matcher(version);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid .version.number: " + version);
}

try {
int major = Integer.parseInt(matcher.group(1));
int minor = Integer.parseInt(matcher.group(2));
if ("opensearch".equalsIgnoreCase(distribution)) {
return new OpensearchVersion(major, minor);
} else {
return new ElasticsearchVersion(major, minor);
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid .version.number: " + version
+ ", for .version.distribution:" + distribution);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.elasticsearch;

import static zipkin2.elasticsearch.ElasticsearchVersion.V5_0;
import static zipkin2.elasticsearch.ElasticsearchVersion.V6_0;
import static zipkin2.elasticsearch.ElasticsearchVersion.V6_7;
import static zipkin2.elasticsearch.ElasticsearchVersion.V7_0;
import static zipkin2.elasticsearch.ElasticsearchVersion.V7_8;
import static zipkin2.elasticsearch.ElasticsearchVersion.V9_0;

import zipkin2.internal.Nullable;

final class ElasticsearchSpecificTemplates extends VersionSpecificTemplates<ElasticsearchVersion> {
static class DistributionTemplate extends DistributionSpecificTemplates {
private final ElasticsearchVersion version;

DistributionTemplate(ElasticsearchVersion version) {
this.version = version;
}

@Override String indexTemplatesUrl(String indexPrefix, String type, @Nullable Integer templatePriority) {
if (version.compareTo(V7_8) >= 0 && templatePriority != null) {
return "/_index_template/" + indexPrefix + type + "_template";
}
if (version.compareTo(V6_7) >= 0 && version.compareTo(V7_0) < 0) {
// because deprecation warning on 6 to prepare for 7:
//
// [types removal] The parameter include_type_name should be explicitly specified in get
// template requests to prepare for 7.0. In 7.0 include_type_name will default to 'false',
// which means responses will omit the type name in mapping definitions.
//
// The parameter include_type_name was added in 6.7. Using this with ES older than
// 6.7 will result in unrecognized parameter: [include_type_name].
return "/_template/" + indexPrefix + type + "_template?include_type_name=true";
}

return "/_template/" + indexPrefix + type + "_template";
}

@Override char indexTypeDelimiter() {
return ElasticsearchSpecificTemplates.indexTypeDelimiter(version);
}

@Override
IndexTemplates get(String indexPrefix, int indexReplicas, int indexShards,
boolean searchEnabled, boolean strictTraceId, Integer templatePriority) {
return new ElasticsearchSpecificTemplates(indexPrefix, indexReplicas, indexShards,
searchEnabled, strictTraceId, templatePriority).get(version);
}
}

ElasticsearchSpecificTemplates(String indexPrefix, int indexReplicas, int indexShards,
boolean searchEnabled, boolean strictTraceId, Integer templatePriority) {
super(indexPrefix, indexReplicas,indexShards, searchEnabled, strictTraceId, templatePriority);
}

@Override String indexPattern(String type, ElasticsearchVersion version) {
return '"'
+ (version.compareTo(V6_0) < 0 ? "template" : "index_patterns")
+ "\": \""
+ indexPrefix
+ indexTypeDelimiter(version)
+ type
+ "-*"
+ "\"";
}

@Override boolean useComposableTemplate(ElasticsearchVersion version) {
return (version.compareTo(V7_8) >= 0 && templatePriority != null);
}

/**
* This returns a delimiter based on what's supported by the Elasticsearch version.
*
* <p>Starting in Elasticsearch 7.x, colons are no longer allowed in index names. This logic will
* make sure the pattern in our index template doesn't use them either.
*
* <p>See https://github.com/openzipkin/zipkin/issues/2219
*/
static char indexTypeDelimiter(ElasticsearchVersion version) {
return version.compareTo(V7_0) < 0 ? ':' : '-';
}

@Override String maybeWrap(String type, ElasticsearchVersion version, String json) {
// ES 7.x defaults include_type_name to false https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#_literal_include_type_name_literal_now_defaults_to_literal_false_literal
if (version.compareTo(V7_0) >= 0) return json;
return " \"" + type + "\": {\n " + json.replace("\n", "\n ") + " }\n";
}

@Override IndexTemplates get(ElasticsearchVersion version) {
if (version.compareTo(V5_0) < 0 || version.compareTo(V9_0) >= 0) {
throw new IllegalArgumentException(
"Elasticsearch versions 5-8.x are supported, was: " + version);
}
return IndexTemplates.newBuilder()
.version(version)
.indexTypeDelimiter(indexTypeDelimiter(version))
.span(spanIndexTemplate(version))
.dependency(dependencyTemplate(version))
.autocomplete(autocompleteTemplate(version))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
import zipkin2.storage.Traces;

import static com.linecorp.armeria.common.HttpMethod.GET;
import static zipkin2.elasticsearch.ElasticsearchVersion.V6_7;
import static zipkin2.elasticsearch.ElasticsearchVersion.V7_0;
import static zipkin2.elasticsearch.ElasticsearchVersion.V7_8;
import static zipkin2.elasticsearch.EnsureIndexTemplate.ensureIndexTemplate;
import static zipkin2.elasticsearch.VersionSpecificTemplates.TYPE_AUTOCOMPLETE;
import static zipkin2.elasticsearch.VersionSpecificTemplates.TYPE_DEPENDENCY;
Expand Down Expand Up @@ -240,17 +237,17 @@ public final Builder dateSeparator(char dateSeparator) {
return new ElasticsearchSpanConsumer(this);
}

/** Returns the Elasticsearch version of the connected cluster. Internal use only */
@Memoized public ElasticsearchVersion version() {
/** Returns the Elasticsearch / OpenSearch version of the connected cluster. Internal use only */
@Memoized public BaseVersion version() {
try {
return ElasticsearchVersion.get(http());
return BaseVersion.get(http());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

char indexTypeDelimiter() {
return VersionSpecificTemplates.indexTypeDelimiter(version());
return VersionSpecificTemplates.forVersion(version()).indexTypeDelimiter();
}

/** This is an internal blocking call, only used in tests. */
Expand Down Expand Up @@ -337,35 +334,20 @@ IndexTemplates doEnsureIndexTemplates() {
}
}

IndexTemplates versionSpecificTemplates(ElasticsearchVersion version) {
return new VersionSpecificTemplates(
IndexTemplates versionSpecificTemplates(BaseVersion version) {
return VersionSpecificTemplates.forVersion(version).get(
indexNameFormatter().index(),
indexReplicas(),
indexShards(),
searchEnabled(),
strictTraceId(),
templatePriority()
).get(version);
);
}

String buildUrl(IndexTemplates templates, String type) {
String indexPrefix = indexNameFormatter().index() + templates.indexTypeDelimiter();

if (version().compareTo(V7_8) >= 0 && templatePriority() != null) {
return "/_index_template/" + indexPrefix + type + "_template";
}
if (version().compareTo(V6_7) >= 0 && version().compareTo(V7_0) < 0) {
// because deprecation warning on 6 to prepare for 7:
//
// [types removal] The parameter include_type_name should be explicitly specified in get
// template requests to prepare for 7.0. In 7.0 include_type_name will default to 'false',
// which means responses will omit the type name in mapping definitions.
//
// The parameter include_type_name was added in 6.7. Using this with ES older than
// 6.7 will result in unrecognized parameter: [include_type_name].
return "/_template/" + indexPrefix + type + "_template?include_type_name=true";
}
return "/_template/" + indexPrefix + type + "_template";
return VersionSpecificTemplates.forVersion(version()).indexTemplatesUrl(indexPrefix, type, templatePriority());
}

@Override public final String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,23 @@
*/
package zipkin2.elasticsearch;

import com.fasterxml.jackson.core.JsonParser;
import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.HttpMethod;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import zipkin2.elasticsearch.internal.client.HttpCall;

import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/** Helps avoid problems comparing versions by number. Ex 7.10 should be > 7.9 */
public final class ElasticsearchVersion implements Comparable<ElasticsearchVersion> {
public final class ElasticsearchVersion extends BaseVersion implements Comparable<ElasticsearchVersion> {
public static final ElasticsearchVersion V5_0 = new ElasticsearchVersion(5, 0);
public static final ElasticsearchVersion V6_0 = new ElasticsearchVersion(6, 0);
public static final ElasticsearchVersion V6_7 = new ElasticsearchVersion(6, 7);
public static final ElasticsearchVersion V7_0 = new ElasticsearchVersion(7, 0);
public static final ElasticsearchVersion V7_8 = new ElasticsearchVersion(7, 8);
public static final ElasticsearchVersion V9_0 = new ElasticsearchVersion(9, 0);

static ElasticsearchVersion get(HttpCall.Factory http) throws IOException {
return Parser.INSTANCE.get(http);
ElasticsearchVersion(int major, int minor) {
super(major, minor);
}

final int major, minor;

ElasticsearchVersion(int major, int minor) {
this.major = major;
this.minor = minor;
@Override public boolean supportsTypes() {
return compareTo(V7_0) < 0;
}

@Override public int compareTo(ElasticsearchVersion other) {
Expand All @@ -57,45 +44,5 @@ static ElasticsearchVersion get(HttpCall.Factory http) throws IOException {
return major + "." + minor;
}

enum Parser implements HttpCall.BodyConverter<ElasticsearchVersion> {
INSTANCE;

final Pattern REGEX = Pattern.compile("(\\d+)\\.(\\d+).*");

ElasticsearchVersion get(HttpCall.Factory callFactory) throws IOException {
AggregatedHttpRequest getNode = AggregatedHttpRequest.of(HttpMethod.GET, "/");
ElasticsearchVersion version = callFactory.newCall(getNode, this, "get-node").execute();
if (version == null) {
throw new IllegalArgumentException("No content reading Elasticsearch version");
}
return version;
}

@Override
public ElasticsearchVersion convert(JsonParser parser, Supplier<String> contentString) {
String version = null;
try {
if (enterPath(parser, "version", "number") != null) version = parser.getText();
} catch (RuntimeException | IOException possiblyParseException) {
// EmptyCatch ignored
}
if (version == null) {
throw new IllegalArgumentException(
".version.number not found in response: " + contentString.get());
}

Matcher matcher = REGEX.matcher(version);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid .version.number: " + version);
}

try {
int major = Integer.parseInt(matcher.group(1));
int minor = Integer.parseInt(matcher.group(2));
return new ElasticsearchVersion(major, minor);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid .version.number: " + version);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ static Builder newBuilder() {
return new AutoValue_IndexTemplates.Builder();
}

abstract ElasticsearchVersion version();
abstract BaseVersion version();

abstract char indexTypeDelimiter();

Expand All @@ -24,7 +24,7 @@ static Builder newBuilder() {

@AutoValue.Builder
interface Builder {
Builder version(ElasticsearchVersion version);
Builder version(BaseVersion version);

Builder indexTypeDelimiter(char indexTypeDelimiter);

Expand Down
Loading

0 comments on commit 6c909b5

Please sign in to comment.