Skip to content

Commit

Permalink
Update to 0.10.8, add in Delta lake example, update mysql command
Browse files Browse the repository at this point in the history
  • Loading branch information
pflooky committed Jun 11, 2024
1 parent 50483bc commit c9ce551
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM datacatering/data-caterer-basic:0.10.6
FROM datacatering/data-caterer-basic:0.10.8

COPY --chown=app:app build/libs/data-caterer-example-0.1.0.jar /opt/spark/jars/data-caterer.jar
2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ services:
image: "mysql:8.4.0"
environment:
MYSQL_ROOT_PASSWORD: "root"
command: "--default-authentication-plugin=mysql_native_password"
command: "--mysql-native-password=ON"
volumes:
- "${HOME}/data/mysql:/var/lib/mysql"
- "./data/sql/mysql/customer.sql:/docker-entrypoint-initdb.d/customer.sql"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ version=0.1.0

scalaVersion=2.12
scalaSpecificVersion=2.12.15
dataCatererVersion=0.10.7
dataCatererVersion=0.10.8
sparkVersion=3.5.1
sparkMajorVersion=3.5
39 changes: 39 additions & 0 deletions src/main/scala/io/github/datacatering/plan/DeltaLakePlan.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.github.datacatering.plan

import io.github.datacatering.datacaterer.api.PlanRun
import io.github.datacatering.datacaterer.api.model.{DateType, DecimalType, TimestampType}

/**
 * Example plan that generates customer account and transaction data and
 * writes both data sets out as Delta Lake tables.
 *
 * Accounts and transactions are linked through a foreign key relationship:
 * `account_id`/`name` of the accounts feed `account_id`/`full_name` of the
 * transactions.
 */
class DeltaLakePlan extends PlanRun {

  // Account data set, written in Delta format.
  // NOTE(review): DecimalType(5, 2) can hold at most 999.99 while max is 1000 -
  // confirm the generator never emits a value that overflows the precision.
  val accountTask = delta("customer_accounts", "/opt/app/data/customer/delta")
    .schema(
      field.name("account_id").regex("ACC[0-9]{8}").unique(true),
      field.name("balance").`type`(new DecimalType(5, 2)).min(1).max(1000),
      field.name("created_by").sql("CASE WHEN status IN ('open', 'closed') THEN 'eod' ELSE 'event' END"),
      field.name("name").expression("#{Name.name}"),
      field.name("open_time").`type`(TimestampType).min(java.sql.Date.valueOf("2022-01-01")),
      field.name("status").oneOf("open", "closed", "suspended", "pending")
    )
    .count(count.records(100))

  // Transaction data set. This is also written in Delta format (it previously
  // used the Iceberg connector) and gets its own directory: it has a different
  // schema from the accounts, so it must not share the accounts' table path.
  // `account_id` and `full_name` carry no generator of their own because they
  // are supplied via the foreign key relationship defined below.
  // NOTE(review): DecimalType(4, 2) can hold at most 99.99 while max is 100 -
  // confirm the generator never emits a value that overflows the precision.
  val transactionTask = delta("customer_transactions", "/opt/app/data/customer/delta_transactions")
    .schema(
      field.name("account_id"),
      field.name("full_name"),
      field.name("amount").`type`(new DecimalType(4, 2)).min(1).max(100),
      field.name("time").`type`(TimestampType).min(java.sql.Date.valueOf("2022-01-01")),
      field.name("date").`type`(DateType).sql("DATE(time)")
    )
    // Record count per account_id/full_name combination is driven by a
    // generator bounded by min 1 and max 5.
    .count(count.recordsPerColumnGenerator(generator.min(1).max(5), "account_id", "full_name"))

  // Reports go to the shared data folder; unique checks are enabled because
  // `account_id` is declared unique above.
  val config = configuration
    .generatedReportsFolderPath("/opt/app/data/report")
    .enableUniqueCheck(true)

  // Link the account columns to the matching transaction columns.
  val myPlan = plan.addForeignKeyRelationship(
    accountTask, List("account_id", "name"),
    List(transactionTask -> List("account_id", "full_name"))
  )

  execute(myPlan, config, accountTask, transactionTask)
}

0 comments on commit c9ce551

Please sign in to comment.