Skip to content

Commit

Permalink
[Improve][Connector-V2]Support multi-table sink feature for email (#7368
Browse files Browse the repository at this point in the history
)
  • Loading branch information
corgy-w authored Aug 20, 2024
1 parent 287badd commit c880b7a
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 101 deletions.
33 changes: 19 additions & 14 deletions docs/en/connector-v2/sink/Email.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@ The tested email version is 1.5.6.

## Options

| name | type | required | default value |
|--------------------------|--------|----------|---------------|
| email_from_address | string | yes | - |
| email_to_address | string | yes | - |
| email_host | string | yes | - |
| email_transport_protocol | string | yes | - |
| email_smtp_auth | string | yes | - |
| email_authorization_code | string | yes | - |
| email_message_headline | string | yes | - |
| email_message_content | string | yes | - |
| common-options | | no | - |
| name | type | required | default value |
|--------------------------|---------|----------|---------------|
| email_from_address | string | yes | - |
| email_to_address | string | yes | - |
| email_host | string | yes | - |
| email_transport_protocol | string | yes | - |
| email_smtp_auth | boolean | yes | - |
| email_smtp_port | int | no | 465 |
| email_authorization_code | string | no | - |
| email_message_headline | string | yes | - |
| email_message_content | string | yes | - |
| common-options | | no | - |

### email_from_address [string]

Sender Email Address .
Sender Email Address.

### email_to_address [string]

Address to receive mail.
Address to receive mail, Support multiple email addresses, separated by commas (,).

### email_host [string]

Expand All @@ -42,10 +43,14 @@ SMTP server to connect to.

The protocol to load the session .

### email_smtp_auth [string]
### email_smtp_auth [boolean]

Whether to authenticate the customer.

### email_smtp_port [int]

Select port for authentication.

### email_authorization_code [string]

authorization code,You can obtain the authorization code from the mailbox Settings.
Expand Down
31 changes: 18 additions & 13 deletions docs/zh/connector-v2/sink/Email.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@

## 选项

| 名称 | 类型 | 是否必须 | 默认值 |
|--------------------------|--------|------|-----|
| email_from_address | string || - |
| email_to_address | string || - |
| email_host | string || - |
| email_transport_protocol | string || - |
| email_smtp_auth | string || - |
| email_authorization_code | string || - |
| email_message_headline | string || - |
| email_message_content | string || - |
| common-options | || - |
| 名称 | 类型 | 是否必须 | 默认值 |
|--------------------------|---------|------|-----|
| email_from_address | string || - |
| email_to_address | string || - |
| email_host | string || - |
| email_transport_protocol | string || - |
| email_smtp_auth | boolean || - |
| email_smtp_port | int || 465 |
| email_authorization_code | string || - |
| email_message_headline | string || - |
| email_message_content | string || - |
| common-options | || - |

### email_from_address [string]

发件人邮箱地址

### email_to_address [string]

接收邮件的地址
接收邮件的地址,支持多个邮箱地址,以逗号(,)分隔。

### email_host [string]

Expand All @@ -44,10 +45,14 @@

加载会话的协议

### email_smtp_auth [string]
### email_smtp_auth [boolean]

是否对客户进行认证

### email_smtp_port [int]

选择用于身份验证的端口。

### email_authorization_code [string]

