Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-363: Add the ability to Drop columns with DoricColumns #366

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions core/src/main/scala/doric/sem/TransformOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package doric
package sem

import cats.implicits._

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.{Column, DataFrame, Dataset}
import org.apache.spark.sql.doric.DataFrameExtras

import scala.collection.immutable

private[sem] trait TransformOps {

implicit class DataframeTransformationSyntax[A](df: Dataset[A]) {
Expand Down Expand Up @@ -142,5 +143,31 @@ private[sem] trait TransformOps {
@inline def selectCName(col: CName, cols: CName*): DataFrame = {
df.select(col.value, cols.map(_.value): _*)
}

/**
  * Drops the specified column from the Dataframe.
  * @group Dataframe Transformation operation
  * @note Unlike in Spark, dropping a column that does not exist will result in a ColumnNotFound exception
  */
def drop(col: DoricColumn[_]): DataFrame = {
  // Validate the doric column against this Dataset first; only if the
  // validation succeeds is the resolved Spark column dropped, otherwise
  // the accumulated doric errors are thrown.
  col.elem
    .run(df)
    .map(validated => df.drop(validated))
    .returnOrThrow("drop")
}

/**
  * Drops the specified columns from the Dataframe.
  * @group Dataframe Transformation operation
  * @note Unlike in Spark, dropping columns that do not exist will result in a ColumnNotFound exception
  */
def drop(col: DoricColumn[_]*): DataFrame = {
  // Work on an untyped copy so the fold below can thread a DataFrame through.
  val base = df.toDF()
  // Validate every doric column in one traversal so all errors are
  // accumulated (not just the first), then remove the resolved Spark
  // columns one at a time — Spark has no multi-Column drop overload.
  col.toList
    .traverse(_.elem)
    .run(base)
    .map(_.foldLeft(base)((acc, resolved) => acc.drop(resolved)))
    .returnOrThrow("drop")
}
}
}
83 changes: 83 additions & 0 deletions core/src/test/scala/doric/sem/TransformOpsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package doric
package sem

import doric.implicitConversions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
import org.scalatest.matchers.should.Matchers
import org.scalatest.EitherValues

Expand Down Expand Up @@ -141,5 +143,86 @@ class TransformOpsSpec
(11, 1)
)
}

it("drops a single column") {
  import spark.implicits._

  // Two-column frame; dropping col1 must leave only col2's value.
  val input = List(("a", "b")).toDF("col1", "col2")

  val remaining = input.drop(colString("col1")).collect().toList

  remaining shouldBe List(Row("b"))
}

it("drop throws an error when column is not found") {
  import spark.implicits._

  val input = List(("a", "b")).toDF("col1", "col2")

  // Unlike Spark's silent drop, doric reports the missing column.
  val error = intercept[DoricMultiError] {
    input.drop(colString("invalid")).collect().toList
  }

  error should containAllErrors(
    ColumnNotFound("invalid", List("col1", "col2"))
  )
}

it("drop throws an error when column is of the wrong type") {
  import spark.implicits._

  val input = List(("a", "b")).toDF("col1", "col2")

  // col1 holds strings, so requesting it as Int must fail validation.
  val error = intercept[DoricMultiError] {
    input.drop(colInt("col1")).collect().toList
  }

  error should containAllErrors(
    ColumnTypeError("col1", IntegerType, StringType)
  )
}

it("drops multiple columns") {
  import spark.implicits._

  // Four columns of mixed types; dropping three must leave only col1.
  val input = List(("a", "b", 1, 2.0)).toDF("col1", "col2", "col3", "col4")

  val remaining = input
    .drop(colString("col2"), colInt("col3"), colDouble("col4"))
    .collect()
    .toList

  remaining shouldBe List(Row("a"))
}

it("drop throws an error when columns are not found") {
  import spark.implicits._

  val input = List(("a", "b", 1, 2.0)).toDF("col1", "col2", "col3", "col4")

  // All three missing columns should be reported together, proving that
  // validation errors are accumulated rather than failing fast.
  val error = intercept[DoricMultiError] {
    input
      .drop(colString("not"), colInt("a"), colDouble("column"))
      .collect()
      .toList
  }

  val allColumns = List("col1", "col2", "col3", "col4")
  error should containAllErrors(
    ColumnNotFound("not", allColumns),
    ColumnNotFound("a", allColumns),
    ColumnNotFound("column", allColumns)
  )
}

it("drop throws an error when columns are of the wrong type") {
  import spark.implicits._

  val input = List(("a", "b", 1, 2.0)).toDF("col1", "col2", "col3", "col4")

  // Every mistyped column should be reported, each with the expected
  // (requested) type and the actual type found in the schema.
  val error = intercept[DoricMultiError] {
    input
      .drop(colDouble("col1"), colTimestamp("col2"), colString("col3"))
      .collect()
      .toList
  }

  error should containAllErrors(
    ColumnTypeError("col1", DoubleType, StringType),
    ColumnTypeError("col2", TimestampType, StringType),
    ColumnTypeError("col3", StringType, IntegerType)
  )
}
}
}