Skip to content

Commit

Permalink
[Improve] Remove CatalogTable field in CatalogTableUtil (#5521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Sep 20, 2023
1 parent a7e1939 commit 2887424
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.api.table.catalog;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -91,6 +93,10 @@ public TableSchema getTableSchema() {
return tableSchema;
}

public SeaTunnelRowType getSeaTunnelRowType() {
return tableSchema.toPhysicalRowDataType();
}

public Map<String, String> getOptions() {
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
Expand Down Expand Up @@ -70,12 +69,6 @@ public class CatalogTableUtil implements Serializable {
new SeaTunnelRowType(
new String[] {"content"}, new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE});

@Getter private final CatalogTable catalogTable;

private CatalogTableUtil(CatalogTable catalogTable) {
this.catalogTable = catalogTable;
}

@Deprecated
public static CatalogTable getCatalogTable(String tableName, SeaTunnelRowType rowType) {
TableSchema.Builder schemaBuilder = TableSchema.builder();
Expand Down Expand Up @@ -108,7 +101,7 @@ public static List<CatalogTable> getCatalogTables(Config config, ClassLoader cla
// Highest priority: specified schema
Map<String, String> schemaMap = readonlyConfig.get(CatalogTableUtil.SCHEMA);
if (schemaMap != null && schemaMap.size() > 0) {
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config).getCatalogTable();
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
return Collections.singletonList(catalogTable);
}

Expand Down Expand Up @@ -158,7 +151,7 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
if (schemaMap.isEmpty()) {
throw new SeaTunnelException("Schema config can not be empty");
}
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config).getCatalogTable();
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
return Collections.singletonList(catalogTable);
}

Expand Down Expand Up @@ -194,31 +187,26 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
factoryId)));
}

public static CatalogTableUtil buildWithConfig(Config config) {
public static CatalogTable buildWithConfig(Config config) {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config, "schema");
if (!checkResult.isSuccess()) {
throw new RuntimeException(
"Schema config need option [schema], please correct your config first");
}
TableSchema tableSchema = parseTableSchema(config.getConfig("schema"));
return new CatalogTableUtil(
CatalogTable.of(
// TODO: other table info
TableIdentifier.of("", "", ""),
tableSchema,
new HashMap<>(),
new ArrayList<>(),
""));
return CatalogTable.of(
// TODO: other table info
TableIdentifier.of("", "", ""),
tableSchema,
new HashMap<>(),
new ArrayList<>(),
"");
}

public static SeaTunnelRowType buildSimpleTextSchema() {
return SIMPLE_SCHEMA;
}

public SeaTunnelRowType getSeaTunnelRowType() {
return catalogTable.getTableSchema().toPhysicalRowDataType();
}

public static SeaTunnelDataType<?> parseDataType(String columnStr) {
SqlType sqlType = null;
try {
Expand Down

0 comments on commit 2887424

Please sign in to comment.