Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Zeta] Support delete logs regularly #7787

Open
wants to merge 26 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
43d806a
[Improve][Zeta] test
corgy-w Oct 17, 2024
f10e67a
Revert "[Improve][Zeta] test"
corgy-w Oct 17, 2024
8c9ca5a
[Improve][Zeta] Delete logs regularly
corgy-w Oct 7, 2024
cb1f6fa
[Improve][Zeta] update license
corgy-w Oct 7, 2024
d4d9de0
[Improve][Zeta] use daemon
corgy-w Oct 8, 2024
6e50cbb
[Improve][Zeta] Resolve conflicts
corgy-w Oct 9, 2024
674a5d0
[Improve][Zeta] Optimize code
corgy-w Oct 9, 2024
6ca16ec
[Improve][Zeta] Verify cron expressions && update doc
corgy-w Oct 11, 2024
86d824d
[Improve][Zeta] Optimize code
corgy-w Oct 13, 2024
8243b9d
[Improve][Zeta] test
corgy-w Oct 17, 2024
a76d1f3
[Improve][Zeta] test
corgy-w Oct 17, 2024
089dfad
Revert "[Improve][Zeta] test"
corgy-w Oct 18, 2024
99ed9dc
[Improve][Zeta] test
corgy-w Oct 18, 2024
a9d5e30
[Improve][Zeta] fix ci
corgy-w Oct 19, 2024
c3b11a4
[Improve][Zeta] Optimized delete logic of history job's log
corgy-w Oct 21, 2024
98dab7a
[Improve][Zeta] Optimized doc
corgy-w Oct 22, 2024
be18790
[Improve][Zeta] simple clean
corgy-w Oct 23, 2024
3045792
[Improve][Zeta] Optimize code
corgy-w Oct 24, 2024
983c28c
[Improve][Zeta] Optimized code
corgy-w Oct 26, 2024
d473aa6
[Improve][Zeta] Optimized code
corgy-w Oct 26, 2024
b2ff484
[Improve][Zeta] test ci
corgy-w Oct 27, 2024
95b5e25
[Improve][Zeta] Optimize operation
corgy-w Oct 28, 2024
25a373b
Merge branch 'refs/heads/dev' into delete-logs-regularly
corgy-w Oct 28, 2024
f2860cb
[Improve][Zeta] Optimize code
corgy-w Oct 28, 2024
b52de8c
[Improve][Zeta] Optimize code
corgy-w Oct 29, 2024
cccd9e0
[Improve][Zeta] Optimize doc
corgy-w Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ seatunnel:
telemetry:
metric:
enabled: false
log:
corgy-w marked this conversation as resolved.
Show resolved Hide resolved
scheduled-deletion-enable: false
http:
enable-http: true
port: 8080
Expand Down
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ SeaTunnel provides an API for querying logs.

For more details, please refer to the [REST-API](rest-api-v2.md).

## SeaTunnel Log Configuration

### Scheduled deletion of old logs

SeaTunnel supports scheduled deletion of old log files to prevent disk space exhaustion. You can add the following configuration in the `seatunnel.yml` file:

```yaml
seatunnel:
engine:
history-job-expire-minutes: 1440
telemetry:
logs:
corgy-w marked this conversation as resolved.
Show resolved Hide resolved
scheduled-deletion-enable: false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this configuration be deleted? Keeping it as part of the job history information may cause disk accumulation.

cc @Hisoka-X

```
- `history-job-expire-minutes`: Historical job retention time, in minutes, which is also the log retention time.
- `scheduled-deletion-enable`: Enables or disables scheduled log deletion.

## Best practices for developers

You can create an SLF4J logger by calling `org.slf4j.LoggerFactory#LoggerFactory.getLogger` with the Class of your class as an argument.
Expand Down
18 changes: 18 additions & 0 deletions docs/zh/seatunnel-engine/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ SeaTunnel 提供了一个 API,用于查询日志。

有关详细信息,请参阅 [REST-API](rest-api-v2.md)。

## SeaTunnel 日志配置

### 定时删除旧日志

SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足。您可以在 `seatunnel.yml` 文件中添加以下配置:

