Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports OpenSearch V2 through ES_* environment variables #229

Merged
merged 5 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ jobs:
- name: zipkin-dependencies-elasticsearch-v8
module: zipkin-dependencies-elasticsearch
groups: docker,elasticsearch8
- name: zipkin-dependencies-opensearch-v2
module: zipkin-dependencies-opensearch
groups: docker,opensearch2
- name: zipkin-dependencies-mysql
module: zipkin-dependencies-mysql
groups: docker
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ are supported, including Cassandra, MySQL and Elasticsearch.

* `STORAGE_TYPE=cassandra3` : requires Cassandra 3.11.3+; tested against the latest patch of 4.0
* `STORAGE_TYPE=mysql` : requires MySQL 5.6+; tested against MySQL 10.11
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+; tested against last minor release of 7.x and 8.x
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+ or OpenSearch 2.x; tested against last minor release of Elasticsearch 7.x and 8.x, OpenSearch 2.x

## Quick-start

Expand Down Expand Up @@ -92,20 +92,20 @@ $ STORAGE_TYPE=mysql MYSQL_USER=root java -jar zipkin-dependencies.jar
```

### Elasticsearch Storage
Elasticsearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).
Elasticsearch/OpenSearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).

* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm
Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
* `ES_HOSTS`: A comma separated list of Elasticsearch / OpenSearch hosts advertising http. Defaults to
localhost. Add port section if not listening on port 9200. Only one of these hosts
needs to be available to fetch the remaining nodes in the cluster. It is
recommended to set this to all the master nodes of the cluster. Use url format for
SSL. For example, "https://yourhost:8888"
* `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
elasticsearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
Elasticsearch / OpenSearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch / OpenSearch basic authentication. Use when X-Pack security
(formerly Shield) is in place. By default no username or
password is provided to elasticsearch.

Expand Down
7 changes: 7 additions & 0 deletions docker/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ $ STORAGE_TYPE=elasticsearch
$ docker-compose -f docker-compose.yml -f docker-compose-${STORAGE_TYPE}.yml up
```

The `elasticsearch` storage type is also compatible with OpenSearch,
you can start the example setup like that:

```
$ docker-compose -f docker-compose.yml -f docker-compose-opensearch.yml up
```

This starts zipkin, the corresponding storage and makes an example request.
After that, it runs the dependencies job on-demand.

Expand Down
40 changes: 40 additions & 0 deletions docker/examples/docker-compose-opensearch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Copyright The OpenZipkin Authors
# SPDX-License-Identifier: Apache-2.0
#

# This file uses the version 2 docker-compose file format, described here:
# https://docs.docker.com/compose/compose-file/#version-2
#
# It extends the default configuration from docker-compose.yml to run the
# zipkin-opensearch2 container instead of the zipkin-mysql container.

version: '2.4'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: probably want to update the README so people can see this is there, also


services:
storage:
image: ghcr.io/openzipkin/zipkin-opensearch2:${TAG:-latest}
container_name: opensearch
# Uncomment to expose the storage port for testing
# ports:
# - 9200:9200

# Use OpenSearch instead of in-memory storage
zipkin:
extends:
file: docker-compose.yml
service: zipkin
environment:
- STORAGE_TYPE=elasticsearch
# Point the zipkin at the storage backend
- ES_HOSTS=opensearch:9200
# Uncomment to see requests to and from elasticsearch
# - ES_HTTP_LOGGING=BODY

dependencies:
extends:
file: docker-compose.yml
service: dependencies
environment:
- STORAGE_TYPE=elasticsearch
- ES_HOSTS=opensearch
13 changes: 13 additions & 0 deletions main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@
<artifactId>zipkin-dependencies-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-dependencies-opensearch</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.linecorp.armeria</groupId>
<artifactId>armeria-junit5</artifactId>
<version>${armeria.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
25 changes: 18 additions & 7 deletions main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.LinkedHashMap;
import java.util.TimeZone;
import zipkin2.dependencies.elasticsearch.ElasticsearchDependenciesJob;
import zipkin2.dependencies.opensearch.OpensearchDependenciesJob;
import zipkin2.dependencies.mysql.MySQLDependenciesJob;

public final class ZipkinDependenciesJob {
Expand Down Expand Up @@ -61,13 +62,23 @@ public static void main(String[] args) throws UnsupportedEncodingException {
.run();
break;
case "elasticsearch":
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
if (ZipkinElasticsearchStorage.flavor().equalsIgnoreCase("elasticsearch")) {
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
} else { // "opensearch"
OpensearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
}
break;
default:
throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType + "\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.dependencies;

import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.Socket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509ExtendedTrustManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ZipkinElasticsearchStorage {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codefromthecrypt what do you think about this way of OS/ES storage detection? (tests are on me, just haven't had time to finish them)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep exactly as I was hoping!

private static final Logger LOG = LoggerFactory.getLogger(ZipkinElasticsearchStorage.class);
private static final Pattern DISTRIBUTION = Pattern.compile("\"distribution\"\s*[:]\s*\"([^\"]+)\"");

static final String HOSTS = getEnv("ES_HOSTS", "127.0.0.1");
static final String USERNAME = getEnv("ES_USERNAME", null);
static final String PASSWORD = getEnv("ES_PASSWORD", null);

static TrustManager[] TRUST_ALL = new TrustManager [] {
new X509ExtendedTrustManager() {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}

@Override
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}

@Override
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
}
}
};

static String flavor() {
return flavor(HOSTS, USERNAME, PASSWORD);
}

static String flavor(String hosts, String username, String password) {
final HttpClient.Builder builder = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(5));

if (username != null && password != null) {
builder.authenticator(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password.toCharArray());
}
});
}

try {
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, TRUST_ALL, new SecureRandom());

final HttpClient client = builder.sslContext(sslContext).build();
try {
for (String host: parseHosts(hosts)) {
final HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(host)).build();
try {
final HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
final Matcher matcher = DISTRIBUTION.matcher(response.body());
if (matcher.find()) {
return matcher.group(1).toLowerCase();
}
} catch (InterruptedException | IOException ex) {
LOG.warn("Unable issue HTTP GET request to '" + host + "'", ex);
}
}
} finally {
if (client instanceof AutoCloseable) {
try {
// Since JDK-21, the HttpClient is AutoCloseable
((AutoCloseable) client).close();
} catch (Exception ex) {
/* Ignore */
}
}
}
} catch (final NoSuchAlgorithmException | KeyManagementException ex) {
LOG.warn("Unable to configure HttpClient", ex);
}

return "elasticsearch";
}

private static String getEnv(String key, String defaultValue) {
String result = System.getenv(key);
return result != null && !result.isEmpty() ? result : defaultValue;
}

static String[] parseHosts(String hosts) {
final String[] hostParts = hosts.split(",", -1);

// Detect default scheme to use if not specified
String defaultScheme = "http";
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
if (host.startsWith("https")) {
defaultScheme = "https";
break;
}
}

Collection<String> list = new ArrayList<>();
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
URI httpUri = host.startsWith("http") ? URI.create(host) : URI.create(defaultScheme + "://" + host);

int port = httpUri.getPort();
if (port == -1) {
port = 9200; /* default Elasticsearch / OpenSearch port */
}

list.add(httpUri.getScheme() + "://" + httpUri.getHost() + ":" + port);
}

return list.toArray(new String[0]);
}
}
Loading
Loading