From e5a63e02c1739152a4a8f0b4c21fe59bb56480e2 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 11 Jan 2024 13:35:02 +0000 Subject: [PATCH] Update to use SinkExt components Signed-off-by: Stephen Wakely --- Cargo.lock | 141 ++++++------------- Cargo.toml | 2 +- scripts/integration/mqtt/test.yaml | 11 +- src/internal_events/mod.rs | 8 -- src/internal_events/mqtt.rs | 5 +- src/sinks/mqtt/config.rs | 5 +- src/sinks/mqtt/integration_tests.rs | 2 +- src/sinks/mqtt/mod.rs | 2 + src/sinks/mqtt/request_builder.rs | 92 +++++++++++++ src/sinks/mqtt/service.rs | 92 +++++++++++++ src/sinks/mqtt/sink.rs | 205 ++++++++++------------------ 11 files changed, 314 insertions(+), 251 deletions(-) create mode 100644 src/sinks/mqtt/request_builder.rs create mode 100644 src/sinks/mqtt/service.rs diff --git a/Cargo.lock b/Cargo.lock index f13702269182b..fece00cb2a3cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,8 +597,8 @@ dependencies = [ "once_cell", "rand 0.8.5", "regex", - "ring 0.17.5", - "rustls 0.21.8", + "ring", + "rustls", "rustls-native-certs", "rustls-pemfile", "rustls-webpki", @@ -610,7 +610,7 @@ dependencies = [ "time", "tokio", "tokio-retry", - "tokio-rustls 0.24.1", + "tokio-rustls", "tracing 0.1.40", "url", ] @@ -1159,7 +1159,7 @@ dependencies = [ "once_cell", "pin-project-lite", "pin-utils", - "rustls 0.21.8", + "rustls", "tokio", "tracing 0.1.40", ] @@ -1556,7 +1556,7 @@ dependencies = [ "hyperlocal", "log", "pin-project-lite", - "rustls 0.21.8", + "rustls", "rustls-native-certs", "rustls-pemfile", "rustls-webpki", @@ -3348,11 +3348,21 @@ checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ "futures-core", "futures-sink", - "nanorand", "pin-project", "spin 0.9.8", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -4229,10 +4239,10 @@ dependencies = [ "http 0.2.9", "hyper", "log", - "rustls 0.21.8", + "rustls", "rustls-native-certs", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", ] [[package]] @@ -4817,7 +4827,7 @@ dependencies = [ "async-reactor-trait", "async-trait", "executor-trait", - "flume", + "flume 0.10.14", "futures-core", "futures-io", "parking_lot", @@ -5327,7 +5337,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "rustc_version_runtime", - "rustls 0.21.8", + "rustls", "rustls-pemfile", "serde", "serde_bytes", @@ -5340,7 +5350,7 @@ dependencies = [ "take_mut", "thiserror", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "trust-dns-proto", "trust-dns-resolver", @@ -5373,15 +5383,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom 0.2.11", -] - [[package]] name = "native-tls" version = "0.2.11" @@ -6349,7 +6350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" dependencies = [ "doc-comment", - "flume", + "flume 0.10.14", "parking_lot", "tracing 0.1.40", ] @@ -6462,12 +6463,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "pollster" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" - [[package]] name = "poly1305" version = "0.8.0" @@ -7411,7 +7406,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.8", + "rustls", "rustls-pemfile", "serde", "serde_json", @@ -7419,7 +7414,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "tower-service", "url", @@ -7457,21 +7452,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin 0.5.2", - "untrusted 0.7.1", - "web-sys", - "winapi", -] - [[package]] name = "ring" version = "0.17.5" @@ -7482,7 +7462,7 @@ dependencies = [ "getrandom 0.2.11", "libc", "spin 0.9.8", - "untrusted 0.9.0", + "untrusted", "windows-sys 0.48.0", ] @@ -7623,20 +7603,20 @@ dependencies = [ [[package]] name = "rumqttc" -version = "0.20.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b616bf8b706c2a6235604f5d93f9578c37d0c6161e13898b68a1da4af2d812c" +checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" dependencies = [ "bytes 1.5.0", - "flume", - "futures 0.3.30", + "flume 0.11.0", + "futures-util", "log", - "pollster", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "thiserror", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", ] [[package]] @@ -7722,18 +7702,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.8" @@ -7741,7 +7709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c" dependencies = [ "log", - "ring 0.17.5", + "ring", "rustls-webpki", "sct", ] @@ -7773,8 +7741,8 @@ version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring 0.17.5", - "untrusted 0.9.0", + "ring", + "untrusted", ] [[package]] @@ -7887,8 +7855,8 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring 0.17.5", - "untrusted 0.9.0", + "ring", + "untrusted", ] [[package]] @@ -9036,24 +9004,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.8", + "rustls", "tokio", ] @@ -9090,7 +9047,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", - "rustls 0.21.8", + "rustls", "tokio", "tungstenite", ] @@ -9198,7 +9155,7 @@ dependencies = [ "prost 0.11.9", "rustls-pemfile", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -9226,11 +9183,11 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.3", - "rustls 0.21.8", + "rustls", "rustls-native-certs", "rustls-pemfile", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -9745,12 +9702,6 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "untrusted" version = "0.9.0" @@ -10691,16 +10642,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.5", - "untrusted 0.9.0", -] - [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index 5591885ee7f82..1a2c95335016e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -300,7 +300,7 @@ rdkafka = { version = "0.35.0", default-features = false, features = ["tokio", " redis = { version = "0.24.0", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true } regex = { version = "1.10.2", default-features = false, features = ["std", "perf"] } roaring = { version = "0.10.2", default-features = false, optional = true } -rumqttc = { version = "0.20.0", default-features = false, features = ["use-rustls"], optional = true } +rumqttc = { version = "0.23.0", default-features = false, features = ["use-rustls"], optional = true } seahash = { version = "4.1.0", default-features = false } semver = { version = "1.0.21", default-features = false, features = ["serde", "std"], optional = true } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } diff --git a/scripts/integration/mqtt/test.yaml b/scripts/integration/mqtt/test.yaml index 5f7fa958ac617..0538ee8091127 100644 --- a/scripts/integration/mqtt/test.yaml +++ b/scripts/integration/mqtt/test.yaml @@ -1,8 +1,11 @@ -args: -- --features +features: - mqtt-integration-tests -- --lib -- '::mqtt::' + +test_filter: '::mqtt::' matrix: version: ['5.0.15'] + +paths: +- "src/internal_events/mqtt.rs" +- "src/sinks/mqtt/**" diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index ce902fe0bae28..924215599c63e 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -87,10 +87,6 @@ mod lua; mod metric_to_log; #[cfg(feature = "sources-mongodb_metrics")] mod mongodb_metrics; -#[cfg(feature = "sinks-mqtt")] -mod mqtt; -#[cfg(feature = "sinks-nats")] -mod nats; #[cfg(feature = "sources-nginx_metrics")] mod nginx_metrics; mod open; @@ -225,10 +221,6 @@ pub(crate) use self::loki::*; pub(crate) use self::lua::*; #[cfg(feature = "transforms-metric_to_log")] pub(crate) use self::metric_to_log::*; -#[cfg(feature = "sinks-mqtt")] -pub(crate) use self::mqtt::*; -#[cfg(feature = "sinks-nats")] -pub(crate) use self::nats::*; #[cfg(feature = "sources-nginx_metrics")] pub(crate) use self::nginx_metrics::*; #[allow(unused_imports)] diff --git a/src/internal_events/mqtt.rs b/src/internal_events/mqtt.rs index 791572ebfe961..129133054169b 100644 --- a/src/internal_events/mqtt.rs +++ b/src/internal_events/mqtt.rs @@ -2,9 +2,8 @@ use std::fmt::Debug; use metrics::counter; use rumqttc::{ClientError, ConnectionError}; -use vector_core::internal_event::InternalEvent; - -use vector_common::internal_event::{error_stage, error_type}; +use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{error_stage, error_type}; #[derive(Debug)] pub struct MqttConnectionError { diff --git a/src/sinks/mqtt/config.rs b/src/sinks/mqtt/config.rs index 21d8452dda7d1..7e8175eae9034 100644 --- a/src/sinks/mqtt/config.rs +++ b/src/sinks/mqtt/config.rs @@ -1,9 +1,8 @@ use std::time::Duration; -use codecs::JsonSerializerConfig; use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport}; use snafu::{ResultExt, Snafu}; -use vector_config::configurable_component; +use vector_lib::codecs::JsonSerializerConfig; use crate::template::Template; use crate::{ @@ -11,6 +10,7 @@ use crate::{ config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, sinks::{ mqtt::sink::{ConfigurationSnafu, MqttConnector, MqttError, MqttSink, TlsSnafu}, + prelude::*, Healthcheck, VectorSink, }, tls::{MaybeTlsSettings, TlsEnableableConfig}, @@ -133,6 +133,7 @@ impl Default for MqttSinkConfig { impl_generate_config_from_default!(MqttSinkConfig); #[async_trait::async_trait] +#[typetag::serde(name = "mqtt")] impl SinkConfig for MqttSinkConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let connector = self.build_connector()?; diff --git a/src/sinks/mqtt/integration_tests.rs b/src/sinks/mqtt/integration_tests.rs index 33b4b84edd103..c47bc88d01e76 100644 --- a/src/sinks/mqtt/integration_tests.rs +++ b/src/sinks/mqtt/integration_tests.rs @@ -33,7 +33,7 @@ async fn mqtt_happy() { ..Default::default() }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, healthcheck) = cnf.build(cx).await.expect("Cannot build the sink"); healthcheck.await.expect("Health check failed"); diff --git a/src/sinks/mqtt/mod.rs b/src/sinks/mqtt/mod.rs index 9203b7af30a85..26df23d0d3fa6 100644 --- a/src/sinks/mqtt/mod.rs +++ b/src/sinks/mqtt/mod.rs @@ -1,4 +1,6 @@ mod config; +mod request_builder; +mod service; mod sink; #[cfg(all(test, feature = "mqtt-integration-tests"))] diff --git a/src/sinks/mqtt/request_builder.rs b/src/sinks/mqtt/request_builder.rs new file mode 100644 index 0000000000000..62b75910a5454 --- /dev/null +++ b/src/sinks/mqtt/request_builder.rs @@ -0,0 +1,92 @@ +use std::io; + +use bytes::{Bytes, BytesMut}; +use tokio_util::codec::Encoder as _; + +use crate::sinks::prelude::*; + +use super::{service::MqttRequest, sink::MqttEvent}; + +pub(super) struct MqttMetadata { + topic: String, + finalizers: EventFinalizers, +} + +pub(super) struct MqttEncoder { + pub(super) encoder: crate::codecs::Encoder<()>, + pub(super) transformer: crate::codecs::Transformer, +} + +impl encoding::Encoder for MqttEncoder { + fn encode_input( + &self, + mut input: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut body = BytesMut::new(); + self.transformer.transform(&mut input); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + + let mut encoder = self.encoder.clone(); + encoder + .encode(input, &mut body) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode"))?; + + let body = body.freeze(); + write_all(writer, 1, body.as_ref())?; + + Ok((body.len(), byte_size)) + } +} + +pub(super) struct MqttRequestBuilder { + pub(super) encoder: MqttEncoder, +} + +impl RequestBuilder for MqttRequestBuilder { + type Metadata = MqttMetadata; + type Events = Event; + type Encoder = MqttEncoder; + type Payload = Bytes; + type Request = MqttRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: MqttEvent, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let builder = RequestMetadataBuilder::from_event(&input.event); + + let metadata = MqttMetadata { + topic: input.topic, + finalizers: input.event.take_finalizers(), + }; + + (metadata, builder, input.event) + } + + fn build_request( + &self, + mqtt_metadata: Self::Metadata, + metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let body = payload.into_payload(); + MqttRequest { + body, + topic: mqtt_metadata.topic, + finalizers: mqtt_metadata.finalizers, + metadata, + } + } +} diff --git a/src/sinks/mqtt/service.rs b/src/sinks/mqtt/service.rs new file mode 100644 index 0000000000000..fcba9c808fdf6 --- /dev/null +++ b/src/sinks/mqtt/service.rs @@ -0,0 +1,92 @@ +use std::task::{Context, Poll}; + +use crate::sinks::prelude::*; +use bytes::Bytes; +use futures::future::BoxFuture; +use rumqttc::{AsyncClient, ClientError}; +use snafu::Snafu; + +use super::config::MqttQoS; + +pub(super) struct MqttResponse { + byte_size: usize, + json_size: GroupedCountByteSize, +} + +impl DriverResponse for MqttResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.json_size + } + + fn bytes_sent(&self) -> Option { + Some(self.byte_size) + } +} + +pub(super) struct MqttRequest { + pub(super) body: Bytes, + pub(super) topic: String, + pub(super) finalizers: EventFinalizers, + pub(super) metadata: RequestMetadata, +} + +impl Finalizable for MqttRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for MqttRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +pub(super) struct MqttService { + pub(super) client: AsyncClient, + pub(super) quality_of_service: MqttQoS, +} + +#[derive(Debug, Snafu)] +pub(super) enum MqttError { + #[snafu(display("error"))] + Error { error: ClientError }, +} + +impl Service for MqttService { + type Response = MqttResponse; + type Error = MqttError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: MqttRequest) -> Self::Future { + let quality_of_service = self.quality_of_service; + let client = self.client.clone(); + + Box::pin(async move { + let byte_size = req.body.len(); + + let res = client + .publish(&req.topic, quality_of_service.into(), false, req.body) + .await; + match res { + Ok(()) => Ok(MqttResponse { + byte_size, + json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), + }), + Err(error) => Err(MqttError::Error { error }), + } + }) + } +} diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index 6fef8dc990d56..8824ea302ab27 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -1,29 +1,16 @@ -use std::{collections::VecDeque, fmt::Debug}; - use async_trait::async_trait; -use bytes::BytesMut; -use futures::{pin_mut, stream::BoxStream, Stream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use rumqttc::{AsyncClient, ClientError, ConnectionError, EventLoop, MqttOptions}; use snafu::{ResultExt, Snafu}; -use tokio_util::codec::Encoder as _; -use vector_common::internal_event::{ByteSize, CountByteSize, Output, Protocol}; -use vector_core::{ - event::EventFinalizers, - internal_event::{BytesSent, EventsSent, InternalEventHandle}, - ByteSizeOf, -}; +use vector_lib::tls::TlsError; -use crate::sinks::mqtt::config::{ConfigurationError, MqttQoS}; -use crate::{ - codecs::{Encoder, Transformer}, - emit, - event::{Event, EventStatus, Finalizable}, - internal_events::TemplateRenderingError, - internal_events::{ConnectionOpen, MqttClientError, MqttConnectionError, OpenGauge}, - sinks::mqtt::config::MqttSinkConfig, - sinks::util::StreamSink, - template::{Template, TemplateParseError}, - tls::TlsError, +use crate::sinks::prelude::*; + +use super::{ + config::{ConfigurationError, MqttQoS}, + request_builder::{MqttEncoder, MqttRequestBuilder}, + service::MqttService, + MqttSinkConfig, }; #[derive(Debug, Snafu)] @@ -67,140 +54,94 @@ pub struct MqttSink { transformer: Transformer, encoder: Encoder<()>, connector: MqttConnector, - finalizers_queue: VecDeque, quality_of_service: MqttQoS, } +pub(super) struct MqttEvent { + pub(super) topic: String, + pub(super) event: Event, +} + impl MqttSink { pub fn new(config: &MqttSinkConfig, connector: MqttConnector) -> crate::Result { let transformer = config.encoding.transformer(); let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); - let finalizers_queue = VecDeque::new(); Ok(Self { transformer, encoder, connector, - finalizers_queue, quality_of_service: config.quality_of_service, }) } - /// outgoing events main loop - async fn handle_events( - &mut self, - input: &mut I, - client: &mut AsyncClient, - connection: &mut EventLoop, - ) -> Result<(), ()> - where - I: Stream + Unpin, - { - let events_sent = register!(EventsSent::from(Output(None))); - let bytes_sent = register!(BytesSent::from(Protocol("mqtt".into()))); - - loop { - tokio::select! { - // handle connection errors - msg = connection.poll() => { - match msg { - Ok(rumqttc::Event::Outgoing(rumqttc::Outgoing::PubRel(_))) => { - // publish has been acknowledged by the MQTT server - if let Some(finalizers) = self.finalizers_queue.pop_front() { - finalizers.update_status(EventStatus::Delivered); - } - } - Ok(_) => {} - Err(error) => { - emit!(MqttConnectionError { error }); - return Err(()); - } - } - }, - - // handle outgoing events - event = input.next() => { - let mut event = if let Some(event) = event { - event - } else { - break; - }; - - let finalizers = event.take_finalizers(); - - let topic = match self.connector.topic.render_string(&event) { - Ok(topic) => topic, - Err(error) => { - emit!(TemplateRenderingError { - error, - field: Some("topic"), - drop_event: true, - }); - finalizers.update_status(EventStatus::Errored); - continue; - } - }; - - self.transformer.transform(&mut event); - - let event_byte_size = event.size_of(); - - let mut bytes = BytesMut::new(); - let message = match self.encoder.encode(event, &mut bytes) { - Ok(()) => { - bytes.to_vec() - } - Err(_) => { - finalizers.update_status(EventStatus::Errored); - continue; - } - }; - let message_len = message.len(); - - let retain = false; - match client.publish(&topic, self.quality_of_service.into(), retain, message).await { - Ok(()) => { - events_sent.emit(CountByteSize(1, event_byte_size)); - bytes_sent.emit(ByteSize(message_len)); - - self.finalizers_queue.push_back(finalizers); - } - Err(error) => { - emit!(MqttClientError { error }); - finalizers.update_status(EventStatus::Errored); - return Err(()); - } - } - }, + fn make_mqtt_event(&self, event: Event) -> Option { + let topic = self + .connector + .topic + .render_string(&event) + .map_err(|missing_keys| { + emit!(TemplateRenderingError { + error: missing_keys, + field: Some("topic"), + drop_event: true, + }) + }) + .ok()?; + + Some(MqttEvent { topic, event }) + } - else => break, + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let (client, mut connection) = self.connector.connect(); + + // This is necessary to keep the mqtt event loop moving forward. + tokio::spawn(async move { + loop { + // If an error is returned here there is currently no way to tie this back + // to the event that was posted which means we can't accurately provide + // delivery guarantees. + // We need this issue resolved first: + // https://github.com/bytebeamio/rumqtt/issues/349 + let _ = connection.poll().await; } - } - - Ok(()) + }); + + let service = ServiceBuilder::new().service(MqttService { + client, + quality_of_service: self.quality_of_service, + }); + + let request_builder = MqttRequestBuilder { + encoder: MqttEncoder { + encoder: self.encoder.clone(), + transformer: self.transformer.clone(), + }, + }; + + input + .filter_map(|event| std::future::ready(self.make_mqtt_event(event))) + .request_builder(default_request_builder_concurrency_limit(), request_builder) + .filter_map(|request| async move { + match request { + Err(e) => { + error!("Failed to build MQTT request: {:?}.", e); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .protocol("mqtt") + .run() + .await } } #[async_trait] impl StreamSink for MqttSink { async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let input = input.fuse().peekable(); - pin_mut!(input); - - let (client, connection) = self.connector.connect(); - pin_mut!(client); - pin_mut!(connection); - while input.as_mut().peek().await.is_some() { - let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); - - let _result = self - .handle_events(&mut input, &mut client, &mut connection) - .await; - } - - let _ = client.disconnect().await; - - Ok(()) + self.run_inner(input).await } }