授权码,您可以从邮箱设置中获取授权码
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public class EmailConfig {

public static final String CONNECTOR_IDENTITY = "EmailSink";

public static final Option<String> EMAIL_FROM_ADDRESS =
Options.key("email_from_address")
.stringType()
Expand Down Expand Up @@ -61,9 +63,15 @@ public class EmailConfig {
.stringType()
.noDefaultValue()
.withDescription("The protocol used to send the message");
public static final Option<String> EMAIL_SMTP_AUTH =
public static final Option<Boolean> EMAIL_SMTP_AUTH =
Options.key("email_smtp_auth")
.stringType()
.booleanType()
.noDefaultValue()
.withDescription("Whether to use SMTP authentication");

public static final Option<Integer> EMAIL_SMTP_PORT =
Options.key("email_smtp_port")
.intType()
.defaultValue(465)
.withDescription("Select port for authentication.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,45 @@

package org.apache.seatunnel.connectors.seatunnel.email.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Data;
import lombok.NonNull;

import java.io.Serializable;

import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_AUTHORIZATION_CODE;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_FROM_ADDRESS;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_CONTENT;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_HEADLINE;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_AUTH;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_PORT;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TO_ADDRESS;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TRANSPORT_PROTOCOL;

@Data
public class EmailSinkConfig {
public class EmailSinkConfig implements Serializable {
private String emailFromAddress;
private String emailToAddress;
private String emailAuthorizationCode;
private String emailMessageHeadline;
private String emailMessageContent;
private String emailHost;
private String emailTransportProtocol;
private String emailSmtpAuth;
private Boolean emailSmtpAuth;
private Integer emailSmtpPort;

public EmailSinkConfig(@NonNull Config pluginConfig) {
if (pluginConfig.hasPath(EMAIL_FROM_ADDRESS.key())) {
this.emailFromAddress = pluginConfig.getString(EMAIL_FROM_ADDRESS.key());
}
if (pluginConfig.hasPath(EMAIL_TO_ADDRESS.key())) {
this.emailToAddress = pluginConfig.getString(EMAIL_TO_ADDRESS.key());
}
if (pluginConfig.hasPath(EMAIL_AUTHORIZATION_CODE.key())) {
this.emailAuthorizationCode = pluginConfig.getString(EMAIL_AUTHORIZATION_CODE.key());
}
if (pluginConfig.hasPath(EMAIL_MESSAGE_HEADLINE.key())) {
this.emailMessageHeadline = pluginConfig.getString(EMAIL_MESSAGE_HEADLINE.key());
}
if (pluginConfig.hasPath(EMAIL_MESSAGE_CONTENT.key())) {
this.emailMessageContent = pluginConfig.getString(EMAIL_MESSAGE_CONTENT.key());
}
if (pluginConfig.hasPath(EMAIL_HOST.key())) {
this.emailHost = pluginConfig.getString(EMAIL_HOST.key());
}
if (pluginConfig.hasPath(EMAIL_TRANSPORT_PROTOCOL.key())) {
this.emailTransportProtocol = pluginConfig.getString(EMAIL_TRANSPORT_PROTOCOL.key());
}
if (pluginConfig.hasPath(EMAIL_SMTP_AUTH.key())) {
this.emailSmtpAuth = pluginConfig.getString(EMAIL_SMTP_AUTH.key());
}
public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
super();
this.emailFromAddress = pluginConfig.get(EMAIL_FROM_ADDRESS);
this.emailToAddress = pluginConfig.get(EMAIL_TO_ADDRESS);
this.emailAuthorizationCode = pluginConfig.get(EMAIL_AUTHORIZATION_CODE);
this.emailMessageHeadline = pluginConfig.get(EMAIL_MESSAGE_HEADLINE);
this.emailMessageContent = pluginConfig.get(EMAIL_MESSAGE_CONTENT);
this.emailHost = pluginConfig.get(EMAIL_HOST);
this.emailTransportProtocol = pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,38 @@

package org.apache.seatunnel.connectors.seatunnel.email.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;

@AutoService(SeaTunnelSink.class)
public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
private ReadonlyConfig readonlyConfig;
private CatalogTable catalogTable;
private EmailSinkConfig pluginConfig;

public EmailSink(ReadonlyConfig config, CatalogTable table) {
this.readonlyConfig = config;
this.catalogTable = table;
this.pluginConfig = new EmailSinkConfig(config);
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
public EmailSinkWriter createWriter(SinkWriter.Context context) {
return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
}

@Override
public String getPluginName() {
return "EmailSink";
}

@Override
public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
return EmailConfig.CONNECTOR_IDENTITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.email.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;

import com.google.auto.service.AutoService;

Expand All @@ -39,6 +43,12 @@ public String factoryIdentifier() {
return "EmailSink";
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
return () -> new EmailSink(context.getOptions(), catalogTable);
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
Expand All @@ -51,6 +61,7 @@ public OptionRule optionRule() {
EMAIL_AUTHORIZATION_CODE,
EMAIL_MESSAGE_HEADLINE,
EMAIL_MESSAGE_CONTENT)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
Loading

0 comments on commit c880b7a

Please sign in to comment.