diff --git a/docs/en/connector-v2/sink/Email.md b/docs/en/connector-v2/sink/Email.md index f2bca2783d4..444be57292f 100644 --- a/docs/en/connector-v2/sink/Email.md +++ b/docs/en/connector-v2/sink/Email.md @@ -14,25 +14,26 @@ The tested email version is 1.5.6. ## Options -| name | type | required | default value | -|--------------------------|--------|----------|---------------| -| email_from_address | string | yes | - | -| email_to_address | string | yes | - | -| email_host | string | yes | - | -| email_transport_protocol | string | yes | - | -| email_smtp_auth | string | yes | - | -| email_authorization_code | string | yes | - | -| email_message_headline | string | yes | - | -| email_message_content | string | yes | - | -| common-options | | no | - | +| name | type | required | default value | +|--------------------------|---------|----------|---------------| +| email_from_address | string | yes | - | +| email_to_address | string | yes | - | +| email_host | string | yes | - | +| email_transport_protocol | string | yes | - | +| email_smtp_auth | boolean | yes | - | +| email_smtp_port | int | no | 465 | +| email_authorization_code | string | no | - | +| email_message_headline | string | yes | - | +| email_message_content | string | yes | - | +| common-options | | no | - | ### email_from_address [string] -Sender Email Address . +Sender Email Address. ### email_to_address [string] -Address to receive mail. +Address to receive mail, Support multiple email addresses, separated by commas (,). ### email_host [string] @@ -42,10 +43,14 @@ SMTP server to connect to. The protocol to load the session . -### email_smtp_auth [string] +### email_smtp_auth [boolean] Whether to authenticate the customer. +### email_smtp_port [int] + +Select port for authentication. + ### email_authorization_code [string] authorization code,You can obtain the authorization code from the mailbox Settings. diff --git a/docs/zh/connector-v2/sink/Email.md b/docs/zh/connector-v2/sink/Email.md index cc3999c580c..a254dc4608f 100644 --- a/docs/zh/connector-v2/sink/Email.md +++ b/docs/zh/connector-v2/sink/Email.md @@ -16,17 +16,18 @@ ## 选项 -| 名称 | 类型 | 是否必须 | 默认值 | -|--------------------------|--------|------|-----| -| email_from_address | string | 是 | - | -| email_to_address | string | 是 | - | -| email_host | string | 是 | - | -| email_transport_protocol | string | 是 | - | -| email_smtp_auth | string | 是 | - | -| email_authorization_code | string | 是 | - | -| email_message_headline | string | 是 | - | -| email_message_content | string | 是 | - | -| common-options | | 否 | - | +| 名称 | 类型 | 是否必须 | 默认值 | +|--------------------------|---------|------|-----| +| email_from_address | string | 是 | - | +| email_to_address | string | 是 | - | +| email_host | string | 是 | - | +| email_transport_protocol | string | 是 | - | +| email_smtp_auth | boolean | 是 | - | +| email_smtp_port | int | 否 | 465 | +| email_authorization_code | string | 否 | - | +| email_message_headline | string | 是 | - | +| email_message_content | string | 是 | - | +| common-options | | 否 | - | ### email_from_address [string] @@ -34,7 +35,7 @@ ### email_to_address [string] -接收邮件的地址 +接收邮件的地址,支持多个邮箱地址,以逗号(,)分隔。 ### email_host [string] @@ -44,10 +45,14 @@ 加载会话的协议 -### email_smtp_auth [string] +### email_smtp_auth [boolean] 是否对客户进行认证 +### email_smtp_port [int] + +选择用于身份验证的端口。 + ### email_authorization_code [string] 授权码,您可以从邮箱设置中获取授权码 diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java index 03825804d82..406de844df3 100644 --- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java +++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java @@ -22,6 +22,8 @@ public class EmailConfig { + public static final String CONNECTOR_IDENTITY = "EmailSink"; + public static final Option EMAIL_FROM_ADDRESS = Options.key("email_from_address") .stringType() @@ -61,9 +63,15 @@ public class EmailConfig { .stringType() .noDefaultValue() .withDescription("The protocol used to send the message"); - public static final Option EMAIL_SMTP_AUTH = + public static final Option EMAIL_SMTP_AUTH = Options.key("email_smtp_auth") - .stringType() + .booleanType() .noDefaultValue() .withDescription("Whether to use SMTP authentication"); + + public static final Option EMAIL_SMTP_PORT = + Options.key("email_smtp_port") + .intType() + .defaultValue(465) + .withDescription("Select port for authentication."); } diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java index f876fab37c7..8455a2ed2c1 100644 --- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java +++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java @@ -17,22 +17,25 @@ package org.apache.seatunnel.connectors.seatunnel.email.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import lombok.Data; import lombok.NonNull; +import java.io.Serializable; + import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_AUTHORIZATION_CODE; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_FROM_ADDRESS; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_HOST; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_CONTENT; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_HEADLINE; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_AUTH; +import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_PORT; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TO_ADDRESS; import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TRANSPORT_PROTOCOL; @Data -public class EmailSinkConfig { +public class EmailSinkConfig implements Serializable { private String emailFromAddress; private String emailToAddress; private String emailAuthorizationCode; @@ -40,32 +43,19 @@ public class EmailSinkConfig { private String emailMessageContent; private String emailHost; private String emailTransportProtocol; - private String emailSmtpAuth; + private Boolean emailSmtpAuth; + private Integer emailSmtpPort; - public EmailSinkConfig(@NonNull Config pluginConfig) { - if (pluginConfig.hasPath(EMAIL_FROM_ADDRESS.key())) { - this.emailFromAddress = pluginConfig.getString(EMAIL_FROM_ADDRESS.key()); - } - if (pluginConfig.hasPath(EMAIL_TO_ADDRESS.key())) { - this.emailToAddress = pluginConfig.getString(EMAIL_TO_ADDRESS.key()); - } - if (pluginConfig.hasPath(EMAIL_AUTHORIZATION_CODE.key())) { - this.emailAuthorizationCode = pluginConfig.getString(EMAIL_AUTHORIZATION_CODE.key()); - } - if (pluginConfig.hasPath(EMAIL_MESSAGE_HEADLINE.key())) { - this.emailMessageHeadline = pluginConfig.getString(EMAIL_MESSAGE_HEADLINE.key()); - } - if (pluginConfig.hasPath(EMAIL_MESSAGE_CONTENT.key())) { - this.emailMessageContent = pluginConfig.getString(EMAIL_MESSAGE_CONTENT.key()); - } - if (pluginConfig.hasPath(EMAIL_HOST.key())) { - this.emailHost = pluginConfig.getString(EMAIL_HOST.key()); - } - if (pluginConfig.hasPath(EMAIL_TRANSPORT_PROTOCOL.key())) { - this.emailTransportProtocol = pluginConfig.getString(EMAIL_TRANSPORT_PROTOCOL.key()); - } - if (pluginConfig.hasPath(EMAIL_SMTP_AUTH.key())) { - this.emailSmtpAuth = pluginConfig.getString(EMAIL_SMTP_AUTH.key()); - } + public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) { + super(); + this.emailFromAddress = pluginConfig.get(EMAIL_FROM_ADDRESS); + this.emailToAddress = pluginConfig.get(EMAIL_TO_ADDRESS); + this.emailAuthorizationCode = pluginConfig.get(EMAIL_AUTHORIZATION_CODE); + this.emailMessageHeadline = pluginConfig.get(EMAIL_MESSAGE_HEADLINE); + this.emailMessageContent = pluginConfig.get(EMAIL_MESSAGE_CONTENT); + this.emailHost = pluginConfig.get(EMAIL_HOST); + this.emailTransportProtocol = pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL); + this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH); + this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT); } } diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java index c1b8ffdd37b..0a3df90a120 100644 --- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java +++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java @@ -17,40 +17,38 @@ package org.apache.seatunnel.connectors.seatunnel.email.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig; +import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig; -@AutoService(SeaTunnelSink.class) -public class EmailSink extends AbstractSimpleSink { +public class EmailSink extends AbstractSimpleSink + implements SupportMultiTableSink { - private Config pluginConfig; private SeaTunnelRowType seaTunnelRowType; - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + private ReadonlyConfig readonlyConfig; + private CatalogTable catalogTable; + private EmailSinkConfig pluginConfig; + + public EmailSink(ReadonlyConfig config, CatalogTable table) { + this.readonlyConfig = config; + this.catalogTable = table; + this.pluginConfig = new EmailSinkConfig(config); + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) { + public EmailSinkWriter createWriter(SinkWriter.Context context) { return new EmailSinkWriter(seaTunnelRowType, pluginConfig); } @Override public String getPluginName() { - return "EmailSink"; - } - - @Override - public void prepare(Config pluginConfig) { - this.pluginConfig = pluginConfig; + return EmailConfig.CONNECTOR_IDENTITY; } } diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java index 243985ff6f1..8db3e42cacf 100644 --- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java +++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java @@ -18,8 +18,12 @@ package org.apache.seatunnel.connectors.seatunnel.email.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import com.google.auto.service.AutoService; @@ -39,6 +43,12 @@ public String factoryIdentifier() { return "EmailSink"; } + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new EmailSink(context.getOptions(), catalogTable); + } + @Override public OptionRule optionRule() { return OptionRule.builder() @@ -51,6 +61,7 @@ public OptionRule optionRule() { EMAIL_AUTHORIZATION_CODE, EMAIL_MESSAGE_HEADLINE, EMAIL_MESSAGE_CONTENT) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } } diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java index ebcb5f90418..f7fe04c0685 100644 --- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java +++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.email.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; @@ -33,6 +32,7 @@ import javax.activation.DataHandler; import javax.activation.DataSource; import javax.activation.FileDataSource; +import javax.mail.Address; import javax.mail.Authenticator; import javax.mail.BodyPart; import javax.mail.Message; @@ -51,15 +51,16 @@ import java.util.Properties; @Slf4j -public class EmailSinkWriter extends AbstractSinkWriter { +public class EmailSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private final SeaTunnelRowType seaTunnelRowType; - private EmailSinkConfig config; + private final EmailSinkConfig config; private StringBuffer stringBuffer; - public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) { + public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig pluginConfig) { this.seaTunnelRowType = seaTunnelRowType; - this.config = new EmailSinkConfig(pluginConfig); + this.config = pluginConfig; this.stringBuffer = new StringBuffer(); } @@ -78,29 +79,32 @@ public void write(SeaTunnelRow element) { public void close() { createFile(); Properties properties = new Properties(); - properties.setProperty("mail.host", config.getEmailHost()); - properties.setProperty("mail.transport.protocol", config.getEmailTransportProtocol()); - - properties.setProperty("mail.smtp.auth", config.getEmailSmtpAuth()); + properties.setProperty("mail.smtp.auth", config.getEmailSmtpAuth().toString()); + properties.setProperty("mail.smtp.port", config.getEmailSmtpPort().toString()); try { MailSSLSocketFactory sf = new MailSSLSocketFactory(); sf.setTrustAllHosts(true); - properties.put("mail.smtp.ssl.enable", "true"); properties.put("mail.smtp.ssl.socketFactory", sf); - Session session = - Session.getDefaultInstance( - properties, - new Authenticator() { - @Override - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication( - config.getEmailFromAddress(), - config.getEmailAuthorizationCode()); - } - }); + Session session; + if (config.getEmailSmtpAuth()) { + properties.put("mail.smtp.ssl.enable", "true"); + session = + Session.getDefaultInstance( + properties, + new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication( + config.getEmailFromAddress(), + config.getEmailAuthorizationCode()); + } + }); + } else { + session = Session.getDefaultInstance(properties); + } // Create the default MimeMessage object MimeMessage message = new MimeMessage(session); @@ -108,8 +112,14 @@ protected PasswordAuthentication getPasswordAuthentication() { message.setFrom(new InternetAddress(config.getEmailFromAddress())); // Set the recipient email address - message.addRecipient( - Message.RecipientType.TO, new InternetAddress(config.getEmailToAddress())); + String[] emailAddresses = config.getEmailToAddress().split(","); + Address[] addresses = new Address[emailAddresses.length]; + for (int i = 0; i < emailAddresses.length; i++) { + addresses[i] = new InternetAddress(emailAddresses[i]); + } + if (addresses.length > 0) { + message.setRecipients(Message.RecipientType.TO, addresses); + } // Setting the Email subject message.setSubject(config.getEmailMessageHeadline()); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml new file mode 100644 index 00000000000..7a6552989a2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + + connector-email-e2e + SeaTunnel : E2E : Connector V2 : Email + + + + + org.apache.seatunnel + connector-email + ${project.version} + test + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java new file mode 100644 index 00000000000..2822231874e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.email; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import lombok.extern.slf4j.Slf4j; + +import javax.mail.Flags; +import javax.mail.Folder; +import javax.mail.Message; +import javax.mail.Session; +import javax.mail.Store; + +import java.io.IOException; +import java.util.Properties; +import java.util.stream.Stream; + +@Slf4j +public class EmailWithMultiIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "greenmail/standalone"; + private static final String HOST = "email-e2e"; + private static final int STMP_PORT = 3025; + private static final int IMAP_PORT = 3143; + + private GenericContainer smtpContainer; + + @BeforeAll + @Override + public void startUp() { + this.smtpContainer = + new GenericContainer<>(DockerImageName.parse(IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(STMP_PORT, IMAP_PORT) + .withLogConsumer( + new Slf4jLogConsumer(LoggerFactory.getLogger("email-service"))); + Startables.deepStart(Stream.of(smtpContainer)).join(); + log.info("SMTP container started"); + } + + @Override + public void tearDown() throws Exception { + if (smtpContainer != null) { + smtpContainer.stop(); + } + } + + @TestTemplate + public void testEmailSink(TestContainer container) throws Exception { + Container.ExecResult textWriteResult = container.executeJob("/fake_to_email.conf"); + testEMailSuccess(1, "receiver-1@example.com", "receiver-2@example.com"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Currently FLINK do not support multi-table") + public void testMultipleTableEmailSink(TestContainer container) throws Exception { + Container.ExecResult textWriteResult = container.executeJob("/fake_to_multiemailsink.conf"); + testEMailSuccess(2, "receiver-3@example.com", "receiver-4@example.com"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + private Session setupImap() { + log.info("in setupImap"); + Properties props = new Properties(); + props.setProperty("mail.store.protocol", "imap"); + props.put("mail.imap.host", smtpContainer.getHost()); + props.put("mail.imap.port", smtpContainer.getMappedPort(IMAP_PORT)); + props.put("mail.imap.localaddress", smtpContainer.getHost()); + return Session.getInstance(props, null); + } + + private void testEMailSuccess(int receivedNum, String... users) throws Exception { + Session sessionIMAP = setupImap(); + for (String user : users) { + Store store = sessionIMAP.getStore("imap"); + store.connect( + smtpContainer.getHost(), smtpContainer.getMappedPort(IMAP_PORT), user, ""); + if (store.isConnected()) { + log.info("IMAP is connected"); + Folder folder = store.getFolder("INBOX"); + if (folder != null) { + // Open the folder in read/write mode + folder.open(Folder.READ_WRITE); + + Message[] messages = folder.getMessages(); + int unreadCount = 0; + + for (Message message : messages) { + // Process only unread mail + if (!message.isSet(Flags.Flag.SEEN)) { + unreadCount++; + // Mark as read + message.setFlag(Flags.Flag.SEEN, true); + } + } + + log.info("mail messages.length: {}", unreadCount); + Assertions.assertEquals(receivedNum, unreadCount); + } + } else { + log.info("IMAP is not connected"); + } + } + } + + @Disabled("Email authentication address and authentication information need to be configured") + public void testOwnEmailSink(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult textReadResult = container.executeJob("/fake_to_email_test.conf"); + Assertions.assertEquals(0, textReadResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf new file mode 100644 index 00000000000..d69b83fd288 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 100 + schema = { + table = "test.table1" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + }, + { + name = "age" + type = "int" + } + ] + } + } + ] + result_table_name = "fake" + } +} + +sink { + EmailSink { + email_from_address = "sender@example.com" + email_to_address = "receiver-1@example.com,receiver-2@example.com" + email_host = "email-e2e" + email_transport_protocol = "smtp" + email_smtp_auth = "false" + email_smtp_port = 3025 + email_authorization_code="" + email_message_headline = "test-title" + email_message_content = "test-content" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf new file mode 100644 index 00000000000..b3657cd7a72 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 100 + schema = { + table = "test.table1" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + }, + { + name = "age" + type = "int" + } + ] + } + } + ] + result_table_name = "fake" + } +} + +sink { + EmailSink { + email_from_address = "xxxxxxxx@qq.com" + email_to_address = "xxxxxxxxx@qq.com" + email_host="smtp.qq.com" + email_transport_protocol="smtp" + email_smtp_auth="true" + email_authorization_code="you authorization code" + email_message_headline="test-title" + email_message_content="test-content" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf new file mode 100644 index 00000000000..974ad1cbb2d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 100 + schema = { + table = "test.table1" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + }, + { + name = "age" + type = "int" + } + ] + } + }, + { + row.num = 100 + schema = { + table = "test.table2" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + }, + { + name = "age" + type = "int" + } + ] + } + } + ] + result_table_name = "fake" + } +} + +sink { + EmailSink { + email_from_address = "sender@example.com" + email_to_address = "receiver-3@example.com,receiver-4@example.com" + email_host = "email-e2e" + email_transport_protocol = "smtp" + email_smtp_auth = false + email_smtp_port = 3025 + email_authorization_code="" + email_message_headline = "test-title" + email_message_content = "test-content" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index ed36310474a..4933ab02057 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -77,6 +77,7 @@ connector-milvus-e2e connector-activemq-e2e connector-sls-e2e + connector-email-e2e