feat(bq2bq): add spillover date via SQL support #10

Open
wants to merge 2 commits into base: main
58 changes: 44 additions & 14 deletions task/bq2bq/executor/bumblebee/transformation.py
@@ -27,6 +27,7 @@ def __init__(self,
dend: datetime,
execution_time: datetime,
dry_run: bool):
+ self.spillover_query = spillover_query
self.bigquery_service = bigquery_service
self.task_config = task_config
self.sql_query = sql_query
@@ -78,15 +79,26 @@ def transform(self):
elif bq_destination_table.partitioning_type == "DAY":
partition_strategy = timedelta(days=1)

- # queries where source data/partition directly map with destination partitions
- transformation = MultiPartitionTransformation(self.bigquery_service,
- self.task_config,
- self.sql_query,
- self.dstart, self.dend,
- self.dry_run,
- localised_execution_time,
- partition_strategy,
- self.task_config.concurrency)
+ if self.spillover_query:
+ transformation = LegacySpilloverTransformation(self.bigquery_service,
+ self.task_config,
+ self.sql_query,
+ self.spillover_query,
+ self.dstart,
+ self.dend,
+ self.dry_run,
+ localised_execution_time,
+ partition_strategy)
+ else:
+ # queries where source data/partition directly map with destination partitions
+ transformation = MultiPartitionTransformation(self.bigquery_service,
+ self.task_config,
+ self.sql_query,
+ self.dstart, self.dend,
+ self.dry_run,
+ localised_execution_time,
+ partition_strategy,
+ self.task_config.concurrency)
else:
raise Exception("unable to generate a transformation for request, unsupported partition strategy")
transformation.transform()
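
For context, a minimal standalone sketch (hypothetical helper, not code from this PR) of the windowing arithmetic that partition_strategy = timedelta(days=1) implies: each destination partition covers one day between dstart and dend.

from datetime import datetime, timedelta

def daily_partitions(dstart: datetime, dend: datetime,
                     partition_strategy: timedelta = timedelta(days=1)):
    # Yield the start of each destination partition window in [dstart, dend).
    cursor = dstart
    while cursor < dend:
        yield cursor
        cursor += partition_strategy

# Three daily partitions: 2021-01-01, 2021-01-02, 2021-01-03.
for partition in daily_partitions(datetime(2021, 1, 1), datetime(2021, 1, 4)):
    print(partition)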
@@ -376,7 +388,11 @@ def transform(self):
# break query file
task_queries = self.task_query.split(OPTIMUS_QUERY_BREAK_MARKER)
if len(task_queries) < len(datetime_list):
raise Exception("query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER, len(task_queries), len(datetime_list), self.task_query))
raise Exception(
"query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER,
len(task_queries),
len(datetime_list),
self.task_query))

tasks = []
query_index = 0
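
For reference, a small sketch of the break-marker contract enforced above; the marker literal here is an assumption for illustration, not the value bumblebee actually defines.

# Assumed literal; the real OPTIMUS_QUERY_BREAK_MARKER is defined elsewhere in bumblebee.
OPTIMUS_QUERY_BREAK_MARKER = "--*--optimus-break-marker--*--"

task_query = "SELECT 1" + OPTIMUS_QUERY_BREAK_MARKER + "SELECT 2"
task_queries = task_query.split(OPTIMUS_QUERY_BREAK_MARKER)
assert len(task_queries) == 2  # one sub-query per datetime in datetime_list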
@@ -413,45 +429,59 @@ def __init__(self,
sql_query: str,
spillover_query: str,
start_time: datetime,
end_time: datetime,
dry_run: bool,
- execution_time: datetime):
+ execution_time: datetime,
+ partition_delta: timedelta):
self.bigquery_service = bigquery_service
self.task_config = task_config
self.sql_query = sql_query
self.spillover_query = spillover_query
self.dry_run = dry_run
self.start_time = start_time
self.end_time = end_time
self.execution_time = execution_time
+ self.partition_delta = partition_delta
+
+ self.concurrency = self.task_config.concurrency

def transform(self):
datetime_list = []
- default_datetime = [self.start_time]
- datetime_list.extend(default_datetime)
+ # default_datetime = [self.start_time]
+ # datetime_list.extend(default_datetime)

if self.task_config.use_spillover:
spillover = SpilloverDatetimes(self.bigquery_service,
self.spillover_query,
self.task_config,
self.start_time,
self.end_time,
self.dry_run,
self.execution_time)
spillover_datetimes = spillover.collect_datetimes()
datetime_list.extend(spillover_datetimes)

datetime_list = distinct_list(datetime_list)

+ execute_for = self.start_time
+
+ # tables are partitioned by day;
+ # iterate from start to end, collecting one datetime per partition
+ while execute_for < self.end_time:
+ datetime_list.append(execute_for)
+ execute_for += self.partition_delta

tasks = []
for partition_time in datetime_list:
logger.info("create transformation for partition: {}".format(partition_time))
loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table,
self.task_config.load_method, partition_time)

+ task_window = WindowFactory.create_window_with_time(partition_time, partition_time + self.partition_delta)

task = PartitionTransformation(self.task_config,
loader,
self.sql_query,
- self.window,
+ task_window,
self.dry_run,
self.execution_time)
tasks.append(task)
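
The loop above relies on SpilloverDatetimes.collect_datetimes, which the diff does not show. A hedged sketch of what that collection step could look like; the execute method and the macro rendering are assumptions, not the repo's exact API.

from datetime import datetime

def collect_datetimes(bigquery_service, spillover_query: str,
                      dstart: datetime, dend: datetime,
                      execution_time: datetime):
    # Render the Optimus-style macros into concrete timestamps before the
    # query is sent to BigQuery (per the reviewer notes below, exact
    # timestamps are what actually reach the SQL).
    rendered = (spillover_query
                .replace("__dstart__", dstart.isoformat())
                .replace("__dend__", dend.isoformat())
                .replace("__execution_time__", execution_time.isoformat()))
    rows = bigquery_service.execute(rendered)  # assumed service method
    return [row[0] for row in rows]  # one datetime per spillover partition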
@@ -0,0 +1,13 @@
[DESTINATION]
PROJECT="g-project"
DATASET="playground"
TABLE="test_booking_count"

[TRANSFORMATION]
TASK_WINDOW="DAILY"
TIMEZONE="Asia/Jakarta"
USE_SPILLOVER="TRUE"
CONCURRENCY=5

[LOAD]
LOAD_METHOD="REPLACE"
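
A minimal sketch of reading this properties file in Python, assuming configparser and a hypothetical filename; the repo's actual config loader may differ.

import configparser

parser = configparser.ConfigParser()
parser.read("task.properties")  # hypothetical filename

# String values are quoted in the file, so strip the quotes before use.
use_spillover = parser["TRANSFORMATION"]["USE_SPILLOVER"].strip('"') == "TRUE"
concurrency = int(parser["TRANSFORMATION"]["CONCURRENCY"])
load_method = parser["LOAD"]["LOAD_METHOD"].strip('"')

print(use_spillover, concurrency, load_method)  # True 5 REPLACE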
@@ -0,0 +1,6 @@
select
TIMESTAMP('__dstart__') as dstart,
TIMESTAMP('__dend__') as dend,
Member commented on lines +2 to +3:
these dstart/dend macros won't work now; these will be exact timestamps getting passed to this SQL, we know that, right?

"beerus" as hakai,
"naruto" as rasengan,
CAST("__execution_time__" AS TIMESTAMP) as `load_timestamp`
@@ -0,0 +1,8 @@
SELECT
date
FROM
`g-project.playground.calendar_date`
-- this query generates the calendar dates for the past 14 days; we replace each of those partitions using the main query
WHERE date >= DATE_SUB('__dstart__', INTERVAL 14 DAY)
AND date < '__dend__'
Member commented on lines +6 to +7:
similarly, these will be exact timestamps.

ORDER BY date
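
To make the window concrete, the same date arithmetic rendered in Python; the dates are assumptions for one DAILY run, purely illustrative.

from datetime import date, timedelta

dstart = date(2021, 1, 15)         # assumed DAILY window start
dend = dstart + timedelta(days=1)  # assumed exclusive window end
low = dstart - timedelta(days=14)  # DATE_SUB('__dstart__', INTERVAL 14 DAY)

# WHERE date >= low AND date < dend
selected = []
d = low
while d < dend:
    selected.append(d)
    d += timedelta(days=1)

# The 14 days before dstart plus dstart itself, each mapping to one
# destination partition that the main query replaces.
print(selected[0], selected[-1], len(selected))  # 2021-01-01 2021-01-15 15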