From 5cf980744e3e0b81999d8b4c57e1f493db532663 Mon Sep 17 00:00:00 2001 From: Irena Date: Tue, 11 Aug 2020 20:30:40 +0300 Subject: [PATCH 1/2] feat(InstrumentationOutputWriter): handle multiple fields --- .../InstrumentationProvider.scala | 1 + .../influxdb/InfluxDBInstrumentation.scala | 12 ++++++++++ .../InstrumentationOutputWriter.scala | 23 ++++++++++++++----- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala b/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala index e34bc9b60..90e8ca463 100644 --- a/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala +++ b/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala @@ -21,6 +21,7 @@ object InstrumentationProvider { trait InstrumentationProvider extends Serializable{ def count(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit def gauge(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit + def gauge(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit def close(): Unit = { } } diff --git a/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala b/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala index 8873a0adc..cd66e5dcc 100644 --- a/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala +++ b/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala @@ -18,6 +18,10 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e writeToInflux(time, name, value, tags) } + override def gauge(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long): Unit = { + writeToInflux(time, 
fields, tags) + } + private def writeToInflux(time: Long, name: String, value: Long, tags: Map[String, String] = Map()): Unit = { influxDB.write(Point.measurement(measurement) .time(time, TimeUnit.MILLISECONDS) @@ -26,6 +30,14 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e .build()) } + private def writeToInflux(time: Long, fields: Map[String, Object] = Map(), tags: Map[String, String] = Map()): Unit = { + influxDB.write(Point.measurement(measurement) + .time(time, TimeUnit.MILLISECONDS) + .fields(fields.asJava) + .tag(tags.asJava) + .build()) + } + override def close(): Unit = { influxDB.close() } diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index 8bb23b111..7a8beb1ba 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -7,27 +7,37 @@ import org.apache.log4j.{LogManager, Logger} import org.apache.spark.sql.{DataFrame, Row} -class InstrumentationOutputWriter(props: Map[String, String], +class InstrumentationOutputWriter(props: Map[String, Any], dataFrameName: String, metricName: String, instrumentationFactory: InstrumentationFactory) extends Writer { @transient lazy val log: Logger = LogManager.getLogger(this.getClass) - val valueColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("valueColumn") - val timeColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("timeColumn") + val valueColumnProperty: Option[String] = Some(Option(props).getOrElse(Map()).get("valueColumn").toString) + val timeColumnProperty: Option[String] = Some(Option(props).getOrElse(Map()).get("timeColumn").toString) + val valueColumnsProperty: Option[List[String]] = 
Some(Option(props).getOrElse(Map()).get("valueColumns").toList.map(_.toString)) + // scalastyle:off cyclomatic.complexity override def write(dataFrame: DataFrame): Unit = { val columns = dataFrame.schema.fields.zipWithIndex val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) + var valueColoumnsIndices = valueColumnsProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) log.info(s"Starting to write Instrumentation of data frame: $dataFrameName on metric: $metricName") dataFrame.foreachPartition(p => { val client = instrumentationFactory.create() - // use last column if valueColumn is missing + // use last column if valueColumn is missing and valueColoumns is missing + // ToDO: NEEDS TO BE REMOVED val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1) + val actualColumnsIndices = (indexOfValCol, valueColoumnsIndices) match { + case (Some(colIndex), None) => List(indexOfValCol.getOrElse(columns.length - 1)) + case (None, Some(valueColoumnsIndices)) => valueColoumnsIndices + case _ => + } + p.foreach(row => { try { @@ -45,7 +55,8 @@ class InstrumentationOutputWriter(props: Map[String, String], if (metricValue != null && classOf[Number].isAssignableFrom(metricValue.getClass)) { val longValue = metricValue.asInstanceOf[Number].longValue() val valueColumnName = row.schema.fieldNames(actualIndexOfValCol) - client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time) + //client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time) + client.gauge(fields = tags, tags = tags, time = time) } }catch { @@ -57,7 +68,7 @@ class InstrumentationOutputWriter(props: Map[String, String], client.close() }) } - + // scalastyle:on cyclomatic.complexity def getTime(indexOfTimeCol: Option[Int], row: Row): Long = { if (indexOfTimeCol.isDefined) { if 
(!row.isNullAt(indexOfTimeCol.get)) { From 8e4da75ed039b435809a1a4c3db791629164c8bd Mon Sep 17 00:00:00 2001 From: Gilad Weinbach Date: Wed, 19 Aug 2020 08:56:03 +0300 Subject: [PATCH 2/2] continue logic + temp fix for overload error --- .../InstrumentationProvider.scala | 2 +- .../influxdb/InfluxDBInstrumentation.scala | 6 ++--- .../spark/SparkInstrumentation.scala | 2 ++ .../InstrumentationOutputWriter.scala | 17 +++++++------- .../InstrumentationOutputWriterTest.scala | 23 +++++++++++++++++++ 5 files changed, 38 insertions(+), 12 deletions(-) create mode 100644 src/test/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriterTest.scala diff --git a/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala b/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala index 90e8ca463..8211012c9 100644 --- a/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala +++ b/src/main/scala/com/yotpo/metorikku/instrumentation/InstrumentationProvider.scala @@ -21,7 +21,7 @@ object InstrumentationProvider { trait InstrumentationProvider extends Serializable{ def count(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit def gauge(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit - def gauge(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit + def gauge2(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit def close(): Unit = { } } diff --git a/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala b/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala index cd66e5dcc..239b36e3a 100644 ---
a/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala +++ b/src/main/scala/com/yotpo/metorikku/instrumentation/influxdb/InfluxDBInstrumentation.scala @@ -18,8 +18,8 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e writeToInflux(time, name, value, tags) } - override def gauge(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long): Unit = { - writeToInflux(time, fields, tags) + override def gauge2(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long): Unit = { + writeToInflux2(time, fields, tags) } private def writeToInflux(time: Long, name: String, value: Long, tags: Map[String, String] = Map()): Unit = { @@ -30,7 +30,7 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e .build()) } - private def writeToInflux(time: Long, fields: Map[String, Object] = Map(), tags: Map[String, String] = Map()): Unit = { + private def writeToInflux2(time: Long, fields: Map[String, Object] = Map(), tags: Map[String, String] = Map()): Unit = { influxDB.write(Point.measurement(measurement) .time(time, TimeUnit.MILLISECONDS) .fields(fields.asJava) diff --git a/src/main/scala/com/yotpo/metorikku/instrumentation/spark/SparkInstrumentation.scala b/src/main/scala/com/yotpo/metorikku/instrumentation/spark/SparkInstrumentation.scala index 276d48e99..27f856ea2 100644 --- a/src/main/scala/com/yotpo/metorikku/instrumentation/spark/SparkInstrumentation.scala +++ b/src/main/scala/com/yotpo/metorikku/instrumentation/spark/SparkInstrumentation.scala @@ -20,6 +20,8 @@ class SparkInstrumentation() extends InstrumentationProvider { metricParams ++= Array(name) metricParams.mkString(".") } + + override def gauge2(fields: Map[String, Object], tags: Map[String, String], time: Long): Unit = ??? 
} class SparkInstrumentationFactory() extends InstrumentationFactory { diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index 7a8beb1ba..edfebbfef 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -20,9 +20,9 @@ class InstrumentationOutputWriter(props: Map[String, Any], // scalastyle:off cyclomatic.complexity override def write(dataFrame: DataFrame): Unit = { val columns = dataFrame.schema.fields.zipWithIndex - val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) + val indexOfValCol: Option[Int]= valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) - var valueColoumnsIndices = valueColumnsProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) + val valueColumnsIndices: Option[List[Int]]= valueColumnsProperty.map(col => Option(dataFrame.schema.fieldNames.indexOf(col)).toList) log.info(s"Starting to write Instrumentation of data frame: $dataFrameName on metric: $metricName") dataFrame.foreachPartition(p => { @@ -32,10 +32,10 @@ class InstrumentationOutputWriter(props: Map[String, Any], // ToDO: NEEDS TO BE REMOVED val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1) - val actualColumnsIndices = (indexOfValCol, valueColoumnsIndices) match { - case (Some(colIndex), None) => List(indexOfValCol.getOrElse(columns.length - 1)) - case (None, Some(valueColoumnsIndices)) => valueColoumnsIndices - case _ => + val actualIndicesOfValCol: List[Int] = (indexOfValCol, valueColumnsIndices) match { + case (Some(_), None) => 
List(indexOfValCol.get) + case (None, Some(_)) => valueColumnsIndices.get + case _ => List(columns.length - 1) } p.foreach(row => { @@ -43,7 +43,8 @@ class InstrumentationOutputWriter(props: Map[String, Any], val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ columns.filter { - case (column, index) => index != actualIndexOfValCol && (!indexOfTimeCol.isDefined || index != indexOfTimeCol.get) + case (column, index) => !actualIndicesOfValCol.contains(index) && + (!indexOfTimeCol.isDefined || index != indexOfTimeCol.get) //replace index with indices }.map { case (column, index) => column.name -> row.get(index).asInstanceOf[AnyVal].toString @@ -56,7 +57,7 @@ class InstrumentationOutputWriter(props: Map[String, Any], val longValue = metricValue.asInstanceOf[Number].longValue() val valueColumnName = row.schema.fieldNames(actualIndexOfValCol) //client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time) - client.gauge(fields = tags, tags = tags, time = time) + client.gauge2(fields = tags, tags = tags, time = time) } }catch { diff --git a/src/test/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriterTest.scala b/src/test/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriterTest.scala new file mode 100644 index 000000000..b4845e9d8 --- /dev/null +++ b/src/test/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriterTest.scala @@ -0,0 +1,23 @@ +package com.yotpo.metorikku.output.writers.instrumentation + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import com.yotpo.metorikku.instrumentation.{InstrumentationFactory, InstrumentationProvider} +import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar.mock + + +class InstrumentationOutputWriterTest extends FunSuite with DataFrameSuiteBase{ + val states = Map("AL" -> "Alabama", "AK" -> "Alaska") +// val factory = mock[InstrumentationFactory] + val factory = 
InstrumentationProvider.getInstrumentationFactory(Some("foo"), None) + val writer = new InstrumentationOutputWriter(props = states, dataFrameName = "df", metricName = "metric", factory) + + import sqlContext.implicits._ + val propsData = Seq( + ("month_ts", 1), + ("number_of_ratings", 2) + ) + + val df = propsData.toDF("timeColumn", "valueColumns") + writer.write(df) +}