diff --git a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/DistributedEmailAddressContactEventModule.java b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/DistributedEmailAddressContactEventModule.java index eb76a4d7b7..b79aeb8438 100644 --- a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/DistributedEmailAddressContactEventModule.java +++ b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/DistributedEmailAddressContactEventModule.java @@ -9,6 +9,7 @@ import org.apache.james.events.EventBusId; import org.apache.james.events.EventBusName; import org.apache.james.events.EventDeadLetters; +import org.apache.james.events.EventSerializer; import org.apache.james.events.NamingStrategy; import org.apache.james.events.RabbitMQEventBus; import org.apache.james.events.RetryBackoffConfiguration; @@ -22,6 +23,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Singleton; +import com.google.inject.multibindings.Multibinder; import com.google.inject.multibindings.ProvidesIntoSet; import com.google.inject.name.Names; import com.linagora.tmail.james.jmap.EmailAddressContactInjectKeys; @@ -36,6 +38,10 @@ public class DistributedEmailAddressContactEventModule extends AbstractModule { @Override protected void configure() { bind(EventBusId.class).annotatedWith(Names.named(EmailAddressContactInjectKeys.AUTOCOMPLETE)).toInstance(EventBusId.random()); + + Multibinder.newSetBinder(binder(), EventSerializer.class) + .addBinding() + .to(TmailJmapEventSerializer.class); } @ProvidesIntoSet diff --git a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/TmailEventModule.java b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/TmailEventModule.java index 92197ca1c9..592b69fdc6 100644 --- a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/TmailEventModule.java +++ b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/event/TmailEventModule.java @@ -11,6 +11,7 @@ import org.apache.james.events.EventBusId; import org.apache.james.events.EventBusName; import org.apache.james.events.EventDeadLetters; +import org.apache.james.events.EventSerializer; import org.apache.james.events.NamingStrategy; import org.apache.james.events.RabbitMQEventBus; import org.apache.james.events.RetryBackoffConfiguration; @@ -41,6 +42,9 @@ protected void configure() { bind(TmailEventSerializer.class).in(Scopes.SINGLETON); bind(EventBusId.class).annotatedWith(Names.named(TmailInjectNameConstants.TMAIL_EVENT_BUS_INJECT_NAME)).toInstance(EventBusId.random()); + Multibinder.newSetBinder(binder(), EventSerializer.class) + .addBinding() + .to(TmailEventSerializer.class); Multibinder.newSetBinder(binder(), TmailReactiveGroupEventListener.class); } diff --git a/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java b/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java index 8975910627..08a53dfbb8 100644 --- a/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java +++ b/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java @@ -4,9 +4,12 @@ import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.requestSpecification; +import static io.restassured.RestAssured.with; import static io.restassured.http.ContentType.JSON; import static org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.awaitility.Durations.FIVE_SECONDS; import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; import static org.awaitility.Durations.ONE_MINUTE; import static org.awaitility.Durations.TEN_SECONDS; @@ -30,6 +33,8 @@ import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO; import org.apache.james.core.Domain; import org.apache.james.core.Username; +import org.apache.james.events.EventDeadLetters; +import org.apache.james.events.Group; import org.apache.james.jmap.JMAPUrls; import org.apache.james.jmap.JmapGuiceProbe; import org.apache.james.jmap.http.UserCredential; @@ -39,6 +44,10 @@ import org.apache.james.modules.MailboxProbeImpl; import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.GuiceProbe; +import org.apache.james.utils.WebAdminGuiceProbe; +import org.apache.james.webadmin.WebAdminUtils; +import org.apache.james.webadmin.routes.EventDeadLettersRoutes; +import org.apache.james.webadmin.routes.TasksRoutes; import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.AfterAll; @@ -51,7 +60,10 @@ import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Named; import com.linagora.tmail.blob.guice.BlobStoreConfiguration; +import com.linagora.tmail.blob.secondaryblobstore.FailedBlobEvents; +import com.linagora.tmail.blob.secondaryblobstore.FailedBlobOperationListener; import com.linagora.tmail.blob.secondaryblobstore.SecondaryBlobStoreDAO; +import com.linagora.tmail.common.event.TmailInjectNameConstants; import com.linagora.tmail.encrypted.MailboxConfiguration; import com.linagora.tmail.james.app.CassandraExtension; import com.linagora.tmail.james.app.DistributedJamesConfiguration; @@ -73,8 +85,6 @@ @Tag(BasicFeature.TAG) class DistributedLinagoraSecondaryBlobStoreTest { - public static final int FIVE_SECONDS = 5000; - public static final ConditionFactory calmlyAwait = Awaitility.with() .pollInterval(ONE_HUNDRED_MILLISECONDS) .and() @@ -85,12 +95,15 @@ class DistributedLinagoraSecondaryBlobStoreTest { static class BlobStoreProbe implements GuiceProbe { private final S3BlobStoreDAO primaryBlobStoreDAO; private final S3BlobStoreDAO secondaryBlobStoreDAO; + private final EventDeadLetters eventDeadLetters; @Inject - public BlobStoreProbe(@Named(MAYBE_SECONDARY_BLOBSTORE) BlobStoreDAO blobStoreDAO) { + public BlobStoreProbe(@Named(MAYBE_SECONDARY_BLOBSTORE) BlobStoreDAO blobStoreDAO, + @Named(TmailInjectNameConstants.TMAIL_EVENT_BUS_INJECT_NAME) EventDeadLetters eventDeadLetters) { SecondaryBlobStoreDAO secondaryBlobStoreDAO = (SecondaryBlobStoreDAO) blobStoreDAO; this.primaryBlobStoreDAO = (S3BlobStoreDAO) secondaryBlobStoreDAO.getFirstBlobStoreDAO(); this.secondaryBlobStoreDAO = (S3BlobStoreDAO) secondaryBlobStoreDAO.getSecondBlobStoreDAO(); + this.eventDeadLetters = eventDeadLetters; } public S3BlobStoreDAO getPrimaryBlobStoreDAO() { @@ -100,6 +113,10 @@ public S3BlobStoreDAO getPrimaryBlobStoreDAO() { public S3BlobStoreDAO getSecondaryBlobStoreDAO() { return secondaryBlobStoreDAO; } + + public EventDeadLetters getEventDeadLetters() { + return eventDeadLetters; + } } static final Domain DOMAIN = Domain.of("domain.tld"); @@ -162,7 +179,7 @@ static void afterAll() { } @BeforeEach - void setUp(GuiceJamesServer server) throws Exception { + void beforeEach(GuiceJamesServer server) throws Exception { prepareBlobStore(server); server.getProbe(DataProbeImpl.class) @@ -171,6 +188,10 @@ void setUp(GuiceJamesServer server) throws Exception { .addUser(BOB.asString(), BOB_PASSWORD) .addUser(ANDRE.asString(), ANDRE_PASSWORD); + MailboxProbeImpl mailboxProbe = server.getProbe(MailboxProbeImpl.class); + mailboxProbe.createMailbox(MailboxPath.inbox(BOB)); + mailboxProbe.createMailbox(MailboxPath.inbox(ANDRE)); + UserCredential userCredential = new UserCredential(BOB, BOB_PASSWORD); PreemptiveBasicAuthScheme authScheme = new PreemptiveBasicAuthScheme(); authScheme.setUserName(userCredential.username().asString()); @@ -186,11 +207,6 @@ void setUp(GuiceJamesServer server) throws Exception { .setAuth(authScheme) .addHeader(ACCEPT.toString(), ACCEPT_RFC8621_VERSION_HEADER) .build(); - - MailboxProbeImpl mailboxProbe = server.getProbe(MailboxProbeImpl.class); - mailboxProbe.createMailbox(MailboxPath.inbox(BOB)); - mailboxProbe.createMailbox(MailboxPath.inbox(ANDRE)); - EncryptHelper.uploadPublicKey(ACCOUNT_ID, requestSpecification); } @@ -229,7 +245,7 @@ void sendEmailShouldResultingInSavingDataToBothObjectStorages(GuiceJamesServer s } @Test - void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDown(GuiceJamesServer server) throws Exception { + void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForShortTime(GuiceJamesServer server) throws Exception { secondaryS3.pause(); given() @@ -248,12 +264,79 @@ void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecon BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class); BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst(); + List blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); calmlyAwait.atMost(ONE_MINUTE) .untilAsserted(() -> { - List blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); List blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); assertThat(blobIds2).hasSameSizeAs(blobIds); assertThat(blobIds2).hasSameElementsAs(blobIds); }); } + + @Test + void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForLongTime(GuiceJamesServer server) throws Exception { + secondaryS3.pause(); + + given() + .body(LinagoraEmailSendMethodContract$.MODULE$.bobSendsAMailToAndre(server)) + .when() + .post() + .then() + .statusCode(HttpStatus.SC_OK) + .contentType(JSON) + .extract() + .body() + .asString(); + + checkIfAllEventDeadLettersArePersisted(server); + secondaryS3.unpause(); + + WebAdminGuiceProbe webAdminGuiceProbe = server.getProbe(WebAdminGuiceProbe.class); + requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminGuiceProbe.getWebAdminPort()) + .build(); + + // trigger reprocessing event dead letters + String taskId = with() + .queryParam("action", "reDeliver") + .post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + new FailedBlobOperationListener.FailedBlobOperationListenerGroup().asString()) + .then() + .extract() + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .queryParam("timeout", "1m") + .get(taskId + "/await"); + + BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class); + BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst(); + List expectedBlobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); + calmlyAwait.atMost(TEN_SECONDS) + .untilAsserted(() -> { + List blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); + assertThat(blobIds2).hasSameSizeAs(expectedBlobIds); + assertThat(blobIds2).hasSameElementsAs(expectedBlobIds); + }); + } + + private void checkIfAllEventDeadLettersArePersisted(GuiceJamesServer server) { + BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class); + Group group = new FailedBlobOperationListener.FailedBlobOperationListenerGroup(); + calmlyAwait.atMost(ONE_MINUTE) + .untilAsserted(() -> { + assertThatCode(() -> { + BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst(); + List expectedBlobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block(); + List insertionIds = blobStoreProbe.getEventDeadLetters().failedIds(group) + .collectList().block(); + List events = insertionIds.stream() + .map(insertionId -> blobStoreProbe.getEventDeadLetters().failedEvent(group, insertionId).block()) + .map(FailedBlobEvents.BlobAddition.class::cast) + .toList(); + assertThat(bucketName).isEqualTo(events.getFirst().bucketName()); + assertThat(events.stream().map(FailedBlobEvents.BlobAddition::blobId)).hasSameElementsAs(expectedBlobIds); + }).doesNotThrowAnyException(); + }); + } }