Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(InstrumentationOutputWriter): handle multiple fields #361

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object InstrumentationProvider {
/**
 * Contract for a metrics sink. Extends Serializable; the exact meaning of each
 * metric kind is left to the implementations (not visible from this declaration).
 */
trait InstrumentationProvider extends Serializable{
/** Reports a single named long value with optional tags; `time` defaults to the current time in epoch milliseconds. */
def count(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit
/** Same parameter shape as `count`: a single named long value, optional tags, `time` defaulting to now (epoch milliseconds). */
def gauge(name: String, value: Long, tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit
/**
 * Multi-field variant of `gauge`: takes a map of field name to value instead of one name/value pair.
 * NOTE(review): `fields` defaults to an empty map — confirm every implementation accepts an empty field set.
 */
def gauge2(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long = System.currentTimeMillis()): Unit
/** Releases any resources held by the provider; the default implementation does nothing. */
def close(): Unit = { }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e
writeToInflux(time, name, value, tags)
}

/**
 * Writes all given fields as one point by delegating to `writeToInflux2`.
 *
 * @param fields field name to value pairs stored on the point
 * @param tags   tag name to value pairs attached to the point
 * @param time   timestamp forwarded to the writer
 */
override def gauge2(fields: Map[String, Object] = Map(), tags: Map[String, String] = Map(), time: Long): Unit =
  writeToInflux2(time = time, fields = fields, tags = tags)

private def writeToInflux(time: Long, name: String, value: Long, tags: Map[String, String] = Map()): Unit = {
influxDB.write(Point.measurement(measurement)
.time(time, TimeUnit.MILLISECONDS)
Expand All @@ -26,6 +30,14 @@ class InfluxDBInstrumentation(val influxDB: InfluxDB, val measurement: String) e
.build())
}

/**
 * Builds one point for this instance's `measurement` carrying every entry of
 * `fields` and `tags`, stamped with `time` in milliseconds, and writes it to InfluxDB.
 *
 * NOTE(review): `fields` defaults to an empty map — confirm the InfluxDB client
 * accepts a point without fields.
 */
private def writeToInflux2(time: Long, fields: Map[String, Object] = Map(), tags: Map[String, String] = Map()): Unit = {
  val point = Point
    .measurement(measurement)
    .time(time, TimeUnit.MILLISECONDS)
    .fields(fields.asJava)
    .tag(tags.asJava)
    .build()

  influxDB.write(point)
}

/** Closes the underlying InfluxDB client. */
override def close(): Unit =
  influxDB.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class SparkInstrumentation() extends InstrumentationProvider {
metricParams ++= Array(name)
metricParams.mkString(".")
}

/**
 * Reports every numeric entry of `fields` as an individual gauge.
 *
 * Previously this was `???`, which throws `NotImplementedError` on every call.
 * Each numeric field is now forwarded to this provider's single-value `gauge`,
 * using the field name as the metric name and the value truncated to a long.
 *
 * @param fields field name to value pairs; non-numeric values are skipped
 * @param tags   tags forwarded unchanged to each `gauge` call
 * @param time   timestamp forwarded unchanged to each `gauge` call
 */
override def gauge2(fields: Map[String, Object], tags: Map[String, String], time: Long): Unit =
  fields.foreach {
    case (fieldName, number: Number) =>
      gauge(name = fieldName, value = number.longValue(), tags = tags, time = time)
    case _ =>
      // A gauge carries a long value, so non-numeric fields cannot be represented here.
      ()
  }
}

class SparkInstrumentationFactory() extends InstrumentationFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,44 @@ import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.sql.{DataFrame, Row}


class InstrumentationOutputWriter(props: Map[String, String],
class InstrumentationOutputWriter(props: Map[String, Any],
dataFrameName: String,
metricName: String,
instrumentationFactory: InstrumentationFactory) extends Writer {
@transient lazy val log: Logger = LogManager.getLogger(this.getClass)

val valueColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("valueColumn")
val timeColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("timeColumn")
val valueColumnProperty: Option[String] = Some(Option(props).getOrElse(Map()).get("valueColumn").toString)
val timeColumnProperty: Option[String] = Some(Option(props).getOrElse(Map()).get("timeColumn").toString)
val valueColumnsProperty: Option[List[String]] = Some(Option(props).getOrElse(Map()).get("valueColumns").toList.map(_.toString))

// scalastyle:off cyclomatic.complexity
override def write(dataFrame: DataFrame): Unit = {
val columns = dataFrame.schema.fields.zipWithIndex
val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))
val indexOfValCol: Option[Int]= valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))
val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))
val valueColumnsIndices: Option[List[Int]]= valueColumnsProperty.map(col => Option(dataFrame.schema.fieldNames.indexOf(col)).toList)

