From 2f5f3b19e938aee20b9d7ad2638fbbd2eb85a4a1 Mon Sep 17 00:00:00 2001 From: Daniel Thomson Date: Thu, 6 Jun 2019 09:34:00 -0400 Subject: [PATCH 1/8] Fix connection URL appending props (#14) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5a52ba9..e44e6d3 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ or ### Connection URL -The URL has the form ``jdbc:ksql://:[?=,=...]`` +The URL has the form ``jdbc:ksql://:[?=&=...]`` where: From 092799eb0f4f34b1cdd7d1f089c57830e4c76f37 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Fri, 2 Aug 2019 18:05:32 -0500 Subject: [PATCH 2/8] Prepare for next version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index c6832fd..49fb213 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "ksql-jdbc-driver" -version := "1.0" +version := "1.1-SNAPSHOT" initialize := { assert(Integer.parseInt(sys.props("java.specification.version").split("\\.")(1)) >= 8, "Java 8 or above required") From 119ec0a96f695d27b9504261c39d9644116067db Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Fri, 2 Aug 2019 21:55:06 -0500 Subject: [PATCH 3/8] Upgrade KSQL and Kafka versions --- build.sbt | 6 +++--- project/build.properties | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 49fb213..db8402d 100644 --- a/build.sbt +++ b/build.sbt @@ -12,9 +12,9 @@ resolvers += "Confluent Maven Repo" at "http://packages.confluent.io/maven/" resolvers += "Confluent Snapshots Maven Repo" at "https://s3-us-west-2.amazonaws.com/confluent-snapshots/" resolvers += Resolver.mavenLocal -libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "5.1.2" -libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1" % "test" -libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test" +libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "5.3.0" +libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0" % "test" +libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % "test" libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % "test" libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts (Artifact("javax.ws.rs-api", "jar", "jar")) diff --git a/project/build.properties b/project/build.properties index d638b4f..da4bbed 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 0.13.8 \ No newline at end of file +sbt.version = 0.13.9 \ No newline at end of file From 623dfd38f220d4250532d3f4c7b613f29661db76 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sat, 3 Aug 2019 07:57:44 -0500 Subject: [PATCH 4/8] Functionality for new KSQL version --- .../mmolimar/ksql/jdbc/KsqlConnection.scala | 28 +++--- .../ksql/jdbc/KsqlDatabaseMetaData.scala | 6 +- .../mmolimar/ksql/jdbc/KsqlDriver.scala | 2 +- .../mmolimar/ksql/jdbc/KsqlStatement.scala | 96 +++++++++---------- .../github/mmolimar/ksql/jdbc/package.scala | 4 +- .../ksql/jdbc/resultset/KsqlResultSet.scala | 32 +++---- .../resultset/KsqlResultSetMetaData.scala | 4 +- .../ksql/jdbc/resultset/ResultSet.scala | 42 ++++---- 8 files changed, 105 insertions(+), 109 deletions(-) diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlConnection.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlConnection.scala index 4911045..6e39898 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlConnection.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlConnection.scala @@ -36,7 +36,7 @@ case class KsqlConnectionValues(ksqlServer: String, port: Int, config: Map[Strin class ConnectionNotSupported extends Connection with WrapperNotSupported { - override def commit: Unit = throw NotSupported("commit") + override def commit(): Unit = throw NotSupported("commit") override def getHoldability: Int = throw NotSupported("getHoldability") @@ -112,15 +112,15 @@ class ConnectionNotSupported extends Connection with WrapperNotSupported { override def createArrayOf(typeName: String, elements: scala.Array[AnyRef]): Array = throw NotSupported("createArrayOf") - override def setSavepoint: Savepoint = throw NotSupported("setSavepoint") + override def setSavepoint(): Savepoint = throw NotSupported("setSavepoint") override def setSavepoint(name: String): Savepoint = throw NotSupported("setSavepoint") - override def close: Unit = throw NotSupported("close") + override def close(): Unit = throw NotSupported("close") override def createNClob: NClob = throw NotSupported("createNClob") - override def rollback: Unit = throw NotSupported("rollback") + override def rollback(): Unit = throw NotSupported("rollback") override def rollback(savepoint: Savepoint): Unit = throw NotSupported("rollback") @@ -132,7 +132,7 @@ class ConnectionNotSupported extends Connection with WrapperNotSupported { override def getAutoCommit: Boolean = throw NotSupported("getAutoCommit") - override def clearWarnings: Unit = throw NotSupported("clearWarnings") + override def clearWarnings(): Unit = throw NotSupported("clearWarnings") override def getSchema: String = throw NotSupported("getSchema") @@ -159,15 +159,17 @@ class KsqlConnection(private[jdbc] val values: KsqlConnectionValues, properties: private var connected: Option[Boolean] = None private[jdbc] def init: KsqlRestClient = { - val props = if (values.properties) { - properties.asScala.toMap[String, AnyRef].asJava + val (localProps, clientProps) = if (values.properties) { + val local = properties.asScala.toMap[String, AnyRef].filterNot(_._1.toLowerCase.startsWith("ssl.")).asJava + val client = properties.asScala.toMap[String, String].filter(_._1.toLowerCase.startsWith("ssl.")).asJava + (local, client) } else { - Collections.emptyMap[String, AnyRef] + (Collections.emptyMap[String, AnyRef], Collections.emptyMap[String, String]) } - new KsqlRestClient(values.ksqlUrl, props) + new KsqlRestClient(values.ksqlUrl, localProps, clientProps) } - private[jdbc] def validate: Unit = { + private[jdbc] def validate(): Unit = { Try(ksqlClient.makeRootRequest) match { case Success(response) if response.isErroneous => throw CannotConnect(values.ksqlServer, response.getErrorMessage.getMessage) @@ -217,8 +219,8 @@ class KsqlConnection(private[jdbc] val values: KsqlConnectionValues, properties: override def setCatalog(catalog: String): Unit = {} - override def close: Unit = { - ksqlClient.close + override def close(): Unit = { + ksqlClient.close() connected = Some(false) } @@ -230,6 +232,6 @@ class KsqlConnection(private[jdbc] val values: KsqlConnectionValues, properties: override def getWarnings: SQLWarning = None.orNull - override def commit: Unit = {} + override def commit(): Unit = {} } diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaData.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaData.scala index 429dc30..e1534ca 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaData.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaData.scala @@ -859,13 +859,13 @@ class KsqlDatabaseMetaData(private val ksqlConnection: KsqlConnection) extends D types: Set[String] = Set(".*")): Set[String] = { var functions = mutable.Set.empty[String] - (ksqlConnection.createStatement.executeQuery("LIST FUNCTIONS")).toStream.foreach { fn => + ksqlConnection.createStatement.executeQuery("LIST FUNCTIONS").toStream.foreach { fn => val fnName = fn.getString("FUNCTION_NAME_FN_NAME").toUpperCase ksqlConnection.createStatement.executeQuery(s"DESCRIBE FUNCTION $fnName").toStream.foreach { fnDesc => val fnAuthor = fnDesc.getString("FUNCTION_DESCRIPTION_AUTHOR").trim.toUpperCase val fnReturnType = fnDesc.getString("FUNCTION_DESCRIPTION_FN_RETURN_TYPE") - if ((types.filter(fnReturnType.matches(_)).nonEmpty || names.filter(fnName.matches(_)).nonEmpty) && + if ((types.exists(fnReturnType.matches(_)) || names.exists(fnName.matches(_))) && (author.isEmpty || author.get.toUpperCase == fnAuthor.toUpperCase)) { functions += fnName } @@ -874,7 +874,7 @@ class KsqlDatabaseMetaData(private val ksqlConnection: KsqlConnection) extends D functions.toSet } - private def validateCatalogAndSchema(catalog: String, schema: String) = { + private def validateCatalogAndSchema(catalog: String, schema: String): Unit = { if (catalog != null && catalog != "") throw UnknownCatalog(s"Unknown catalog $catalog") if (schema != null && schema != "") throw UnknownSchema(s"Unknown schema $schema") } diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDriver.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDriver.scala index 0f05026..f9a1a5e 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDriver.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlDriver.scala @@ -61,7 +61,7 @@ class KsqlDriver extends Driver { if (!acceptsURL(url)) throw InvalidUrl(url) val connection = buildConnection(KsqlDriver.parseUrl(url), properties) - connection.validate + connection.validate() connection } diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala index 1306bcc..3d6e2d1 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala @@ -4,8 +4,8 @@ import java.io.InputStream import java.sql.{Connection, ResultSet, SQLWarning, Statement, Types} import com.github.mmolimar.ksql.jdbc.Exceptions._ -import com.github.mmolimar.ksql.jdbc.resultset._ -import io.confluent.ksql.parser.KsqlParser +import com.github.mmolimar.ksql.jdbc.resultset.{StreamedResultSet, _} +import io.confluent.ksql.parser.{DefaultKsqlParser, KsqlParser} import io.confluent.ksql.rest.client.{KsqlRestClient, RestResponse} import io.confluent.ksql.rest.entity.SchemaInfo.{Type => KsqlType} import io.confluent.ksql.rest.entity._ @@ -15,13 +15,13 @@ import scala.util.{Failure, Success, Try} private object KsqlStatement { - private val ksqlParser: KsqlParser = new KsqlParser + private val ksqlParser: KsqlParser = new DefaultKsqlParser } class StatementNotSupported extends Statement with WrapperNotSupported { - override def cancel: Unit = throw NotSupported("cancel") + override def cancel(): Unit = throw NotSupported("cancel") override def getResultSetHoldability: Int = throw NotSupported("getResultSetHoldability") @@ -87,11 +87,11 @@ class StatementNotSupported extends Statement with WrapperNotSupported { override def isPoolable: Boolean = throw NotSupported("isPoolable") - override def clearBatch: Unit = throw NotSupported("clearBatch") + override def clearBatch(): Unit = throw NotSupported("clearBatch") - override def close: Unit = throw NotSupported("close") + override def close(): Unit = throw NotSupported("close") - override def closeOnCompletion: Unit = throw NotSupported("closeOnCompletion") + override def closeOnCompletion(): Unit = throw NotSupported("closeOnCompletion") override def executeBatch: Array[Int] = throw NotSupported("executeBatch") @@ -99,7 +99,7 @@ class StatementNotSupported extends Statement with WrapperNotSupported { override def setFetchSize(rows: Int): Unit = throw NotSupported("setFetchSize") - override def clearWarnings: Unit = throw NotSupported("clearWarnings") + override def clearWarnings(): Unit = throw NotSupported("clearWarnings") override def getResultSetConcurrency: Int = throw NotSupported("getResultSetConcurrency") @@ -114,13 +114,13 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = private var maxRows = 0 private var closed = false - override def cancel: Unit = { - currentResultSet.map(_.close) + override def cancel(): Unit = { + currentResultSet.foreach(_.close) currentResultSet = None } - override def close: Unit = { - cancel + override def close(): Unit = { + cancel() closed = true } @@ -132,12 +132,19 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = override def getMaxRows: Int = maxRows + override def getMoreResults: Boolean = !isClosed && currentResultSet.isDefined && (currentResultSet.get match { + case srs: StreamedResultSet => srs.hasNext + case irs: IteratorResultSet[_] => irs.rows.hasNext + }) + + override def getMoreResults(current: Int): Boolean = getMoreResults + private def executeKsqlRequest(sql: String): Unit = { if (closed) throw AlreadyClosed("Statement already closed.") currentResultSet = None val fixedSql = fixSql(sql) - val stmt = Try(ksqlParser.getStatements(fixedSql)) match { + val stmt = Try(ksqlParser.parse(fixedSql)) match { case Failure(e) => throw KsqlQueryError(s"Error parsing query '$fixedSql': ${e.getMessage}.", e) case Success(s) if s.size() != 1 => throw KsqlQueryError("You have to execute just one query at a time. " + s"Number of queries sent: '${s.size}'.") @@ -146,9 +153,9 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = val response = stmt.getStatement.statement.getStart.getText.trim.toUpperCase match { case "SELECT" => - ksqlClient.makeQueryRequest(fixedSql).asInstanceOf[RestResponse[AnyRef]] + ksqlClient.makeQueryRequest(fixedSql, None.orNull).asInstanceOf[RestResponse[AnyRef]] case "PRINT" => - ksqlClient.makePrintTopicRequest(fixedSql).asInstanceOf[RestResponse[AnyRef]] + ksqlClient.makePrintTopicRequest(fixedSql, None.orNull).asInstanceOf[RestResponse[AnyRef]] case _ => ksqlClient.makeKsqlRequest(fixedSql).asInstanceOf[RestResponse[AnyRef]] } @@ -159,8 +166,8 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = } val resultSet: ResultSet = response.get match { - case e: KsqlRestClient.QueryStream => { - implicit lazy val queryDesc = { + case e: KsqlRestClient.QueryStream => + implicit lazy val queryDesc: QueryDescription = { val response = ksqlClient.makeKsqlRequest(s"EXPLAIN $fixedSql") if (response.isErroneous) { import scala.collection.JavaConverters._ @@ -173,7 +180,6 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = }.getQueryDescription e.asInstanceOf[KsqlRestClient.QueryStream] - } case e: KsqlEntityList => e.asInstanceOf[KsqlEntityList] case e: InputStream => e.asInstanceOf[InputStream] } @@ -240,8 +246,8 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = import KsqlEntityHeaders._ if (list.size != 1) throw KsqlEntityListError(s"KSQL entity list with an invalid number of entities: '${list.size}'.") - list.asScala.headOption.map(_ match { - case e: CommandStatusEntity => { + list.asScala.headOption.map { + case e: CommandStatusEntity => val rows = Iterator(Seq( e.getCommandId.getType.name, e.getCommandId.getEntity, @@ -250,9 +256,8 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = e.getCommandStatus.getMessage )) new IteratorResultSet[String](commandStatusEntity, maxRows, rows) - } case e: ExecutionPlan => new IteratorResultSet[String](executionPlanEntity, maxRows, Iterator(Seq(e.getExecutionPlan))) - case e: FunctionDescriptionList => { + case e: FunctionDescriptionList => val rows = e.getFunctions.asScala.map(f => Seq( e.getAuthor, e.getDescription, @@ -265,15 +270,13 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = f.getArguments.asScala.map(arg => s"${arg.getName}:${arg.getType}").mkString(", ") )).toIterator new IteratorResultSet[String](functionDescriptionListEntity, maxRows, rows) - } - case e: FunctionNameList => { + case e: FunctionNameList => val rows = e.getFunctions.asScala.map(f => Seq( f.getName, f.getType.name )).toIterator new IteratorResultSet[String](functionNameListEntity, maxRows, rows) - } - case e: KafkaTopicsList => { + case e: KafkaTopicsList => val rows = e.getTopics.asScala.map(t => Seq( t.getName, t.getConsumerCount, @@ -282,35 +285,30 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = t.getReplicaInfo.asScala.mkString(", ") )).toIterator new IteratorResultSet[Any](kafkaTopicsListEntity, maxRows, rows) - } - case e: KsqlTopicsList => { + case e: KsqlTopicsList => val rows = e.getTopics.asScala.map(t => Seq( t.getName, t.getKafkaTopic, t.getFormat.name )).toIterator new IteratorResultSet[String](ksqlTopicsListEntity, maxRows, rows) - } - case e: PropertiesList => { + case e: PropertiesList => val rows = e.getProperties.asScala.map(p => Seq( p._1, p._2.toString )).toIterator new IteratorResultSet[String](propertiesListEntity, maxRows, rows) - } - case e: Queries => { + case e: Queries => val rows = e.getQueries.asScala.map(q => Seq( q.getId.getId, q.getQueryString, q.getSinks.asScala.mkString(", ") )).toIterator new IteratorResultSet[String](queriesEntity, maxRows, rows) - } - case e@(_: QueryDescriptionEntity | _: QueryDescriptionList) => { - val descriptions: Seq[QueryDescription] = if (e.isInstanceOf[QueryDescriptionEntity]) { - Seq(e.asInstanceOf[QueryDescriptionEntity].getQueryDescription) - } else { - e.asInstanceOf[QueryDescriptionList].getQueryDescriptions.asScala + case e@(_: QueryDescriptionEntity | _: QueryDescriptionList) => + val descriptions: Seq[QueryDescription] = e match { + case qde: QueryDescriptionEntity => Seq(qde.getQueryDescription) + case qdl: QueryDescriptionList => qdl.getQueryDescriptions.asScala } val rows = descriptions.map(d => Seq( @@ -322,12 +320,10 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = d.getExecutionPlan )).toIterator new IteratorResultSet[String](queryDescriptionEntityList, maxRows, rows) - } - case e@(_: SourceDescriptionEntity | _: SourceDescriptionList) => { - val descriptions: Seq[SourceDescription] = if (e.isInstanceOf[SourceDescriptionEntity]) { - Seq(e.asInstanceOf[SourceDescriptionEntity].getSourceDescription) - } else { - e.asInstanceOf[SourceDescriptionList].getSourceDescriptions.asScala + case e@(_: SourceDescriptionEntity | _: SourceDescriptionList) => + val descriptions: Seq[SourceDescription] = e match { + case sde: SourceDescriptionEntity => Seq(sde.getSourceDescription) + case sdl: SourceDescriptionList => sdl.getSourceDescriptions.asScala } val rows = descriptions.map(d => Seq( @@ -343,16 +339,14 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = d.getTimestamp )).toIterator new IteratorResultSet[Any](sourceDescriptionEntityList, maxRows, rows) - } - case e: StreamsList => { + case e: StreamsList => val rows = e.getStreams.asScala.map(s => Seq( s.getName, s.getTopic, s.getFormat )).toIterator new IteratorResultSet[String](streamsListEntity, maxRows, rows) - } - case e: TablesList => { + case e: TablesList => val rows = e.getTables.asScala.map(t => Seq( t.getName, t.getTopic, @@ -360,8 +354,7 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = t.getIsWindowed )).toIterator new IteratorResultSet[Any](tablesListEntity, maxRows, rows) - } - case e: TopicDescription => { + case e: TopicDescription => val rows = Iterator(Seq( e.getName, e.getKafkaTopic, @@ -369,8 +362,7 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = e.getSchemaString )) new IteratorResultSet[String](topicDescriptionEntity, maxRows, rows) - } - }).getOrElse(throw KsqlCommandError(s"Cannot build result set for '${list.get(0).getStatementText}'.")) + }.getOrElse(throw KsqlCommandError(s"Cannot build result set for '${list.get(0).getStatementText}'.")) } } diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/package.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/package.scala index e1b8d06..14f5f9f 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/package.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/package.scala @@ -10,9 +10,9 @@ package object jdbc { def toStream: Stream[ResultSet] = new Iterator[ResultSet] { - def hasNext = resultSet.next + def hasNext(): Boolean = resultSet.next - def next = resultSet + def next(): ResultSet = resultSet }.toStream } diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSet.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSet.scala index 508eb8f..a96562d 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSet.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSet.scala @@ -28,7 +28,7 @@ class IteratorResultSet[T <: Any](private val metadata: ResultSetMetaData, priva override protected def getColumnBounds: (Int, Int) = (1, currentRow.getOrElse(Seq.empty).size) - override protected def closeInherit: Unit = {} + override protected def closeInherit(): Unit = {} } @@ -36,7 +36,7 @@ trait KsqlStream extends Closeable with JIterator[StreamedRow] private[jdbc] class KsqlQueryStream(stream: KsqlRestClient.QueryStream) extends KsqlStream { - override def close: Unit = stream.close + override def close(): Unit = stream.close() override def hasNext: Boolean = stream.hasNext @@ -48,9 +48,9 @@ private[jdbc] class KsqlInputStream(stream: InputStream) extends KsqlStream { private var isClosed = false private lazy val scanner = new Scanner(stream) - override def close: Unit = { + override def close(): Unit = { isClosed = true - scanner.close + scanner.close() } override def hasNext: Boolean = { @@ -65,8 +65,8 @@ private[jdbc] class KsqlInputStream(stream: InputStream) extends KsqlStream { } -class StreamedResultSet(private val metadata: ResultSetMetaData, - private val stream: KsqlStream, private val maxRows: Long, val timeout: Long = 0) +class StreamedResultSet(private[jdbc] val metadata: ResultSetMetaData, + private[jdbc] val stream: KsqlStream, private[resultset] val maxRows: Long, val timeout: Long = 0) extends AbstractResultSet[StreamedRow](metadata, maxRows, stream) { private val emptyRow: StreamedRow = StreamedRow.row(new GenericRow) @@ -74,15 +74,15 @@ class StreamedResultSet(private val metadata: ResultSetMetaData, private val waitDuration = if (timeout > 0) timeout millis else Duration.Inf protected override def nextResult: Boolean = { - def hasNext = stream.hasNext match { - case true => - stream.next match { - case record if Option(record.getRow) == None => false - case record => - currentRow = Some(record) - true - } - case false => false + def hasNext = if (stream.hasNext) { + stream.next match { + case record if Option(record.getRow).isEmpty => false + case record => + currentRow = Some(record) + true + } + } else { + false } Try(Await.result(Future(hasNext), waitDuration)) match { @@ -92,7 +92,7 @@ class StreamedResultSet(private val metadata: ResultSetMetaData, } } - override protected def closeInherit: Unit = stream.close + override protected def closeInherit(): Unit = stream.close() override protected def getColumnBounds: (Int, Int) = (1, currentRow.getOrElse(emptyRow).getRow.getColumns.size) diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSetMetaData.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSetMetaData.scala index eaec618..2792a82 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSetMetaData.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSetMetaData.scala @@ -12,8 +12,8 @@ class KsqlResultSetMetaData(private[jdbc] val columns: List[HeaderField]) extend private val fieldByIndex: Map[Int, HeaderField] = columns - private def getField(index: Int): HeaderField = fieldByIndex.get(index) - .getOrElse(throw InvalidColumn(s"Column with index '$index' does not exist.")) + private def getField(index: Int): HeaderField = fieldByIndex.getOrElse(index, + throw InvalidColumn(s"Column with index '$index' does not exist.")) override def getColumnClassName(column: Int): String = { getField(column).jdbcType match { diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/ResultSet.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/ResultSet.scala index 3fdd931..3e75507 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/ResultSet.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/ResultSet.scala @@ -38,7 +38,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def updateNString(columnLabel: String, nString: String): Unit = throw NotSupported("updateNString") - override def clearWarnings: Unit = throw NotSupported("clearWarnings") + override def clearWarnings(): Unit = throw NotSupported("clearWarnings") override def updateTimestamp(columnIndex: Int, x: Timestamp): Unit = throw NotSupported("updateTimestamp") @@ -70,7 +70,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def getBinaryStream(columnLabel: String): InputStream = throw NotSupported("getBinaryStream") - override def beforeFirst: Unit = throw NotSupported("beforeFirst") + override def beforeFirst(): Unit = throw NotSupported("beforeFirst") override def updateNCharacterStream(columnIndex: Int, x: Reader, length: Long): Unit = throw NotSupported("updateNCharacterStream") @@ -138,9 +138,9 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def getURL(columnLabel: String): URL = throw NotSupported("getURL") - override def updateRow: Unit = throw NotSupported("updateRow") + override def updateRow(): Unit = throw NotSupported("updateRow") - override def insertRow: Unit = throw NotSupported("insertRow") + override def insertRow(): Unit = throw NotSupported("insertRow") override def getMetaData: ResultSetMetaData = throw NotSupported("getMetaData") @@ -166,7 +166,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def getRowId(columnLabel: String): RowId = throw NotSupported("getRowId") - override def moveToInsertRow: Unit = throw NotSupported("moveToInsertRow") + override def moveToInsertRow(): Unit = throw NotSupported("moveToInsertRow") override def rowInserted: Boolean = throw NotSupported("rowInserted") @@ -198,15 +198,15 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def updateFloat(columnLabel: String, x: Float): Unit = throw NotSupported("updateFloat") - override def afterLast: Unit = throw NotSupported("afterLast") + override def afterLast(): Unit = throw NotSupported("afterLast") - override def refreshRow: Unit = throw NotSupported("refreshRow") + override def refreshRow(): Unit = throw NotSupported("refreshRow") override def getNString(columnIndex: Int): String = throw NotSupported("getNString") override def getNString(columnLabel: String): String = throw NotSupported("getNString") - override def deleteRow: Unit = throw NotSupported("deleteRow") + override def deleteRow(): Unit = throw NotSupported("deleteRow") override def getConcurrency: Int = throw NotSupported("getConcurrency") @@ -284,7 +284,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def getNCharacterStream(columnLabel: String): Reader = throw NotSupported("getNCharacterStream") - override def close: Unit = throw NotSupported("close") + override def close(): Unit = throw NotSupported("close") override def relative(rows: Int): Boolean = throw NotSupported("relative") @@ -304,7 +304,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def updateLong(columnLabel: String, x: Long): Unit = throw NotSupported("updateLong") - override def moveToCurrentRow: Unit = throw NotSupported("moveToCurrentRow") + override def moveToCurrentRow(): Unit = throw NotSupported("moveToCurrentRow") override def isClosed: Boolean = throw NotSupported("isClosed") @@ -340,7 +340,7 @@ private[resultset] class ResultSetNotSupported extends ResultSet with WrapperNot override def getStatement: Statement = throw NotSupported("getStatement") - override def cancelRowUpdates: Unit = throw NotSupported("cancelRowUpdates") + override def cancelRowUpdates(): Unit = throw NotSupported("cancelRowUpdates") override def getSQLXML(columnIndex: Int): SQLXML = throw NotSupported("getSQLXML") @@ -405,7 +405,7 @@ private[resultset] abstract class AbstractResultSet[T](private val metadata: Res private val records: Iterator[T]) extends ResultSetNotSupported { private val indexByLabel: Map[String, Int] = (1 to metadata.getColumnCount) - .map(index => (metadata.getColumnLabel(index).toUpperCase -> index)).toMap + .map(index => metadata.getColumnLabel(index).toUpperCase -> index).toMap private var lastColumnNull = true private var rowCounter = 0 @@ -413,6 +413,8 @@ private[resultset] abstract class AbstractResultSet[T](private val metadata: Res private var closed: Boolean = false + def hasNext: Boolean = !closed && maxRows != 0 && rowCounter < maxRows && records.hasNext + protected def nextResult: Boolean = records.hasNext match { case true => currentRow = Some(records.next) @@ -429,15 +431,15 @@ private[resultset] abstract class AbstractResultSet[T](private val metadata: Res result } - override final def close: Unit = closed match { + override final def close(): Unit = closed match { case true => // do nothing case false => currentRow = None - closeInherit + closeInherit() closed = true } - protected def closeInherit: Unit + protected def closeInherit(): Unit override def getBoolean(columnIndex: Int): Boolean = getColumn[JBoolean](columnIndex) @@ -492,16 +494,16 @@ private[resultset] abstract class AbstractResultSet[T](private val metadata: Res result } - private def checkRow(columnIndex: Int) = { - def checkIfEmpty = if (isEmpty) throw EmptyRow() + private def checkRow(columnIndex: Int): Unit = { + def checkIfEmpty(): Unit = if (isEmpty) throw EmptyRow() - def checkColumnBounds(index: Int) = { + def checkColumnBounds(index: Int): Unit = { val (min, max) = getColumnBounds if (index < min || index > max) throw InvalidColumn(s"Column with index $index does not exist") } - checkIfEmpty + checkIfEmpty() checkColumnBounds(columnIndex) } @@ -556,7 +558,7 @@ private[resultset] abstract class AbstractResultSet[T](private val metadata: Res protected def isEmpty: Boolean = currentRow.isEmpty protected def getColumnIndex(columnLabel: String): Int = { - indexByLabel.get(columnLabel.toUpperCase).getOrElse(throw InvalidColumn()) + indexByLabel.getOrElse(columnLabel.toUpperCase, throw InvalidColumn()) } protected def getColumnBounds: (Int, Int) From 8063a84f9a2d1e37355b99647952c3955d59c9c7 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sat, 3 Aug 2019 08:38:52 -0500 Subject: [PATCH 5/8] More verbosity when explaining SQL queries --- .../scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala index 3d6e2d1..28c7095 100644 --- a/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala +++ b/src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala @@ -171,8 +171,9 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = val response = ksqlClient.makeKsqlRequest(s"EXPLAIN $fixedSql") if (response.isErroneous) { import scala.collection.JavaConverters._ + val stacktrace = response.getErrorMessage.getStackTrace.asScala.mkString("\n").trim throw KsqlQueryError(s"Error getting metadata for query: '$fixedSql'. " + - s"Error: ${response.getErrorMessage.getStackTrace.asScala.mkString("\n")}.") + s"Error message: ${response.getErrorMessage.getMessage}.${if (stacktrace.nonEmpty) s"Stacktrace: $stacktrace."}") } else if (response.getResponse.size != 1) { throw KsqlEntityListError("Invalid metadata for result set.") } From a7385074b47cba6a0f66fa4a9ba1aa45b95757a3 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sat, 3 Aug 2019 08:39:18 -0500 Subject: [PATCH 6/8] Updating unit tests --- .../ksql/jdbc/KsqlConnectionSpec.scala | 16 ++-- .../ksql/jdbc/KsqlDatabaseMetaDataSpec.scala | 28 +++---- .../ksql/jdbc/KsqlStatementSpec.scala | 77 +++++++++++-------- .../mmolimar/ksql/jdbc/utils/TestUtils.scala | 18 ++--- 4 files changed, 75 insertions(+), 64 deletions(-) diff --git a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlConnectionSpec.scala b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlConnectionSpec.scala index 97943d6..606c2d0 100644 --- a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlConnectionSpec.scala +++ b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlConnectionSpec.scala @@ -23,7 +23,7 @@ class KsqlConnectionSpec extends WordSpec with Matchers with MockFactory { "throw not supported exception if not supported" in { val methods = implementedMethods[KsqlConnection] - reflectMethods[KsqlConnection](methods, false, ksqlConnection) + reflectMethods[KsqlConnection](methods = methods, implemented = false, obj = ksqlConnection) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() @@ -39,11 +39,11 @@ class KsqlConnectionSpec extends WordSpec with Matchers with MockFactory { ksqlConnection.getTransactionIsolation should be(Connection.TRANSACTION_NONE) ksqlConnection.setClientInfo(new Properties) - (ksqlRestClient.makeKsqlRequest _).expects(*) + (ksqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](new KsqlEntityList)) ksqlConnection.setClientInfo("", "") assertThrows[SQLException] { - (ksqlRestClient.makeKsqlRequest _).expects(*) + (ksqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.erroneous(new KsqlErrorMessage(-1, "", Collections.emptyList[String]))) ksqlConnection.setClientInfo("", "") } @@ -55,9 +55,9 @@ class KsqlConnectionSpec extends WordSpec with Matchers with MockFactory { (new CommandStatuses(Collections.emptyMap[CommandId, CommandStatus.Status]))) ksqlConnection.isValid(0) should be(true) - Option(ksqlConnection.getMetaData) should not be (None) + Option(ksqlConnection.getMetaData) should not be None - Option(ksqlConnection.createStatement) should not be (None) + Option(ksqlConnection.createStatement) should not be None assertThrows[SQLFeatureNotSupportedException] { ksqlConnection.createStatement(-1, -1) } @@ -70,9 +70,9 @@ class KsqlConnectionSpec extends WordSpec with Matchers with MockFactory { ksqlConnection.getCatalog should be(None.orNull) (ksqlRestClient.close _).expects - ksqlConnection.close + ksqlConnection.close() ksqlConnection.isClosed should be(true) - ksqlConnection.commit + ksqlConnection.commit() } } } @@ -84,7 +84,7 @@ class KsqlConnectionSpec extends WordSpec with Matchers with MockFactory { "throw not supported exception if not supported" in { val resultSet = new ConnectionNotSupported - reflectMethods[ConnectionNotSupported](Seq.empty, false, resultSet) + reflectMethods[ConnectionNotSupported](methods = Seq.empty, implemented = false, obj = resultSet) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() diff --git a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaDataSpec.scala b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaDataSpec.scala index 085adbf..60a07ee 100644 --- a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaDataSpec.scala +++ b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlDatabaseMetaDataSpec.scala @@ -31,12 +31,12 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w "throw not supported exception if not supported" in { (mockResponse.getEntity _).expects.returns(mock[InputStream]).once - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .anyNumberOfTimes val methods = implementedMethods[KsqlDatabaseMetaData] - reflectMethods[KsqlDatabaseMetaData](methods, false, metadata) + reflectMethods[KsqlDatabaseMetaData](methods = methods, implemented = false, obj = metadata) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() @@ -50,7 +50,7 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w val methods = implementedMethods[KsqlDatabaseMetaData] .filterNot(specialMethods.contains(_)) - reflectMethods[KsqlDatabaseMetaData](methods, true, metadata) + reflectMethods[KsqlDatabaseMetaData](methods = methods, implemented = true, obj = metadata) .foreach(method => { method() }) @@ -59,20 +59,20 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w metadata.getTables("", "", "", Array[String]("test")) } - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.erroneous(new KsqlErrorMessage(-1, "error message", Collections.emptyList[String]))) .once assertThrows[SQLException] { metadata.getTables("", "", "", Array[String](TableTypes.TABLE.name)) } - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](new KsqlEntityList)) .twice metadata.getTables("", "", "[a-z]*", Array[String](TableTypes.TABLE.name, TableTypes.STREAM.name)).next should be(false) - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](new KsqlEntityList)) .twice metadata.getColumns("", "", "", "").next should be(false) @@ -97,15 +97,15 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w val descFn1 = new FunctionDescriptionList("DESCRIBE FUNCTION testfn;", "TESTFN", "Description", "Confluent", "version", "path", List( - new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description")).asJava, "BIGINT", "Description"), - new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description")).asJava, "STRING", "Description") + new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description", false)).asJava, "BIGINT", "Description"), + new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description", false)).asJava, "STRING", "Description") ).asJava, FunctionType.scalar ) val descFn2 = new FunctionDescriptionList("DESCRIBE FUNCTION testdatefn;", "TESTDATEFN", "Description", "Unknown", "version", "path", List( - new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description")).asJava, "BIGINT", "Description") + new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description", false)).asJava, "BIGINT", "Description") ).asJava, FunctionType.scalar ) @@ -114,13 +114,13 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w val entityDescribeFn2 = new KsqlEntityList entityDescribeFn2.add(descFn2) - (mockedKsqlRestClient.makeKsqlRequest _).expects("LIST FUNCTIONS;") + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects("LIST FUNCTIONS;") .returns(RestResponse.successful[KsqlEntityList](entityListFn)) .repeat(4) - (mockedKsqlRestClient.makeKsqlRequest _).expects("DESCRIBE FUNCTION TESTFN;") + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects("DESCRIBE FUNCTION TESTFN;") .returns(RestResponse.successful[KsqlEntityList](entityDescribeFn1)) .repeat(4) - (mockedKsqlRestClient.makeKsqlRequest _).expects("DESCRIBE FUNCTION TESTDATEFN;") + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects("DESCRIBE FUNCTION TESTDATEFN;") .returns(RestResponse.successful[KsqlEntityList](entityDescribeFn2)) .repeat(4) @@ -129,7 +129,7 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w metadata.getSystemFunctions should be("TESTFN") metadata.getTimeDateFunctions should be("TESTDATEFN") - Option(metadata.getConnection) should not be (None) + Option(metadata.getConnection) should not be None metadata.getCatalogs.next should be(false) metadata.getCatalogTerm should be("TOPIC") metadata.getSchemaTerm should be("") @@ -246,7 +246,7 @@ class KsqlDatabaseMetaDataSpec extends WordSpec with Matchers with MockFactory w "throw not supported exception if not supported" in { val metadata = new DatabaseMetaDataNotSupported - reflectMethods[DatabaseMetaDataNotSupported](Seq.empty, false, metadata) + reflectMethods[DatabaseMetaDataNotSupported](methods = Seq.empty, implemented = false, obj = metadata) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() diff --git a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlStatementSpec.scala b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlStatementSpec.scala index 6af21a9..08be5e0 100644 --- a/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlStatementSpec.scala +++ b/src/test/scala/com/github/mmolimar/ksql/jdbc/KsqlStatementSpec.scala @@ -4,14 +4,18 @@ import java.io.{ByteArrayInputStream, InputStream} import java.sql.{ResultSet, SQLException, SQLFeatureNotSupportedException} import com.github.mmolimar.ksql.jdbc.utils.TestUtils._ -import io.confluent.ksql.metastore.{KsqlStream, KsqlTopic} +import io.confluent.ksql.metastore.SerdeFactory +import io.confluent.ksql.metastore.model.{KeyField, KsqlStream, KsqlTopic} import io.confluent.ksql.rest.client.{KsqlRestClient, RestResponse} import io.confluent.ksql.rest.entity.{ExecutionPlan, KafkaTopicsList, QueryDescriptionEntity, QueryDescriptionList, _} -import io.confluent.ksql.serde.DataSource.DataSourceSerDe -import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe +import io.confluent.ksql.rest.server.computation.CommandId +import io.confluent.ksql.schema.ksql.KsqlSchema +import io.confluent.ksql.serde.Format +import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory import io.confluent.ksql.util.timestamp.LongColumnTimestampExtractionPolicy import javax.ws.rs.core.Response -import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.common.serialization.{Serde, Serdes} +import org.apache.kafka.connect.data.{Schema, SchemaBuilder} import org.scalamock.scalatest.MockFactory import org.scalatest.{Matchers, OneInstancePerTest, WordSpec} @@ -31,12 +35,12 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One "throw not supported exception if not supported" in { - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .noMoreThanOnce val methods = implementedMethods[KsqlStatement] - reflectMethods[KsqlStatement](methods, false, statement) + reflectMethods[KsqlStatement](methods = methods, implemented = false, obj = statement) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() @@ -60,27 +64,27 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One } assertThrows[SQLException] { - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.erroneous(new KsqlErrorMessage(-1, "error"))) .once statement.execute("select * from test") } assertThrows[SQLException] { - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .once - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.erroneous(new KsqlErrorMessage(-1, "error"))) .once statement.execute("select * from test") } assertThrows[SQLException] { - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .once - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](new KsqlEntityList)) .once statement.execute("select * from test") @@ -110,21 +114,21 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One val entityList = new KsqlEntityList entityList.add(new QueryDescriptionEntity("select * from test;", queryDesc)) - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .once - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](entityList)) .once statement.execute("select * from test") should be(true) - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .once - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](entityList)) .once - Option(statement.executeQuery("select * from test;")) should not be (None) + Option(statement.executeQuery("select * from test;")) should not be None statement.getMaxRows should be(0) statement.getResultSet shouldNot be(None.orNull) @@ -140,13 +144,13 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One statement.getWarnings should be(None.orNull) assertThrows[SQLException] { - (mockedKsqlRestClient.makeQueryRequest _).expects(*) + (mockedKsqlRestClient.makeQueryRequest _).expects(*, *) .returns(RestResponse.successful[KsqlRestClient.QueryStream](mockQueryStream(mockResponse))) .once val multipleResults = new KsqlEntityList multipleResults.add(new QueryDescriptionEntity("select * from test;", queryDesc)) multipleResults.add(new QueryDescriptionEntity("select * from test;", queryDesc)) - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](multipleResults)) .once statement.execute("select * from test") @@ -154,11 +158,11 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One assertThrows[SQLException] { statement.getResultSet } - statement.cancel + statement.cancel() statement.isClosed should be(false) - statement.close - statement.close + statement.close() + statement.close() statement.isClosed should be(true) assertThrows[SQLException] { statement.executeQuery("select * from test;") @@ -166,10 +170,10 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One } "work when printing topics" in { - (mockedKsqlRestClient.makePrintTopicRequest _).expects(*) + (mockedKsqlRestClient.makePrintTopicRequest _).expects(*, *) .returns(RestResponse.successful[InputStream](new ByteArrayInputStream("test".getBytes))) .once - Option(statement.executeQuery("print 'test'")) should not be (None) + Option(statement.executeQuery("print 'test'")) should not be None statement.getResultSet.next should be(true) statement.getResultSet.getString(1) should be("test") } @@ -180,24 +184,24 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One def validateCommand(entity: KsqlEntity, headers: List[HeaderField]): Unit = { val entityList = new KsqlEntityList entityList.add(entity) - (mockedKsqlRestClient.makeKsqlRequest _).expects(*) + (mockedKsqlRestClient.makeKsqlRequest(_: String)).expects(*) .returns(RestResponse.successful[KsqlEntityList](entityList)) .once statement.execute(entity.getStatementText) should be(true) statement.getResultSet.getMetaData.getColumnCount should be(headers.size) - headers.zipWithIndex.map { case (c, index) => { + headers.zipWithIndex.map { case (c, index) => statement.getResultSet.getMetaData.getColumnName(index + 1) should be(c.name) statement.getResultSet.getMetaData.getColumnLabel(index + 1).toUpperCase should be(c.name) } - } } - val commandStatus = new CommandStatusEntity("REGISTER TOPIC TEST", "topic/1/create", "SUCCESS", "Success Message") + val commandStatus = new CommandStatusEntity("REGISTER TOPIC TEST", CommandId.fromString("topic/1/create"), + new CommandStatus(CommandStatus.Status.SUCCESS, "Success Message"), null) val executionPlan = new ExecutionPlan("DESCRIBE test") val functionDescriptionList = new FunctionDescriptionList("DESCRIBE FUNCTION test;", "TEST", "Description", "author", "version", "path", List( - new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description")).asJava, "BIGINT", "Description") + new FunctionInfo(List(new ArgumentInfo("arg1", "INT", "Description", false)).asJava, "BIGINT", "Description") ).asJava, FunctionType.scalar ) @@ -211,7 +215,7 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One ) val ksqlTopicsList = new KsqlTopicsList( "SHOW TOPICS;", - List(new KsqlTopicInfo("ksqltopic", "kafkatopic", DataSourceSerDe.JSON)).asJava + List(new KsqlTopicInfo("ksqltopic", "kafkatopic", Format.JSON)).asJava ) val propertiesList = new PropertiesList( "list properties;", @@ -250,15 +254,22 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One "EXPLAIN select * from test;", List(queryDescription.getQueryDescription).asJava ) + val schema = SchemaBuilder + .struct + .field("key", Schema.OPTIONAL_INT64_SCHEMA) + .build() val sourceDescEntity = new SourceDescriptionEntity( "DESCRIBE TEST;", new SourceDescription( new KsqlStream("sqlExpression", "datasource", - SchemaBuilder.struct, - SchemaBuilder.struct.field("key"), + KsqlSchema.of(schema), + KeyField.of("key", schema.field("key")), new LongColumnTimestampExtractionPolicy("timestamp"), - new KsqlTopic("input", "input", new KsqlJsonTopicSerDe)), + new KsqlTopic("input", "input", new KsqlJsonSerdeFactory, true), + new SerdeFactory[String] { + override def create(): Serde[String] = Serdes.String() + }), true, "JSON", List.empty.asJava, @@ -302,7 +313,7 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One "throw not supported exception if not supported" in { val resultSet = new StatementNotSupported - reflectMethods[StatementNotSupported](Seq.empty, false, resultSet) + reflectMethods[StatementNotSupported](methods = Seq.empty, implemented = false, obj = resultSet) .foreach(method => { assertThrows[SQLFeatureNotSupportedException] { method() diff --git a/src/test/scala/com/github/mmolimar/ksql/jdbc/utils/TestUtils.scala b/src/test/scala/com/github/mmolimar/ksql/jdbc/utils/TestUtils.scala index 6242b56..300e499 100644 --- a/src/test/scala/com/github/mmolimar/ksql/jdbc/utils/TestUtils.scala +++ b/src/test/scala/com/github/mmolimar/ksql/jdbc/utils/TestUtils.scala @@ -25,26 +25,25 @@ object TestUtils extends Logging { private val RANDOM: Random = new Random - def constructTempDir(dirPrefix: String) = { + def constructTempDir(dirPrefix: String): File = { val file: File = new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt(10000000)) if (!file.mkdirs) throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath) file.deleteOnExit() file } - def getAvailablePort = { + def getAvailablePort: Int = { var socket: ServerSocket = null try { socket = new ServerSocket(0) socket.getLocalPort - } catch { case e: IOException => throw new IllegalStateException("Cannot find available port: " + e.getMessage, e) } finally socket.close() } - def waitTillAvailable(host: String, port: Int, maxWaitMs: Int) = { + def waitTillAvailable(host: String, port: Int, maxWaitMs: Int): Unit = { val defaultWait: Int = 100 var currentWait: Int = 0 try @@ -69,7 +68,7 @@ object TestUtils extends Logging { } catch { case ioe: IOException => false } - finally if (Option(ss) != None) ss.close() + finally if (Option(ss).isDefined) ss.close() } def buildProducer(brokerList: String, compression: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = { @@ -107,7 +106,8 @@ object TestUtils extends Logging { } def buildZkClient(zkConnection: String): KafkaZkClient = - KafkaZkClient(zkConnection, false, 6000, 10000, Int.MaxValue, Time.SYSTEM) + KafkaZkClient(connectString = zkConnection, isSecure = false, sessionTimeoutMs = 6000, + connectionTimeoutMs = 10000, maxInFlightRequests = Int.MaxValue, time = Time.SYSTEM) @throws[FileNotFoundException] def deleteFile(path: File): Boolean = { @@ -147,13 +147,13 @@ object TestUtils extends Logging { ct.runtimeClass.getMethods.filter(_.getDeclaringClass == ct.runtimeClass).map(_.getName) } - def reflectMethods[T <: AnyRef](methods: Seq[String], implemented: Boolean, - obj: T)(implicit tt: TypeTag[T], ct: ClassTag[T]): Seq[() => Any] = { + def reflectMethods[T <: AnyRef](methods: Seq[String], implemented: Boolean, obj: T) + (implicit tt: TypeTag[T], ct: ClassTag[T]): Seq[() => Any] = { val ksqlPackage = "com.github.mmolimar.ksql" val declarations = for { baseClass <- typeTag.tpe.baseClasses - if (baseClass.fullName.startsWith(ksqlPackage)) + if baseClass.fullName.startsWith(ksqlPackage) } yield baseClass.typeSignature.decls declarations.flatten From 7c95c62717fd995b35dc58d14a784a6f70eae0c7 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Mon, 5 Aug 2019 08:15:09 -0500 Subject: [PATCH 7/8] Updating integration tests --- .../ksql/jdbc/KsqlDriverIntegrationTest.scala | 105 ++++++++++-------- .../jdbc/embedded/EmbeddedKafkaCluster.scala | 14 ++- .../jdbc/embedded/EmbeddedKsqlEngine.scala | 16 +-- .../embedded/EmbeddedZookeeperServer.scala | 8 +- 4 files changed, 77 insertions(+), 66 deletions(-) diff --git a/src/it/scala/com/github/mmolimar/ksql/jdbc/KsqlDriverIntegrationTest.scala b/src/it/scala/com/github/mmolimar/ksql/jdbc/KsqlDriverIntegrationTest.scala index 2e4b230..4e5fa5b 100644 --- a/src/it/scala/com/github/mmolimar/ksql/jdbc/KsqlDriverIntegrationTest.scala +++ b/src/it/scala/com/github/mmolimar/ksql/jdbc/KsqlDriverIntegrationTest.scala @@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean import com.github.mmolimar.ksql.jdbc.KsqlEntityHeaders._ import com.github.mmolimar.ksql.jdbc.embedded.{EmbeddedKafkaCluster, EmbeddedKsqlEngine, EmbeddedZookeeperServer} import com.github.mmolimar.ksql.jdbc.utils.TestUtils -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.scalatest._ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAfterAll { @@ -16,14 +16,14 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft val kafkaCluster = new EmbeddedKafkaCluster(zkServer.getConnection) val ksqlEngine = new EmbeddedKsqlEngine(kafkaCluster.getBrokerList) - lazy val kafkaProducer = TestUtils.buildProducer(kafkaCluster.getBrokerList) + lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = TestUtils.buildProducer(kafkaCluster.getBrokerList) val ksqlUrl = s"jdbc:ksql://localhost:${ksqlEngine.getPort}?timeout=20000" var ksqlConnection: Connection = _ - val topic = TestUtils.randomString() + val topic: String = TestUtils.randomString() val stop = new AtomicBoolean(false) - val producerThread = new BackgroundOps(stop, () => produceMessages) + val producerThread = new BackgroundOps(stop, () => produceMessages()) "A KsqlConnection" when { @@ -35,24 +35,24 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft "create the table properly" in { val resultSet = createTestTableOrStream(table) resultSet.next should be(true) - resultSet.getString(commandStatusEntity(0).name) should be("TABLE") + resultSet.getString(commandStatusEntity.head.name) should be("TABLE") resultSet.getString(commandStatusEntity(1).name) should be(table.toUpperCase) resultSet.getString(commandStatusEntity(2).name) should be("CREATE") resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS") resultSet.getString(commandStatusEntity(4).name) should be("Table created") resultSet.next should be(false) - resultSet.close + resultSet.close() } "list the table already created" in { val resultSet = ksqlConnection.createStatement.executeQuery(s"SHOW TABLES") resultSet.next should be(true) - resultSet.getString(tablesListEntity(0).name) should be(table.toUpperCase) + resultSet.getString(tablesListEntity.head.name) should be(table.toUpperCase) resultSet.getString(tablesListEntity(1).name) should be(topic) resultSet.getString(tablesListEntity(2).name) should be("JSON") resultSet.getBoolean(tablesListEntity(3).name) should be(false) resultSet.next should be(false) - resultSet.close + resultSet.close() } "be able to get the execution plan for a query in a table" in { @@ -61,17 +61,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft resultSet.getString(queryDescriptionEntity(1).name) should be("ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3") resultSet.getString(queryDescriptionEntity(2).name) should be(table.toUpperCase) resultSet.next should be(false) - resultSet.close + resultSet.close() } "be able to query all fields in the table" in { var counter = 0 val statement = ksqlConnection.createStatement statement.setMaxRows(maxRecords) + statement.getMoreResults(1) should be(false) val resultSet = statement.executeQuery(s"SELECT * FROM $table") + statement.getMoreResults(1) should be(true) while (resultSet.next) { resultSet.getLong(1) should not be (-1) - Option(resultSet.getString(2)) should not be (None) + Option(resultSet.getString(2)) should not be None resultSet.getInt(3) should be(123) resultSet.getDouble(4) should be(45.4) resultSet.getString(5) should be("lorem ipsum") @@ -81,9 +83,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft counter += 1 } counter should be(maxRecords) + statement.getMoreResults() should be(false) - resultSet.close - statement.close + resultSet.close() + statement.close() val metadata = resultSet.getMetaData metadata.getColumnCount should be(5) @@ -168,19 +171,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft resultSet.getString(sourceDescriptionEntity(3).name) should be("TABLE") resultSet.getString(sourceDescriptionEntity(4).name) should be("JSON") resultSet.next should be(false) - resultSet.close + resultSet.close() } "drop the table" in { val resultSet = ksqlConnection.createStatement.executeQuery(s"DROP TABLE $table") resultSet.next should be(true) - resultSet.getString(commandStatusEntity(0).name) should be("TABLE") + resultSet.getString(commandStatusEntity.head.name) should be("TABLE") resultSet.getString(commandStatusEntity(1).name) should be(table.toUpperCase) resultSet.getString(commandStatusEntity(2).name) should be("DROP") resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS") - resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${table.toUpperCase} was dropped. ") + resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${table.toUpperCase} (topic: $topic) was dropped.") resultSet.next should be(false) - resultSet.close + resultSet.close() } } @@ -190,25 +193,25 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft val stream = TestUtils.randomString() "create the stream properly" in { - val resultSet = createTestTableOrStream(stream, true) + val resultSet = createTestTableOrStream(str = stream, isStream = true) resultSet.next should be(true) - resultSet.getString(commandStatusEntity(0).name) should be("STREAM") + resultSet.getString(commandStatusEntity.head.name) should be("STREAM") resultSet.getString(commandStatusEntity(1).name) should be(stream.toUpperCase) resultSet.getString(commandStatusEntity(2).name) should be("CREATE") resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS") resultSet.getString(commandStatusEntity(4).name) should be("Stream created") resultSet.next should be(false) - resultSet.close + resultSet.close() } "list the stream already created" in { val resultSet = ksqlConnection.createStatement.executeQuery(s"SHOW STREAMS") resultSet.next should be(true) - resultSet.getString(streamsListEntity(0).name) should be(stream.toUpperCase) + resultSet.getString(streamsListEntity.head.name) should be(stream.toUpperCase) resultSet.getString(streamsListEntity(1).name) should be(topic) resultSet.getString(streamsListEntity(2).name) should be("JSON") resultSet.next should be(false) - resultSet.close + resultSet.close() } "be able to get the execution plan for a query in a stream" in { @@ -217,17 +220,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft resultSet.getString(queryDescriptionEntity(1).name) should be("ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3") resultSet.getString(queryDescriptionEntity(2).name) should be(stream.toUpperCase) resultSet.next should be(false) - resultSet.close + resultSet.close() } "be able to query all fields in the stream" in { var counter = 0 val statement = ksqlConnection.createStatement statement.setMaxRows(maxRecords) + statement.getMoreResults(1) should be(false) val resultSet = statement.executeQuery(s"SELECT * FROM $stream") + statement.getMoreResults(1) should be(true) while (resultSet.next) { resultSet.getLong(1) should not be (-1) - Option(resultSet.getString(2)) should not be (None) + Option(resultSet.getString(2)) should not be None resultSet.getInt(3) should be(123) resultSet.getDouble(4) should be(45.4) resultSet.getString(5) should be("lorem ipsum") @@ -237,9 +242,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft counter += 1 } counter should be(maxRecords) + statement.getMoreResults(1) should be(false) - resultSet.close - statement.close + resultSet.close() + statement.close() val metadata = resultSet.getMetaData metadata.getColumnCount should be(5) @@ -323,19 +329,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft resultSet.getString(sourceDescriptionEntity(3).name) should be("STREAM") resultSet.getString(sourceDescriptionEntity(4).name) should be("JSON") resultSet.next should be(false) - resultSet.close + resultSet.close() } "drop the stream" in { val resultSet = ksqlConnection.createStatement.executeQuery(s"DROP STREAM $stream") resultSet.next should be(true) - resultSet.getString(commandStatusEntity(0).name) should be("STREAM") + resultSet.getString(commandStatusEntity.head.name) should be("STREAM") resultSet.getString(commandStatusEntity(1).name) should be(stream.toUpperCase) resultSet.getString(commandStatusEntity(2).name) should be("DROP") resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS") - resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${stream.toUpperCase} was dropped. ") + resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${stream.toUpperCase} (topic: $topic) was dropped.") resultSet.next should be(false) - resultSet.close + resultSet.close() } } @@ -344,19 +350,22 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft "show the content of that topic" in { val statement = ksqlConnection.createStatement statement.setMaxRows(3) + statement.getMoreResults(1) should be(false) val resultSet = statement.executeQuery(s"PRINT '$topic'") + statement.getMoreResults(1) should be(true) resultSet.next should be(true) - resultSet.getString(printTopic(0).name) should be("Format:STRING") + resultSet.getString(printTopic.head.name) should be("Format:STRING") resultSet.next should be(true) resultSet.next should be(true) resultSet.next should be(false) - resultSet.close - statement.close + statement.getMoreResults() should be(false) + resultSet.close() + statement.close() } } } - private def produceMessages: Unit = { + private def produceMessages(): Unit = { val key = TestUtils.randomString().getBytes val value = """ @@ -378,39 +387,39 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft s"WITH (KAFKA_TOPIC='$topic', VALUE_FORMAT='JSON', KEY='FIELD1');") } - override def beforeAll = { - DriverManager.registerDriver(new KsqlDriver); + override def beforeAll(): Unit = { + DriverManager.registerDriver(new KsqlDriver) - zkServer.startup + zkServer.startup() TestUtils.waitTillAvailable("localhost", zkServer.getPort, 5000) - kafkaCluster.startup + kafkaCluster.startup() kafkaCluster.getPorts.foreach { port => TestUtils.waitTillAvailable("localhost", port, 5000) } kafkaCluster.createTopic(topic) kafkaCluster.existTopic(topic) should be(true) - producerThread.start + producerThread.start() - ksqlEngine.startup + ksqlEngine.startup() TestUtils.waitTillAvailable("localhost", ksqlEngine.getPort, 5000) ksqlConnection = DriverManager.getConnection(ksqlUrl) } - override def afterAll = { + override def afterAll(): Unit = { info(s"Produced ${producerThread.getNumExecs} messages") stop.set(true) - TestUtils.swallow(producerThread.interrupt) + TestUtils.swallow(producerThread.interrupt()) - TestUtils.swallow(ksqlConnection.close) - ksqlEngine.shutdown - TestUtils.swallow(kafkaProducer.close) + TestUtils.swallow(ksqlConnection.close()) + ksqlEngine.shutdown() + TestUtils.swallow(kafkaProducer.close()) - kafkaCluster.shutdown - zkServer.shutdown + kafkaCluster.shutdown() + zkServer.shutdown() } } @@ -418,13 +427,13 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft class BackgroundOps(stop: AtomicBoolean, exec: () => Unit) extends Thread { private var count = 0L - override def run = { + override def run(): Unit = { while (!stop.get) { exec() this.count += 1 } } - def getNumExecs = this.count + def getNumExecs: Long = this.count } diff --git a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKafkaCluster.scala b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKafkaCluster.scala index c96a1f1..a4ae996 100644 --- a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKafkaCluster.scala +++ b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKafkaCluster.scala @@ -8,11 +8,13 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.Logging import kafka.zk.AdminZkClient +import scala.collection.Seq + class EmbeddedKafkaCluster(zkConnection: String, ports: Seq[Int] = Seq(TestUtils.getAvailablePort), baseProps: Properties = new Properties) extends Logging { - private val actualPorts: Seq[Int] = ports.map(resolvePort(_)) + private val actualPorts: Seq[Int] = ports.map(resolvePort) private var brokers: Seq[KafkaServer] = Seq.empty private var logDirs: Seq[File] = Seq.empty @@ -20,7 +22,7 @@ class EmbeddedKafkaCluster(zkConnection: String, private lazy val zkClient = TestUtils.buildZkClient(zkConnection) private lazy val adminZkClient = new AdminZkClient(zkClient) - def startup = { + def startup(): Unit = { info("Starting up embedded Kafka brokers") for ((port, i) <- actualPorts.zipWithIndex) { @@ -50,7 +52,7 @@ class EmbeddedKafkaCluster(zkConnection: String, info(s"Started embedded Kafka brokers: $getBrokerList") } - def shutdown = { + def shutdown(): Unit = { brokers.foreach(broker => TestUtils.swallow(broker.shutdown)) logDirs.foreach(logDir => TestUtils.swallow(TestUtils.deleteFile(logDir))) } @@ -59,7 +61,7 @@ class EmbeddedKafkaCluster(zkConnection: String, def getBrokerList: String = actualPorts.map("localhost:" + _).mkString(",") - def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) = { + def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1): Unit = { info(s"Creating topic $topic") adminZkClient.createTopic(topic, numPartitions, replicationFactor) } @@ -69,11 +71,11 @@ class EmbeddedKafkaCluster(zkConnection: String, adminZkClient.deleteTopic(topic) } - def deleteTopics(topics: Seq[String]) = topics.foreach(deleteTopic(_)) + def deleteTopics(topics: Seq[String]): Unit = topics.foreach(deleteTopic) def existTopic(topic: String): Boolean = zkClient.topicExists(topic) - def listTopics = zkClient.getAllTopicsInCluster + def listTopics: Seq[String] = zkClient.getAllTopicsInCluster private def resolvePort(port: Int) = if (port <= 0) TestUtils.getAvailablePort else port diff --git a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKsqlEngine.scala b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKsqlEngine.scala index 8ff400d..0d6b55c 100644 --- a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKsqlEngine.scala +++ b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKsqlEngine.scala @@ -26,34 +26,34 @@ class EmbeddedKsqlEngine(brokerList: String, port: Int = TestUtils.getAvailableP import java.util.function.{Function => JFunction, Supplier => JSupplier} - implicit def toJavaSupplier[A](f: Function0[A]) = new JSupplier[A] { + implicit def toJavaSupplier[A](f: Function0[A]): JSupplier[A] = new JSupplier[A] { override def get: A = f() } - implicit def toJavaFunction[A, B](f: Function1[A, B]) = new JFunction[A, B] { + implicit def toJavaFunction[A, B](f: Function1[A, B]): JFunction[A, B] = new JFunction[A, B] { override def apply(a: A): B = f(a) } - lazy val ksqlEngine = { + lazy val ksqlEngine: KsqlRestApplication = { val versionCheckerAgent = mock[VersionCheckerAgent] (versionCheckerAgent.start _).expects(*, *).returns().anyNumberOfTimes (versionCheckerAgent.updateLastRequestTime _).expects().returns().anyNumberOfTimes - KsqlRestApplication.buildApplication(config, (_: JSupplier[java.lang.Boolean]) => versionCheckerAgent) + KsqlRestApplication.buildApplication(config, (_: JSupplier[java.lang.Boolean]) => versionCheckerAgent, Int.MaxValue) } @throws[IOException] - def startup = { + def startup(): Unit = { info("Starting up embedded KSQL engine") - ksqlEngine.start + ksqlEngine.start() info("Started embedded Zookeeper: " + getConnection) } - def shutdown = { + def shutdown(): Unit = { info("Shutting down embedded KSQL engine") - TestUtils.swallow(ksqlEngine.stop) + TestUtils.swallow(ksqlEngine.stop()) info("Shutted down embedded KSQL engine") } diff --git a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedZookeeperServer.scala b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedZookeeperServer.scala index c6f5137..eaac2e8 100644 --- a/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedZookeeperServer.scala +++ b/src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedZookeeperServer.scala @@ -16,7 +16,7 @@ class EmbeddedZookeeperServer(private val port: Int = TestUtils.getAvailablePort private val factory: ServerCnxnFactory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 0) @throws[IOException] - def startup = { + def startup(): Unit = { info("Starting up embedded Zookeeper") factory.startup(zookeeper) @@ -24,11 +24,11 @@ class EmbeddedZookeeperServer(private val port: Int = TestUtils.getAvailablePort info("Started embedded Zookeeper: " + getConnection) } - def shutdown = { + def shutdown(): Unit = { info("Shutting down embedded Zookeeper") - TestUtils.swallow(zookeeper.shutdown) - TestUtils.swallow(factory.shutdown) + TestUtils.swallow(zookeeper.shutdown()) + TestUtils.swallow(factory.shutdown()) TestUtils.deleteFile(snapshotDir) TestUtils.deleteFile(logDir) From 5cd325f1bdd56ea9951a2b6d3e0350d513476049 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Mon, 5 Aug 2019 08:18:01 -0500 Subject: [PATCH 8/8] Release version 1.1 --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index db8402d..8e5bda7 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "ksql-jdbc-driver" -version := "1.1-SNAPSHOT" +version := "1.1" initialize := { assert(Integer.parseInt(sys.props("java.specification.version").split("\\.")(1)) >= 8, "Java 8 or above required") @@ -16,7 +16,7 @@ libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "5.3.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0" % "test" libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % "test" libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % "test" -libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts (Artifact("javax.ws.rs-api", "jar", "jar")) +libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar") assemblyMergeStrategy in assembly := { case PathList("javax", "inject", xs@_*) => MergeStrategy.first