From b3523e3924359d24517eacbfc1f6da7fc215561d Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sat, 11 May 2024 19:03:55 -0400 Subject: [PATCH 1/5] Add Zipkin Dependencies support of OpenSearch storage Signed-off-by: Andriy Redko --- README.md | 26 ++ docker/examples/docker-compose-opensearch.yml | 40 +++ main/pom.xml | 6 + .../dependencies/ZipkinDependenciesJob.java | 12 +- opensearch/pom.xml | 71 +++++ .../opensearch/OpensearchDependenciesJob.java | 273 ++++++++++++++++++ .../TraceIdAndJsonToDependencyLinks.java | 49 ++++ .../dependencies/opensearch/package-info.java | 6 + .../OpensearchDependenciesJobTest.java | 124 ++++++++ .../ITOpensearchDependenciesHeavyV2.java | 34 +++ .../ITOpensearchDependenciesV2.java | 34 +++ .../IgnoredDeprecationWarnings.java | 24 ++ .../elasticsearch/OpensearchContainer.java | 119 ++++++++ .../src/test/resources/log4j2.properties | 31 ++ pom.xml | 4 + 15 files changed, 852 insertions(+), 1 deletion(-) create mode 100644 docker/examples/docker-compose-opensearch.yml create mode 100644 opensearch/pom.xml create mode 100644 opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java create mode 100644 opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java create mode 100644 opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java create mode 100644 opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java create mode 100644 opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java create mode 100644 opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java create mode 100644 opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java create mode 100644 opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java create mode 100644 opensearch/src/test/resources/log4j2.properties diff --git a/README.md b/README.md index e296e55..baabb22 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ are supported, including Cassandra, MySQL and Elasticsearch. * `STORAGE_TYPE=cassandra3` : requires Cassandra 3.11.3+; tested against the latest patch of 4.0 * `STORAGE_TYPE=mysql` : requires MySQL 5.6+; tested against MySQL 10.11 * `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+; tested against last minor release of 7.x and 8.x +* `STORAGE_TYPE=opensearch` : requires OpenSearch 2+; tested against last minor release of 2.x ## Quick-start @@ -117,6 +118,31 @@ $ STORAGE_TYPE=elasticsearch ES_HOSTS=host1,host2 java -jar zipkin-dependencies. $ STORAGE_TYPE=elasticsearch ES_HOSTS=host1:9201 java -jar zipkin-dependencies.jar ``` +### OpenSearch Storage +OpenSearch is used when `STORAGE_TYPE=opensearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch). + + * `OS_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin. + * `OS_DATE_SEPARATOR`: The separator used when generating dates in index. + Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm + Could for example be changed to '.' to give zipkin-yyyy.MM.dd + * `OS_HOSTS`: A comma separated list of OpenSearch hosts advertising http. Defaults to + localhost. Add port section if not listening on port 9200. Only one of these hosts + needs to be available to fetch the remaining nodes in the cluster. It is + recommended to set this to all the master nodes of the cluster. Use url format for + SSL. For example, "https://yourhost:8888" + * `OS_NODES_WAN_ONLY`: Set to true to only use the values set in OS_HOSTS, for example if your + OpenSearch cluster is in Docker. Defaults to false + * `OS_USERNAME` and `OS_PASSWORD`: OpenSearch basic authentication. Use when security plugin + is in place. By default no username or password is provided to OpenSearch. + +Example usage: + +```bash +$ STORAGE_TYPE=opensearch OS_HOSTS=host1,host2 java -jar zipkin-dependencies.jar +# To override the http port, add it to the host string +$ STORAGE_TYPE=opensearch OS_HOSTS=host1:9201 java -jar zipkin-dependencies.jar +``` + #### Custom certificates When using an https endpoint in `ES_HOSTS`, you can use the following standard properties to diff --git a/docker/examples/docker-compose-opensearch.yml b/docker/examples/docker-compose-opensearch.yml new file mode 100644 index 0000000..439d09a --- /dev/null +++ b/docker/examples/docker-compose-opensearch.yml @@ -0,0 +1,40 @@ +# +# Copyright The OpenZipkin Authors +# SPDX-License-Identifier: Apache-2.0 +# + +# This file uses the version 2 docker-compose file format, described here: +# https://docs.docker.com/compose/compose-file/#version-2 +# +# It extends the default configuration from docker-compose.yml to run the +# zipkin-opensearch2 container instead of the zipkin-mysql container. + +version: '2.4' + +services: + storage: + image: ghcr.io/openzipkin/zipkin-opensearch2:${TAG:-latest} + container_name: opensearch + # Uncomment to expose the storage port for testing + # ports: + # - 9200:9200 + + # Use OpenSearch instead of in-memory storage + zipkin: + extends: + file: docker-compose.yml + service: zipkin + environment: + - STORAGE_TYPE=elasticsearch + # Point the zipkin at the storage backend + - ES_HOSTS=elasticsearch:9200 + # Uncomment to see requests to and from elasticsearch + # - ES_HTTP_LOGGING=BODY + + dependencies: + extends: + file: docker-compose.yml + service: dependencies + environment: + - STORAGE_TYPE=opensearch + - OS_HOSTS=opensearch diff --git a/main/pom.xml b/main/pom.xml index c61b8a6..5727080 100644 --- a/main/pom.xml +++ b/main/pom.xml @@ -39,6 +39,12 @@ zipkin-dependencies-elasticsearch ${project.version} + + + ${project.groupId} + zipkin-dependencies-opensearch + ${project.version} + diff --git a/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java b/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java index 7265d1c..1b25706 100644 --- a/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java +++ b/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java @@ -13,6 +13,7 @@ import java.util.LinkedHashMap; import java.util.TimeZone; import zipkin2.dependencies.elasticsearch.ElasticsearchDependenciesJob; +import zipkin2.dependencies.opensearch.OpensearchDependenciesJob; import zipkin2.dependencies.mysql.MySQLDependenciesJob; public final class ZipkinDependenciesJob { @@ -69,9 +70,18 @@ public static void main(String[] args) throws UnsupportedEncodingException { .build() .run(); break; + case "opensearch": + OpensearchDependenciesJob.builder() + .logInitializer(logInitializer) + .jars(jarPath) + .day(day) + .conf(sparkConf) + .build() + .run(); + break; default: throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType + "\n" - + "Options are: cassandra3, mysql, elasticsearch"); + + "Options are: cassandra3, mysql, elasticsearch, opensearch"); } } diff --git a/opensearch/pom.xml b/opensearch/pom.xml new file mode 100644 index 0000000..01c335a --- /dev/null +++ b/opensearch/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + + io.zipkin.dependencies + zipkin-dependencies-parent + 3.2.0-SNAPSHOT + + + zipkin-dependencies-opensearch + Zipkin Dependencies: OpenSearch + + + ${project.basedir}/.. + 4.12.0 + + + + + org.opensearch.client + opensearch-spark-30_${scala.binary.version} + ${opensearch-spark.version} + + + + io.zipkin.zipkin2 + zipkin-storage-elasticsearch + ${zipkin.version} + test + + + com.squareup.okhttp3 + mockwebserver + ${okhttp.version} + test + + + com.squareup.okhttp3 + okhttp-tls + ${okhttp.version} + test + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + com.linecorp.armeria + armeria-junit5 + ${armeria.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java new file mode 100644 index 0000000..76f8674 --- /dev/null +++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java @@ -0,0 +1,273 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.dependencies.opensearch; + +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.MalformedJsonException; +import java.io.IOException; +import java.io.StringReader; +import java.net.URI; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TimeZone; +import javax.annotation.Nullable; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import zipkin2.DependencyLink; +import zipkin2.codec.SpanBytesDecoder; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_INDEX_READ_MISSING_AS_EMPTY; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_HTTP_AUTH_PASS; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_HTTP_AUTH_USER; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_KEYSTORE_LOCATION; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_KEYSTORE_PASS; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_TRUST_STORE_LOCATION; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_TRUST_STORE_PASS; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_USE_SSL; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NODES; +import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NODES_WAN_ONLY; +import static zipkin2.internal.DateUtil.midnightUTC; + +public final class OpensearchDependenciesJob { + static final Charset UTF_8 = Charset.forName("UTF-8"); + + private static final Logger log = LoggerFactory.getLogger(OpensearchDependenciesJob.class); + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + String index = getEnv("OS_INDEX", "zipkin"); + String hosts = getEnv("OS_HOSTS", "127.0.0.1"); + String username = getEnv("OS_USERNAME", null); + String password = getEnv("OS_PASSWORD", null); + + final Map sparkProperties = new LinkedHashMap<>(); + + Builder() { + sparkProperties.put("spark.ui.enabled", "false"); + // don't die if there are no spans + sparkProperties.put(OPENSEARCH_INDEX_READ_MISSING_AS_EMPTY, "true"); + sparkProperties.put(OPENSEARCH_NODES_WAN_ONLY, getEnv("OS_NODES_WAN_ONLY", "false")); + sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_LOCATION, + getSystemPropertyAsFileResource("javax.net.ssl.keyStore")); + sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_PASS, + System.getProperty("javax.net.ssl.keyStorePassword", "")); + sparkProperties.put(OPENSEARCH_NET_SSL_TRUST_STORE_LOCATION, + getSystemPropertyAsFileResource("javax.net.ssl.trustStore")); + sparkProperties.put(OPENSEARCH_NET_SSL_TRUST_STORE_PASS, + System.getProperty("javax.net.ssl.trustStorePassword", "")); + } + + // local[*] master lets us run & test the job locally without setting a Spark cluster + String sparkMaster = getEnv("SPARK_MASTER", "local[*]"); + // needed when not in local mode + String[] jars; + Runnable logInitializer; + + // By default, the job only works on traces whose first timestamp is today + long day = midnightUTC(System.currentTimeMillis()); + + /** When set, this indicates which jars to distribute to the cluster. */ + public Builder jars(String... jars) { + this.jars = jars; + return this; + } + + /** The index prefix to use when generating daily index names. Defaults to "zipkin" */ + public Builder index(String index) { + this.index = checkNotNull(index, "index"); + return this; + } + + public Builder hosts(String hosts) { + this.hosts = checkNotNull(hosts, "hosts"); + sparkProperties.put("opensearch.nodes.wan.only", "true"); + return this; + } + + /** username used for basic auth. Needed when Shield or X-Pack security is enabled */ + public Builder username(String username) { + this.username = username; + return this; + } + + /** password used for basic auth. Needed when Shield or X-Pack security is enabled */ + public Builder password(String password) { + this.password = password; + return this; + } + + /** Day (in epoch milliseconds) to process dependencies for. Defaults to today. */ + public Builder day(long day) { + this.day = midnightUTC(day); + return this; + } + + /** Extending more configuration of spark. */ + public Builder conf(Map conf) { + sparkProperties.putAll(conf); + return this; + } + + /** Ensures that logging is set up. Particularly important when in cluster mode. */ + public Builder logInitializer(Runnable logInitializer) { + this.logInitializer = checkNotNull(logInitializer, "logInitializer"); + return this; + } + + public OpensearchDependenciesJob build() { + return new OpensearchDependenciesJob(this); + } + } + + private static String getSystemPropertyAsFileResource(String key) { + String prop = System.getProperty(key, ""); + return prop != null && !prop.isEmpty() ? "file:" + prop : prop; + } + + final String index; + final String dateStamp; + final SparkConf conf; + @Nullable final Runnable logInitializer; + + OpensearchDependenciesJob(Builder builder) { + this.index = builder.index; + String dateSeparator = getEnv("OS_DATE_SEPARATOR", "-"); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd".replace("-", dateSeparator)); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + this.dateStamp = df.format(new Date(builder.day)); + this.conf = new SparkConf(true).setMaster(builder.sparkMaster).setAppName(getClass().getName()); + if (builder.sparkMaster.startsWith("local[")) { + conf.set("spark.driver.bindAddress", "127.0.0.1"); + } + if (builder.jars != null) conf.setJars(builder.jars); + if (builder.username != null) conf.set(OPENSEARCH_NET_HTTP_AUTH_USER, builder.username); + if (builder.password != null) conf.set(OPENSEARCH_NET_HTTP_AUTH_PASS, builder.password); + conf.set(OPENSEARCH_NODES, parseHosts(builder.hosts)); + if (builder.hosts.contains("https")) conf.set(OPENSEARCH_NET_USE_SSL, "true"); + for (Map.Entry entry : builder.sparkProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + log.debug("Spark conf properties: {}={}", entry.getKey(), entry.getValue()); + } + this.logInitializer = builder.logInitializer; + } + + public void run() { + String spanResource = index + "-span-" + dateStamp; + String dependencyLinkResource = index + "-dependency-" + dateStamp; + SpanBytesDecoder decoder = SpanBytesDecoder.JSON_V2; + + log.info("Processing spans from {}", spanResource); + JavaRDD> links; + try (JavaSparkContext sc = new JavaSparkContext(conf)) { + links = JavaOpenSearchSpark.openSearchJsonRDD(sc, spanResource) + .groupBy(JSON_TRACE_ID) + .flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder)) + .values() + .mapToPair((PairFunction, DependencyLink>) l -> + new Tuple2<>(new Tuple2<>(l.parent(), l.child()), l)) + .reduceByKey((l, r) -> DependencyLink.newBuilder() + .parent(l.parent()) + .child(l.child()) + .callCount(l.callCount() + r.callCount()) + .errorCount(l.errorCount() + r.errorCount()) + .build()) + .values() + .map(DEPENDENCY_LINK_JSON); + + if (links.isEmpty()) { + log.info("No dependency links could be processed from spans in index {}", spanResource); + } else { + log.info("Saving dependency links to {}", dependencyLinkResource); + JavaOpenSearchSpark.saveToOpenSearch( + links, + dependencyLinkResource, + Collections.singletonMap("opensearch.mapping.id", "id")); // allows overwriting the link + } + } + + log.info("Done"); + } + + /** + * Same as {@linkplain DependencyLink}, except it adds an ID field so the job can be re-run, + * overwriting a prior run's value for the link. + */ + static final Function> DEPENDENCY_LINK_JSON = l -> { + Map result = new LinkedHashMap<>(); + result.put("id", l.parent() + "|" + l.child()); + result.put("parent", l.parent()); + result.put("child", l.child()); + result.put("callCount", l.callCount()); + result.put("errorCount", l.errorCount()); + return result; + }; + + private static String getEnv(String key, String defaultValue) { + String result = System.getenv(key); + return result != null && !result.isEmpty() ? result : defaultValue; + } + + static String parseHosts(String hosts) { + StringBuilder to = new StringBuilder(); + String[] hostParts = hosts.split(",", -1); + for (int i = 0; i < hostParts.length; i++) { + String host = hostParts[i]; + if (host.startsWith("http")) { + URI httpUri = URI.create(host); + int port = httpUri.getPort(); + if (port == -1) { + port = host.startsWith("https") ? 443 : 80; + } + to.append(httpUri.getHost()).append(":").append(port); + } else { + to.append(host); + } + if (i + 1 < hostParts.length) { + to.append(','); + } + } + return to.toString(); + } + + // defining what could be lambdas here until we update to minimum JRE 8 or retrolambda works. + static final Function, String> JSON_TRACE_ID = + new Function, String>() { + /** returns the lower 64 bits of the trace ID */ + @Override public String call(Tuple2 pair) throws IOException { + JsonReader reader = new JsonReader(new StringReader(pair._2)); + reader.beginObject(); + while (reader.hasNext()) { + String nextName = reader.nextName(); + if (nextName.equals("traceId")) { + String traceId = reader.nextString(); + return traceId.length() > 16 ? traceId.substring(traceId.length() - 16) : traceId; + } else { + reader.skipValue(); + } + } + throw new MalformedJsonException("no traceId in " + pair); + } + + @Override public String toString() { + return "pair._2.traceId"; + } + }; +} diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java new file mode 100644 index 0000000..4123fe0 --- /dev/null +++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.dependencies.opensearch; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import zipkin2.DependencyLink; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.internal.DependencyLinker; + +final class TraceIdAndJsonToDependencyLinks + implements Serializable, FlatMapFunction>, DependencyLink> { + private static final long serialVersionUID = 0L; + private static final Logger log = LoggerFactory.getLogger(TraceIdAndJsonToDependencyLinks.class); + + @Nullable final Runnable logInitializer; + final SpanBytesDecoder decoder; + + TraceIdAndJsonToDependencyLinks(Runnable logInitializer, SpanBytesDecoder decoder) { + this.logInitializer = logInitializer; + this.decoder = decoder; + } + + @Override + public Iterator call(Iterable> traceIdJson) { + if (logInitializer != null) logInitializer.run(); + List sameTraceId = new ArrayList<>(); + for (Tuple2 row : traceIdJson) { + try { + decoder.decode(row._2.getBytes(OpensearchDependenciesJob.UTF_8), sameTraceId); + } catch (Exception e) { + log.warn("Unable to decode span from traces where trace_id=" + row._1, e); + } + } + DependencyLinker linker = new DependencyLinker(); + linker.putTrace(sameTraceId); + return linker.link().iterator(); + } +} diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java new file mode 100644 index 0000000..5a2acb8 --- /dev/null +++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java @@ -0,0 +1,6 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +@javax.annotation.ParametersAreNonnullByDefault +package zipkin2.dependencies.opensearch; diff --git a/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java b/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java new file mode 100644 index 0000000..1fd4544 --- /dev/null +++ b/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java @@ -0,0 +1,124 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.dependencies.opensearch; + +import java.io.IOException; +import java.util.Base64; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.hadoop.OpenSearchHadoopException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_START; +import static okhttp3.tls.internal.TlsUtil.localhost; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class OpensearchDependenciesJobTest { + MockWebServer es = new MockWebServer(); + + @BeforeEach void start() throws IOException { + es.start(); + } + + @AfterEach void stop() throws IOException { + es.close(); + } + + @Test void buildHttps() { + OpensearchDependenciesJob job = + OpensearchDependenciesJob.builder().hosts("https://foobar").build(); + assertThat(job.conf.get("opensearch.nodes")).isEqualTo("foobar:443"); + assertThat(job.conf.get("opensearch.net.ssl")).isEqualTo("true"); + } + + @Test void buildAuth() { + OpensearchDependenciesJob job = + OpensearchDependenciesJob.builder().username("foo").password("bar").build(); + assertThat(job.conf.get("opensearch.net.http.auth.user")).isEqualTo("foo"); + assertThat(job.conf.get("opensearch.net.http.auth.pass")).isEqualTo("bar"); + } + + @Test void authWorks() throws Exception { + es.enqueue(new MockResponse()); // let the HEAD request pass, so we can trap the header value + es.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AT_START)); // kill the job + OpensearchDependenciesJob job = OpensearchDependenciesJob.builder() + .username("foo") + .password("bar") + .hosts(es.url("").toString()) + .build(); + + assertThatThrownBy(job::run) + .isInstanceOf(OpenSearchHadoopException.class); + + String encoded = Base64.getEncoder().encodeToString("foo:bar".getBytes(UTF_8)); + assertThat(es.takeRequest().getHeader("Authorization")) + .isEqualTo("Basic " + encoded.trim()); + } + + @Test void authWorksWithSsl() throws Exception { + es.useHttps(localhost().sslSocketFactory(), false); + + es.enqueue(new MockResponse()); // let the HEAD request pass, so we can trap the header value + es.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AT_START)); // kill the job + + OpensearchDependenciesJob.Builder builder = OpensearchDependenciesJob.builder() + .username("foo") + .password("bar") + .hosts(es.url("").toString()); + + // temporarily hack-in self-signed until https://github.com/openzipkin/zipkin/issues/1683 + builder.sparkProperties.put("opensearch.net.ssl.cert.allow.self.signed", "true"); + + OpensearchDependenciesJob job = builder.build(); + + assertThatThrownBy(job::run) + .isInstanceOf(OpenSearchHadoopException.class); + + String encoded = Base64.getEncoder().encodeToString("foo:bar".getBytes(UTF_8)); + assertThat(es.takeRequest().getHeader("Authorization")) + .isEqualTo("Basic " + encoded.trim()); + } + + @Test void parseHosts_default() { + assertThat(OpensearchDependenciesJob.parseHosts("1.1.1.1")).isEqualTo("1.1.1.1"); + } + + @Test void parseHosts_commaDelimits() { + assertThat(OpensearchDependenciesJob.parseHosts("1.1.1.1:9200,2.2.2.2:9200")).isEqualTo( + "1.1.1.1:9200,2.2.2.2:9200"); + } + + @Test void parseHosts_http_defaultPort() { + assertThat(OpensearchDependenciesJob.parseHosts("http://1.1.1.1")).isEqualTo("1.1.1.1:80"); + } + + @Test void parseHosts_https_defaultPort() { + assertThat(OpensearchDependenciesJob.parseHosts("https://1.1.1.1")).isEqualTo("1.1.1.1:443"); + } + + @Test void javaSslOptsRedirected() { + System.setProperty("javax.net.ssl.keyStore", "keystore.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "superSecret"); + System.setProperty("javax.net.ssl.trustStore", "truststore.jks"); + System.setProperty("javax.net.ssl.trustStorePassword", "secretSuper"); + + OpensearchDependenciesJob job = OpensearchDependenciesJob.builder().build(); + + assertThat(job.conf.get("opensearch.net.ssl.keystore.location")).isEqualTo("file:keystore.jks"); + assertThat(job.conf.get("opensearch.net.ssl.keystore.pass")).isEqualTo("superSecret"); + assertThat(job.conf.get("opensearch.net.ssl.truststore.location")).isEqualTo("file:truststore.jks"); + assertThat(job.conf.get("opensearch.net.ssl.truststore.pass")).isEqualTo("secretSuper"); + + System.clearProperty("javax.net.ssl.keyStore"); + System.clearProperty("javax.net.ssl.keyStorePassword"); + System.clearProperty("javax.net.ssl.trustStore"); + System.clearProperty("javax.net.ssl.trustStorePassword"); + } +} diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java new file mode 100644 index 0000000..6e288cf --- /dev/null +++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.storage.elasticsearch; + +import java.io.IOException; +import java.util.List; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import zipkin2.Span; +import zipkin2.elasticsearch.ElasticsearchStorage; +import zipkin2.storage.ITDependenciesHeavy; + +@Tag("docker") +@Tag("opensearch2") +@Testcontainers(disabledWithoutDocker = true) +class ITOpensearchDependenciesHeavyV2 extends ITDependenciesHeavy { + @Container static OpensearchContainer opensearch = new OpensearchContainer(2); + + @Override protected ElasticsearchStorage.Builder newStorageBuilder(TestInfo testInfo) { + return opensearch.newStorageBuilder(); + } + + @Override public void clear() throws IOException { + storage.clear(); + } + + @Override protected void processDependencies(List spans) throws Exception { + opensearch.processDependencies(storage, spans); + } +} diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java new file mode 100644 index 0000000..2494df0 --- /dev/null +++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.storage.elasticsearch; + +import java.io.IOException; +import java.util.List; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import zipkin2.Span; +import zipkin2.elasticsearch.ElasticsearchStorage; +import zipkin2.storage.ITDependencies; + +@Tag("docker") +@Tag("opensearch2") +@Testcontainers(disabledWithoutDocker = true) +class ITOpensearchDependenciesV2 extends ITDependencies { + @Container static OpensearchContainer opensearch = new OpensearchContainer(2); + + @Override protected ElasticsearchStorage.Builder newStorageBuilder(TestInfo testInfo) { + return opensearch.newStorageBuilder(); + } + + @Override public void clear() throws IOException { + storage.clear(); + } + + @Override protected void processDependencies(List spans) throws Exception { + opensearch.processDependencies(storage, spans); + } +} diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java new file mode 100644 index 0000000..5d00114 --- /dev/null +++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.storage.elasticsearch; + +import java.util.List; +import java.util.regex.Pattern; + +import static java.util.Arrays.asList; + +/** + * When OS emits a deprecation warning header in response to a method being called, the integration + * test will fail. We cannot always fix our code however to take into account all deprecation + * warnings, as we have to support multiple versions of ES. For these cases, add the warning message + * to {@link #IGNORE_THESE_WARNINGS} array so it will not raise an exception anymore. + */ +abstract class IgnoredDeprecationWarnings { + + // These will be matched using header.contains(ignored[i]), so find a unique substring of the + // warning header for it to be ignored + static List IGNORE_THESE_WARNINGS = asList( + ); +} diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java new file mode 100644 index 0000000..569ac64 --- /dev/null +++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java @@ -0,0 +1,119 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.storage.elasticsearch; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.ClientOptions; +import com.linecorp.armeria.client.ClientOptionsBuilder; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.WebClientBuilder; +import com.linecorp.armeria.client.logging.ContentPreviewingClient; +import com.linecorp.armeria.client.logging.LoggingClient; +import com.linecorp.armeria.client.logging.LoggingClientBuilder; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.logging.LogLevel; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import zipkin2.Span; +import zipkin2.dependencies.opensearch.OpensearchDependenciesJob; +import zipkin2.elasticsearch.ElasticsearchStorage; + +import static org.testcontainers.utility.DockerImageName.parse; +import static zipkin2.storage.ITDependencies.aggregateLinks; + +class OpensearchContainer extends GenericContainer { + static final Logger LOGGER = LoggerFactory.getLogger(OpensearchContainer.class); + + OpensearchContainer(int majorVersion) { + super(parse("ghcr.io/openzipkin/zipkin-opensearch" + majorVersion + ":3.3.0")); + addExposedPort(9200); + waitStrategy = Wait.forHealthcheck(); + withLogConsumer(new Slf4jLogConsumer(LOGGER)); + } + + @Override public void start() { + super.start(); + LOGGER.info("Using baseUrl http://" + hostPort()); + } + + ElasticsearchStorage.Builder newStorageBuilder() { + + WebClientBuilder builder = WebClient.builder("http://" + hostPort()) + // Elasticsearch 7 never returns a response when receiving an HTTP/2 preface instead of the + // more valid behavior of returning a bad request response, so we can't use the preface. + // + // TODO: find or raise a bug with Elastic + .factory(ClientFactory.builder().useHttp2Preface(false).build()); + builder.decorator((delegate, ctx, req) -> { + final HttpResponse response = delegate.execute(ctx, req); + return HttpResponse.of(response.aggregate().thenApply(r -> { + // OS will return a 'warning' response header when using deprecated api, detect this and + // fail early, so we can do something about it. + // Example usage: https://github.com/elastic/elasticsearch/blob/3049e55f093487bb582a7e49ad624961415ba31c/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/IndexPrivilegeIntegTests.java#L559 + final String warningHeader = r.headers().get("warning"); + if (warningHeader != null) { + if (IgnoredDeprecationWarnings.IGNORE_THESE_WARNINGS.stream().noneMatch(p -> p.matcher(warningHeader).find())) { + throw new IllegalArgumentException( + "Detected usage of deprecated API for request " + req + ":\n" + warningHeader); + } + } + // Convert AggregatedHttpResponse back to HttpResponse. + return r.toHttpResponse(); + })); + }); + + // When ES_DEBUG=true log full headers, request and response body to the category + // com.linecorp.armeria.client.logging + if (Boolean.parseBoolean(System.getenv("ES_DEBUG"))) { + ClientOptionsBuilder options = ClientOptions.builder(); + LoggingClientBuilder loggingBuilder = LoggingClient.builder() + .requestLogLevel(LogLevel.INFO) + .successfulResponseLogLevel(LogLevel.INFO); + options.decorator(loggingBuilder.newDecorator()); + options.decorator(ContentPreviewingClient.newDecorator(Integer.MAX_VALUE)); + builder.options(options.build()); + } + + WebClient client = builder.build(); + return ElasticsearchStorage.newBuilder(new ElasticsearchStorage.LazyHttpClient() { + @Override public WebClient get() { + return client; + } + + @Override public void close() { + client.endpointGroup().close(); + } + + @Override public String toString() { + return client.uri().toString(); + } + }).flushOnWrites(true); + } + + String hostPort() { + return getHost() + ":" + getMappedPort(9200); + } + + /** + * This processes the job as if it were a batch. For each day we had traces, run the job again. + */ + void processDependencies(ElasticsearchStorage storage, List spans) throws IOException { + storage.spanConsumer().accept(spans).execute(); + + // aggregate links in memory to determine which days they are in + Set days = aggregateLinks(spans).keySet(); + + // process the job for each day of links. + for (long day : days) { + OpensearchDependenciesJob.builder().hosts(hostPort()).day(day).build().run(); + } + } +} diff --git a/opensearch/src/test/resources/log4j2.properties b/opensearch/src/test/resources/log4j2.properties new file mode 100644 index 0000000..751fe39 --- /dev/null +++ b/opensearch/src/test/resources/log4j2.properties @@ -0,0 +1,31 @@ +# Maven configuration conflicts on simplelogger vs Log4J2, but IntelliJ unit tests use Log4J2 +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + +# hush warning about loading native code +logger.util.name=org.apache.hadoop.util +logger.util.level=error + +# set to debug to see storage details +logger.zipkin.name=zipkin2 +logger.zipkin.level=warn + +# set to debug to see configuration and when the job starts and completes +logger.dependencies-opensearch.name=zipkin2.dependencies.opensearch +logger.dependencies-opensearch.level=warn + +# set to info to see feedback about starting the container +logger.testcontainers.name=org.testcontainers +logger.testcontainers.level=warn +logger.container.name=zipkin2.storage.opensearch.OpensearchContainer +logger.container.level=warn + +# uncomment to see outbound client connections (useful in OpenSearch troubleshooting) +#logger.client.name=com.linecorp.armeria.client +#logger.client.level=info diff --git a/pom.xml b/pom.xml index 96c80ea..fe1012d 100755 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ cassandra3 mysql elasticsearch + opensearch main @@ -74,6 +75,9 @@ 8.14.0-SNAPSHOT + + 1.2.0 + 3.4.1