log.info(s"Starting to write Instrumentation of data frame: $dataFrameName on metric: $metricName")
dataFrame.foreachPartition(p => {
val client = instrumentationFactory.create()

// use last column if valueColumn is missing
// use last column if both valueColumn and valueColumns are missing
// TODO: needs to be removed
val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1)

val actualIndicesOfValCol: List[Int] = (indexOfValCol, valueColumnsIndices) match {
case (Some(_), None) => List(indexOfValCol.get)
case (None, Some(_)) => valueColumnsIndices.get
case _ => List(columns.length - 1)
}

p.foreach(row => {
try {

val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++
columns.filter {
case (column, index) => index != actualIndexOfValCol && (!indexOfTimeCol.isDefined || index != indexOfTimeCol.get)
case (column, index) => !actualIndicesOfValCol.contains(index) &&
(!indexOfTimeCol.isDefined || index != indexOfTimeCol.get) //replace index with indices
}.map {
case (column, index) =>
column.name -> row.get(index).asInstanceOf[AnyVal].toString
Expand All @@ -45,7 +56,8 @@ class InstrumentationOutputWriter(props: Map[String, String],
if (metricValue != null && classOf[Number].isAssignableFrom(metricValue.getClass)) {
val longValue = metricValue.asInstanceOf[Number].longValue()
val valueColumnName = row.schema.fieldNames(actualIndexOfValCol)
client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time)
//client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time)
client.gauge2(fields = tags, tags = tags, time = time)
}

}catch {
Expand All @@ -57,7 +69,7 @@ class InstrumentationOutputWriter(props: Map[String, String],
client.close()
})
}

// scalastyle:on cyclomatic.complexity
def getTime(indexOfTimeCol: Option[Int], row: Row): Long = {
if (indexOfTimeCol.isDefined) {
if (!row.isNullAt(indexOfTimeCol.get)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.yotpo.metorikku.output.writers.instrumentation

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import com.yotpo.metorikku.instrumentation.{InstrumentationFactory, InstrumentationProvider}
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar.mock


/**
 * Smoke test for [[InstrumentationOutputWriter]]: builds a small two-column data
 * frame and runs `write` on it.
 *
 * The body is registered through `test(...)` so the framework actually reports it.
 * Before, everything ran in the class constructor, where no test is registered and
 * the Spark context supplied by DataFrameSuiteBase may not be set up yet.
 */
class InstrumentationOutputWriterTest extends FunSuite with DataFrameSuiteBase {

  test("write runs on a data frame with a time column and a value column") {
    // Bind the context to a local val so its implicits can be imported inside the test body.
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val props = Map("AL" -> "Alabama", "AK" -> "Alaska")
    val factory = InstrumentationProvider.getInstrumentationFactory(Some("foo"), None)
    val writer = new InstrumentationOutputWriter(
      props = props,
      dataFrameName = "df",
      metricName = "metric",
      instrumentationFactory = factory)

    val propsData = Seq(
      ("month_ts", 1),
      ("number_of_ratings", 2)
    )
    val df = propsData.toDF("timeColumn", "valueColumns")

    // NOTE(review): no assertion yet — this only checks that write completes without throwing.
    writer.write(df)
  }
}