From 865163da01a96eef6a7d183ea716abc3baf03a45 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Mon, 25 Sep 2023 17:24:41 +0800 Subject: [PATCH] [Improve][Zeta] Checkpoint exception status messages exclude state data (#5547) --- release-note.md | 1 + .../seatunnel/engine/checkpoint/storage/PipelineState.java | 2 ++ .../engine/checkpoint/storage/hdfs/HdfsStorage.java | 7 +++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/release-note.md b/release-note.md index f71a24636a8..c66c1222fd1 100644 --- a/release-note.md +++ b/release-note.md @@ -139,6 +139,7 @@ - [Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792) - [Zeta] Improve Zeta operation max count and ignore NPE (#4787) - [Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722) +- [zeta] Checkpoint exception status messages exclude state data (#5547) ## Feature diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/PipelineState.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/PipelineState.java index 095a1db6ebc..602303557c2 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/PipelineState.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/PipelineState.java @@ -22,9 +22,11 @@ import lombok.Builder; import lombok.Data; +import lombok.ToString; @Data @Builder +@ToString(exclude = "states") public class PipelineState { private String jobId; diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java index dc819c6ad7e..4d3f56f7c7e 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java @@ -88,7 +88,7 @@ public String storeCheckPoint(PipelineState state) throws CheckpointStorageExcep datas = serializeCheckPointData(state); } catch (IOException e) { throw new CheckpointStorageException( - "Failed to serialize checkpoint data,state is :" + state, e); + String.format("Failed to serialize checkpoint data, state: %s", state), e); } Path filePath = new Path( @@ -108,7 +108,10 @@ public String storeCheckPoint(PipelineState state) throws CheckpointStorageExcep out.write(datas); } catch (IOException e) { throw new CheckpointStorageException( - "Failed to write checkpoint data, state: " + state, e); + String.format( + "Failed to write checkpoint data, file: %s, state: %s", + tmpFilePath, state), + e); } try { boolean success = fs.rename(tmpFilePath, filePath);