```yaml
seatunnel:
engine:
history-job-expire-minutes: 1440
telemetry:
logs:
corgy-w marked this conversation as resolved.
Show resolved Hide resolved
scheduled-deletion-enable: false
```
- `history-job-expire-minutes`: 历史作业保留时间,单位为分钟,同时也是日志保留时间
- `scheduled-deletion-enable`: 是否启用定时删除日志


## 开发人员最佳实践

您可以通过调用 `org.slf4j.LoggerFactory#LoggerFactory.getLogger` 并以您的类的类作为参数来创建 SLF4J 记录器。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
Expand All @@ -35,12 +36,15 @@
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import com.beust.jcommander.internal.Lists;
import com.hazelcast.jet.datamodel.Tuple2;
import io.restassured.response.Response;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand All @@ -52,7 +56,11 @@
public class JobLogIT extends SeaTunnelContainer {

private static final String CUSTOM_JOB_NAME = "test-job-log-file";
private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2";
private static final String CUSTOM_JOB_NAME3 = "test-job-log-file3";
private static final long CUSTOM_JOB_ID = 862969647010611201L;
private static final long CUSTOM_JOB_ID2 = 862969647010611202L;
private static final long CUSTOM_JOB_ID3 = 862969647010611203L;

private static final String confFile = "/fakesource_to_console.conf";
private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL);
Expand Down Expand Up @@ -99,10 +107,29 @@ public void tearDown() throws Exception {
@Test
public void testJobLogFile() throws Exception {
submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID);
server, JobMode.BATCH.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID);

submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME2, CUSTOM_JOB_ID2);

submitJobAndAssertResponse(
server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME3, CUSTOM_JOB_ID3);

