Skip to content

Commit

Permalink
Add test for the stdout sink
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Aug 8, 2023
1 parent b9d05c9 commit 42a6716
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.Sync
import cats.implicits._

import java.io.PrintStream
import java.util.Base64

class PrintingSink[F[_]: Sync](stream: PrintStream) extends Sink[F] {
private val encoder: Base64.Encoder = Base64.getEncoder.withoutPadding()

override val maxBytes: Int = Int.MaxValue // TODO: configurable?
override def isHealthy: F[Boolean] = Sync[F].pure(true)

override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
events.traverse_ { event =>
Sync[F].delay {
stream.println(encoder.encodeToString(event))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{ExitCode, IO, IOApp, Sync}
import cats.effect.kernel.Resource
import cats.implicits._

import java.util.Base64
import java.io.PrintStream

import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import cats.effect.{ExitCode, IO, IOApp}
import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.model._

object StdoutCollector extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val good = Resource.pure[IO, Sink[IO]](printingSink(System.out))
val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err))
val good = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.out))
val bad = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.err))
CollectorApp.run[IO](
good,
bad,
Expand All @@ -37,18 +32,4 @@ object StdoutCollector extends IOApp {
BuildInfo.version
)
}

private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] {
val maxBytes = Int.MaxValue // TODO: configurable?
def isHealthy: F[Boolean] = Sync[F].pure(true)

val encoder = Base64.getEncoder().withoutPadding()

def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
events.traverse_ { e =>
Sync[F].delay {
stream.println(encoder.encodeToString(e))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.snowplowanalytics.snowplow.collectors.scalastream.PrintingSink
import org.specs2.mutable.Specification

import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets

class PrintingSinkSpec extends Specification {

"Printing sink" should {
"print provided bytes encoded as BASE64 string" in {
val baos = new ByteArrayOutputStream()
val sink = new PrintingSink[IO](new PrintStream(baos))
val input = "Something"

sink.storeRawEvents(List(input.getBytes(StandardCharsets.UTF_8)), "key").unsafeRunSync()

baos.toString(StandardCharsets.UTF_8) must beEqualTo("U29tZXRoaW5n\n") // base64 of 'Something' + newline
}
}
}

0 comments on commit 42a6716

Please sign in to comment.