Skip to content

Commit

Permalink
[Feature][Flink] Support flow control in Flink (#5509)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilinli123 authored Oct 10, 2023
1 parent c4b18db commit d881dd6
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 53 deletions.
1 change: 1 addition & 0 deletions docs/en/concept/speed-limit.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This document will guide you through the usage of these parameters and how to le
## Support Those Engines

> SeaTunnel Zeta<br/>
> Flink<br/>
## Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.seatunnel.core.starter.flowcontrol;

import org.apache.seatunnel.api.env.EnvCommonOptions;

import lombok.Getter;
import lombok.Setter;

import java.util.Map;

@Getter
@Setter
public class FlowControlStrategy {
Expand Down Expand Up @@ -47,4 +51,42 @@ public static FlowControlStrategy ofBytes(int bytesPerSecond) {
public static FlowControlStrategy ofCount(int countPreSecond) {
return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
}

// Build the FlowControlStrategy object based on your configured speed limiting parameters
public static FlowControlStrategy getFlowControlStrategy(Map<String, Object> envOption) {
FlowControlStrategy strategy;
if (envOption == null || envOption.isEmpty()) {
return null;
}
if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
&& envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
strategy =
FlowControlStrategy.of(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
.toString()),
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
.toString()));
} else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
strategy =
FlowControlStrategy.ofBytes(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
.toString()));
} else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
strategy =
FlowControlStrategy.ofCount(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
.toString()));
} else {
strategy = null;
}
return strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public FlinkExecution(Config config) {
jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));

this.sourcePluginExecuteProcessor =
new SourceExecuteProcessor(
jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
new SourceExecuteProcessor(jarPaths, config, jobContext);
this.transformPluginExecuteProcessor =
new TransformExecuteProcessor(
jarPaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
Expand Down Expand Up @@ -53,10 +54,11 @@

public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
private Config envConfigs;

public SourceExecuteProcessor(
List<URL> jarPaths, List<? extends Config> sourceConfigs, JobContext jobContext) {
super(jarPaths, sourceConfigs, jobContext);
public SourceExecuteProcessor(List<URL> jarPaths, Config ConfigsInfo, JobContext jobContext) {
super(jarPaths, ConfigsInfo.getConfigList(Constants.SOURCE), jobContext);
this.envConfigs = ConfigsInfo.getConfig("env");
}

@Override
Expand All @@ -69,10 +71,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
Config pluginConfig = pluginConfigs.get(i);
BaseSeaTunnelSourceFunction sourceFunction;
if (internalSource instanceof SupportCoordinate) {
sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
sourceFunction = new SeaTunnelCoordinatedSource(internalSource, envConfigs);

registerAppendStream(pluginConfig);
} else {
sourceFunction = new SeaTunnelParallelSource(internalSource);
sourceFunction = new SeaTunnelParallelSource(internalSource, envConfigs);
}
boolean bounded =
internalSource.getBoundedness()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}

source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package org.apache.seatunnel.engine.server.task;

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
Expand All @@ -42,6 +40,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;

public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunnelTask {

private static final ILogger LOGGER = Logger.getLogger(SourceSeaTunnelTask.class);
Expand Down Expand Up @@ -90,7 +90,7 @@ public void init() throws Exception {
checkpointLock,
outputs,
this.getMetricsContext(),
getFlowControlStrategy(),
getFlowControlStrategy(envOption),
sourceProducedType);
((SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle).setCollector(collector);
}
Expand Down Expand Up @@ -133,38 +133,4 @@ public void triggerBarrier(Barrier barrier) throws Exception {
(SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle;
sourceFlow.triggerBarrier(barrier);
}

private FlowControlStrategy getFlowControlStrategy() {
FlowControlStrategy strategy;
if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
&& envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
strategy =
FlowControlStrategy.of(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
.toString()),
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
.toString()));
} else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
strategy =
FlowControlStrategy.ofBytes(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
.toString()));
} else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
strategy =
FlowControlStrategy.ofCount(
Integer.parseInt(
envOption
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
.toString()));
} else {
strategy = null;
}
return strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-starter</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
Expand Down Expand Up @@ -47,6 +50,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;

/**
* The abstract implementation of {@link RichSourceFunction}, the entrypoint of flink source
* translation
Expand All @@ -63,12 +68,25 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row

protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
// Env Conf Info
private Config envConfigs;
// Store env for external Settings
private Map<String, Object> envOption = new HashMap<>();

/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;

public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
public BaseSeaTunnelSourceFunction(
SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config envConfigs) {
this.source = source;
this.envConfigs = envConfigs;
for (Map.Entry<String, ConfigValue> entry : this.envConfigs.entrySet()) {
String envKey = entry.getKey();
String envValue = entry.getValue().render();
if (envKey != null && envValue != null) {
envOption.put(envKey, envValue);
}
}
}

@Override
Expand All @@ -86,7 +104,8 @@ public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exceptio
new RowCollector(
sourceContext,
sourceContext.getCheckpointLock(),
source.getProducedType()));
source.getProducedType(),
getFlowControlStrategy(envOption)));
// Wait for a checkpoint to complete:
// In the current version(version < 1.14.0), when the operator state of the source changes
// to FINISHED, jobs cannot be checkpoint executed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand All @@ -33,19 +35,31 @@ public class RowCollector implements Collector<SeaTunnelRow> {
protected final FlinkRowConverter rowSerialization;
protected final Object checkpointLock;

private FlowControlGate flowControlGate;

public RowCollector(
SourceFunction.SourceContext<Row> internalCollector,
Object checkpointLock,
SeaTunnelDataType<?> dataType) {
SeaTunnelDataType<?> dataType,
FlowControlStrategy flowControlStrategy) {
this.internalCollector = internalCollector;
this.checkpointLock = checkpointLock;
this.rowSerialization = new FlinkRowConverter(dataType);
if (flowControlStrategy != null) {
this.flowControlGate = FlowControlGate.create(flowControlStrategy);
}
}

@Override
public void collect(SeaTunnelRow record) {
public void collect(SeaTunnelRow sourceRecord) {
try {
internalCollector.collect(rowSerialization.convert(record));
if (sourceRecord != null) {
if (flowControlGate != null) {
// Data source Speed limit
flowControlGate.audit(sourceRecord);
}
}
internalCollector.collect(rowSerialization.convert(sourceRecord));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
Expand All @@ -27,9 +29,10 @@ public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {

protected static final String COORDINATED_SOURCE_STATE_NAME = "coordinated-source-states";

public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
public SeaTunnelCoordinatedSource(
SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config envConfigs) {
// TODO: Make sure the source is coordinated.
super(source);
super(source, envConfigs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
Expand All @@ -31,9 +33,9 @@ public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction

protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";

public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config envConfigs) {
// TODO: Make sure the source is uncoordinated.
super(source);
super(source, envConfigs);
}

@Override
Expand Down

0 comments on commit d881dd6

Please sign in to comment.