Envelope provides the ability to loop over one or more steps of a pipeline so that each instance does not need to be specified. The values to be looped over can be determined dynamically either from parameters given at pipeline submission or from the data of previous steps in the pipeline.
Loops are specified in an Envelope pipeline by adding a loop step. This is in contrast to a data step where data itself is manipulated. A loop step defines how the loop will run, such as parallel or serial, and over which values. The steps that the loop step then loops over is known as the loop graph, and is defined by all of the steps in the pipeline that are directly dependent on the loop step. Steps that are dependent on loop graph steps, but not the loop step itself, will run after all iterations of the loop.
Loops are executed by unrolling the loop into the steps that would have been created if the iterations of the loop were specified individually. The names of the loop graph steps of each iteration are suffixed with the iteration value to distinguish one instance from another. This allows the steps of all iterations to be accessible within the loop and in steps after the loop completes.
Loop steps have three options for specifying the of values to be looped over:
-
range
provides an inclusive range of integers -
list
provides an ordered list of values -
step
provides the values from a previous step
While iterating over the range
or list
of values, loop steps can specify a parameter
, which is a string that will be used to update the configuration of each loop graph instance with the value of the loop. This could be used to run iterations of the loop over, for example, a range of dates, or a list of table names. For a given parameter parameter_name
all instances of ${parameter_name}
in the loop graph step configurations will be replaced with the value of the loop iteration.
Consider an example where there is a monthly job that needs to iterate through the dates of the month one at a time, and in sequence. Rather than individually listing out steps for each date of the month we can use a loop step to iterate over the dates.
application { name = Loop step example } steps { read_month { input { type = hive table = staging_table } } loop_dates { dependencies = [read_month] type = loop mode = serial source = range range.start = ${first_date} range.end = ${last_date} parameter = processing_date } process_date { dependencies = [loop_dates] deriver { type = sql query.literal = "SELECT * FROM read_month WHERE data_date = ${processing_date}" } planner { type = append } output { type = hive table = processed_table } } }
The pipeline could be run with the dates provided like below:
spark-submit envelope-*.jar process_dates.conf first_date=20170501,last_date=20170531
Note
|
CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.
|
When Envelope reaches the loop step (after read_month
) it would unroll the loop into:
application { name = Loop step example } steps { read_month { input { type = hive table = staging_table } } loop_dates { dependencies = [read_month] type = loop mode = serial source = range range.start = 20170501 range.end = 20170531 parameter = processing_date } process_date_20170501 { dependencies = [loop_dates] deriver { type = sql query.literal = "SELECT * FROM read_month WHERE data_date = 20170501" } planner { type = append } output { type = hive table = processed_table } } process_date_20170502 { dependencies = [loop_dates, process_date_20170501] deriver { type = sql query.literal = "SELECT * FROM read_month WHERE data_date = 20170502" } planner { type = append } output { type = hive table = processed_table } } ... process_date_20170531 { dependencies = [loop_dates, process_date_20170530] deriver { type = sql query.literal = "SELECT * FROM read_month WHERE data_date = 20170531" } planner { type = append } output { type = hive table = processed_table } } }
With step
, looping is performed over a dataset derived in one of the previous steps. The content of each row within referenced dataset is accessible inside the loop using a ${field_name}
notation, where field_name
can be any field in that row’s schema.
Also, while using the step
option, the suffix of the loop graph steps can be controlled by setting configuration parameter suffix
. Its value should match one of the fields in the schema of a dataset being unrolled. For example, the below Envelope configuration can unroll the monthly_report
step into a sequence of steps monthy_report_JAN
, monthly_report_FEB
, etc., and will substitute parameters ${month_name}
, ${month_start_date}
and ${month_end_date}
in them with corresponding values for a month from the cal_dim
dataset.
application { name = Loop with parameters } steps { ... cal_dim { dependencies = [...] deriver { type = sql query.literal = """ SELECT month_id, month_name, month_start_date, month_end_date FROM calendar_dim WHERE year = 2019 """ } } ... loop_dates { dependencies = [cal_dim] type = loop mode = parallel source = step step = cal_dim suffix = month_id } monthly_report { dependencies = [loop_dates] deriver { type = sql query.literal = """ SELECT ${month_name} order_month, sum(order_amount) orders_total FROM orders WHERE order_date between ${month_start_date} and ${month_end_date} """ } planner { type = append } output { type = hive table = report_table } } ... }