assertConsoleLog();
assertFileLog();
List<Tuple2<Boolean, String>> before =
Lists.newArrayList(
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log"));
assertFileLogClean(before);
Thread.sleep(90000);
List<Tuple2<Boolean, String>> after =
Lists.newArrayList(
Tuple2.tuple2(true, "job-" + CUSTOM_JOB_ID + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log"));
assertFileLogClean(after);
}

private void assertConsoleLog() {
Expand Down Expand Up @@ -168,6 +195,22 @@ private void assertFileLog() throws IOException, InterruptedException {
});
}

private void assertFileLogClean(List<Tuple2<Boolean, String>> tuple2s)
throws IOException, InterruptedException {
for (Tuple2<Boolean, String> tuple2 : tuple2s) {
Container.ExecResult execResult =
server.execInContainer(
"sh", "-c", "find /tmp/seatunnel/logs -name " + tuple2.f1() + "\n");
String file = execResult.getStdout();
execResult =
secondServer.execInContainer(
"sh", "-c", "find /tmp/seatunnel/logs -name " + tuple2.f1() + "\n");
String file1 = execResult.getStdout();
Assertions.assertEquals(
tuple2.f0(), StringUtils.isBlank(file) && StringUtils.isBlank(file1));
}
}

private Response submitJob(
GenericContainer<?> container,
String jobMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

seatunnel:
engine:
history-job-expire-minutes: 1440
history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
Expand All @@ -35,3 +35,8 @@ seatunnel:
enable-http: true
port: 8080
enable-dynamic-port: false
telemetry:
metric:
enabled: false
logs:
scheduled-deletion-enable: true
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ seatunnel:
namespace: /tmp/seatunnel/checkpoint_snapshot/
http:
enable-http: false
port: 8080
port: 8080
telemetry:
metric:
enabled: false
logs:
scheduled-deletion-enable: true
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;

Expand Down Expand Up @@ -330,17 +331,19 @@ private Map<String, String> parseConnectorJarHAStoragePluginConfig(
}

private TelemetryConfig parseTelemetryConfig(Node telemetryNode) {
TelemetryConfig metricConfig = new TelemetryConfig();
TelemetryConfig telemetryConfig = new TelemetryConfig();
for (Node node : childElements(telemetryNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.TELEMETRY_METRIC.key().equals(name)) {
metricConfig.setMetric(parseTelemetryMetricConfig(node));
telemetryConfig.setMetric(parseTelemetryMetricConfig(node));
} else if (ServerConfigOptions.TELEMETRY_LOGS.key().equals(name)) {
telemetryConfig.setLogs(parseTelemetryLogsConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}

return metricConfig;
return telemetryConfig;
}

private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
Expand All @@ -357,6 +360,20 @@ private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
return metricConfig;
}

private TelemetryLogsConfig parseTelemetryLogsConfig(Node logsNode) {
TelemetryLogsConfig logsConfig = new TelemetryLogsConfig();
for (Node node : childElements(logsNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.key().equals(name)) {
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
logsConfig.setEnabled(getBooleanValue(getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}

return logsConfig;
}

private HttpConfig parseHttpConfig(Node httpNode) {
HttpConfig httpConfig = new HttpConfig();
for (Node node : childElements(httpNode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ public class ServerConfigOptions {
.withDescription(
"Whether to use classloader cache mode. With cache mode, all jobs share the same classloader if the jars are the same");

public static final Option<Boolean> TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE =
Options.key("scheduled-deletion-enable")
.booleanType()
.defaultValue(false)
.withDescription("Enable scheduled deletion of telemetry logs");

public static final Option<TelemetryLogsConfig> TELEMETRY_LOGS =
Options.key("logs")
.type(new TypeReference<TelemetryLogsConfig>() {})
.defaultValue(new TelemetryLogsConfig())
.withDescription("The telemetry logs configuration.");

public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
Options.key("enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
public class TelemetryConfig implements Serializable {

private TelemetryMetricConfig metric = ServerConfigOptions.TELEMETRY_METRIC.defaultValue();

private TelemetryLogsConfig logs = ServerConfigOptions.TELEMETRY_LOGS.defaultValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.config.server;

import lombok.Data;

import java.io.Serializable;

@Data
public class TelemetryLogsConfig implements Serializable {

private boolean enabled =
ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.defaultValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.builder.api.Component;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
import org.apache.logging.log4j.core.lookup.StrSubstitutor;

import java.lang.reflect.Field;

public class LogUtil {

/** Get configuration log path by log4j */
public static String getLogPath() throws NoSuchFieldException, IllegalAccessException {
String routingAppender = "routingAppender";
String fileAppender = "fileAppender";
PropertiesConfiguration config = getLogConfiguration();
// Get routingAppender log file path
String routingLogFilePath = getRoutingLogFilePath(config);

// Get fileAppender log file path
String fileLogPath = getFileLogPath(config);
String logRef =
config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
.map(Object::toString)
.filter(ref -> ref.contains(routingAppender) || ref.contains(fileAppender))
.findFirst()
.orElse(StringUtils.EMPTY);
if (logRef.equals(routingAppender)) {
return routingLogFilePath.substring(0, routingLogFilePath.lastIndexOf("/"));
} else if (logRef.equals(fileAppender)) {
return fileLogPath.substring(0, routingLogFilePath.lastIndexOf("/"));
} else {
throw new IllegalArgumentException(
String.format("Log file path is empty, get logRef : %s", logRef));
}
}

private static PropertiesConfiguration getLogConfiguration() {
LoggerContext context = (LoggerContext) LogManager.getContext(false);
return (PropertiesConfiguration) context.getConfiguration();
}

private static String getRoutingLogFilePath(PropertiesConfiguration config)
throws NoSuchFieldException, IllegalAccessException {
Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent");
propertiesField.setAccessible(true);
Component propertiesComponent = (Component) propertiesField.get(config);
StrSubstitutor substitutor = config.getStrSubstitutor();
return propertiesComponent.getComponents().stream()
.filter(
component ->
"routingAppender".equals(component.getAttributes().get("name")))
.flatMap(component -> component.getComponents().stream())
.flatMap(component -> component.getComponents().stream())
.flatMap(component -> component.getComponents().stream())
.map(component -> substitutor.replace(component.getAttributes().get("fileName")))
.findFirst()
.orElse(null);
}

private static String getFileLogPath(PropertiesConfiguration config)
throws NoSuchFieldException, IllegalAccessException {
Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent");
propertiesField.setAccessible(true);
Component propertiesComponent = (Component) propertiesField.get(config);
StrSubstitutor substitutor = config.getStrSubstitutor();
return propertiesComponent.getComponents().stream()
.filter(component -> "fileAppender".equals(component.getAttributes().get("name")))
.map(component -> substitutor.replace(component.getAttributes().get("fileName")))
.findFirst()
.orElse(null);
}
}
Loading
Loading