Inputs are used to bring external data into an Envelope pipeline. The data brought in by an input can then be used to derive new data and to write out to external outputs.
Inputs can operate as a batch or a stream. When at least one input in an Envelope pipeline is a stream, the pipeline will run in Spark Streaming mode.
An input maps to Spark by creating a DataFrame for a batch input or a DStream for a stream input. Envelope assigns the DataFrame of a batch input, or the DataFrame of each micro-batch of a stream input, as the data of the step that contains the input.
The process of turning a DStream micro-batch into a DataFrame is known as translation. See the Translators section for details and options for this operation.
There are five inputs provided out of the box by Envelope. Custom inputs can be developed and provided to an Envelope pipeline to access other data sources.
There are four provided batch inputs: `filesystem`, `hive`, `jdbc`, and `kudu`.
There is one provided streaming input: `kafka`.
The `filesystem` input can read in data from Hadoop-compatible file systems, such as HDFS and S3A, at the path given by the `path` configuration.
For some of the supported file formats a schema can, or must, be given. There are multiple ways to specify a schema in the input configuration. Please refer to the schema documentation for details.
This input can read the contents of the path in five different formats (specified by the `format` configuration):
- `parquet` will read the path as Parquet files. For this format the schema of the data is retrieved from the files so it is not specified in the configuration. This format uses Spark’s `DataFrameReader#parquet` functionality.
- `json` will read the path as JSON files. A schema can be provided, or if no schema is provided then it will be inferred from the data. This format uses Spark’s `DataFrameReader#json` functionality.
- `csv` will read the path as delimited text files. A schema can be provided, or if no schema is provided then it will be inferred from the data. See the configurations documentation for the many options for this format. This format uses Spark’s `DataFrameReader#csv` functionality.
- `input-format` will read the path using the given Hadoop InputFormat class and the given Envelope translator. This format allows input formats that already exist, or that have been custom developed for the pipeline, to be plugged in without making an entirely new input. The InputFormat class is used to define how records will be retrieved from the path. The translator is then used to translate the unstructured records into typed fields so that the data can be represented as a DataFrame. This format uses Spark’s `SparkContext#newAPIHadoopFile` functionality.
- `text` will read the path as text files with a record per line and then, optionally, translate the lines to typed fields using the given Envelope translator. This format uses Spark’s `DataFrameReader#text` functionality.
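For example, a `filesystem` input that reads JSON files with an explicit schema might look like the sketch below. The path and field definitions are placeholders, and the flat `schema` block mirrors the form used in the Kafka example later in this section; see the schema documentation for the other ways a schema can be declared.

```
...
input {
  type = filesystem
  # Hypothetical path and fields, for illustration only
  path = "/data/example/json"
  format = json
  # Optional: omit the schema block to have the schema inferred from the data
  schema {
    type = flat
    field.names = [name, score, time]
    field.types = [string, int, long]
  }
}
...
```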
The `hive` input reads a `table` from the Hive metastore, which includes tables created by Envelope’s Hive output and by Impala. This input uses Spark’s `DataFrameReader#table` functionality.
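A minimal sketch of a `hive` input, assuming a hypothetical table name:

```
...
input {
  type = hive
  table = "default.mytable"
}
...
```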
The `jdbc` input reads the contents of a `table` at a given JDBC `url`. A `username` and `password` can also be provided. This input uses Spark’s `DataFrameReader#jdbc` functionality.
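A minimal sketch of a `jdbc` input, using the configuration names described above with placeholder values:

```
...
input {
  type = jdbc
  url = "jdbc:postgresql://dbhost:5432/mydb"
  table = "mytable"
  username = "envelope"
  password = "secret"
}
...
```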
The `kudu` input reads a table (specified by `table.name`) from Kudu. The Kudu masters are specified with `connection`.
The Kudu `connection` configuration can be supplied by a system environment variable, which allows the Kudu master addresses to be provided at runtime. This technique could also be used for other Envelope configurations.
The system environment variable can be referenced in configuration as follows:
```
...
input {
  type = kudu
  connection = ${KUDU_MASTER_CONNECTION_STRING}
  table.name = "impala::default.mykudutable1"
}
...
```
The system environment variable can be provided as follows when executed in client deployment mode:
```
export KUDU_MASTER_CONNECTION_STRING="master1:7051,master2:7051,master3:7051"
spark-submit --master yarn --deploy-mode client envelope-<version>.jar application.conf
```
Note: CDH5 uses `spark2-submit` instead of `spark-submit` for Spark 2 applications such as Envelope.
The system environment variable can be provided as follows when executed in cluster deployment mode:
```
spark-submit --master yarn --deploy-mode cluster --files application.conf \
  --conf spark.yarn.appMasterEnv.KUDU_MASTER_CONNECTION_STRING="master1:7051,master2:7051,master3:7051" \
  envelope-<version>.jar application.conf
```
The `kafka` input reads one or more Kafka topics as a stream. Envelope will use the given `translator` to turn each micro-batch into a DataFrame. The Kafka brokers are specified with `brokers` and the list of topics with `topics`.
The group ID (`group.id`) is the unique identifier of the input across all executions of the pipeline. It is used to identify multiple runs of the same pipeline even when other pipelines might be reading from the same topic. If no group ID is provided, a random UUID will be generated for each pipeline execution, and so progress will not be maintained across runs.
To enable Spark Streaming’s windowing support, which allows each micro-batch to contain a window of its previous micro-batches, set `window.enable` to `true` and set `window.milliseconds` to the duration of the window. A sliding window can be specified with `window.slide.milliseconds`.
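For example, a 60-second window that slides every 10 seconds could be configured as in the sketch below. The durations are illustrative, and the translator block is elided for brevity.

```
...
input {
  type = kafka
  brokers = "broker1:9092,..."
  topics = [topicname1]
  translator {
    ...
  }
  window.enable = true
  window.milliseconds = 60000
  window.slide.milliseconds = 10000
}
...
```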
The Kafka input supports offset management, which stores the latest processed offset of each partition of each topic for each group ID. When the pipeline starts, these offsets are retrieved so that the stream resumes from where it last successfully processed, even when data has arrived in the topics between runs. Note that offset management (`offset.manage`) and stream windowing (`window.enable`) currently cannot be enabled at the same time.
By default, offsets will be stored using Kafka’s offset commit API, which uses a special Kafka topic for storage. Alternatively, external storage can be specified using `offset.output`. To disable offset management, set `offset.manage` to `false`.
The external output (if specified using `offset.output`) must support random upsert mutations (i.e. implement `RandomOutput` and support the UPSERT mutation type) and must contain the four fields `group_id` (string), `topic` (string), `partition` (int), and `offset` (long). If the output requires the key fields to be specified (e.g. HBase, ZooKeeper) then provide the fields `group_id`, `topic`, `partition`.
```
input {
  type = kafka
  brokers = "broker1:9092,..."
  topics = [topicname1,topicname2]
  group.id = applicationname
  translator {
    type = delimited
    delimiter = ","
    schema {
      type = flat
      field.names = [name,score,time]
      field.types = [string,int,long]
    }
  }
  offsets {
    manage = true
    output {
      type = kudu
      connection = "master1:7051,..."
      table.name = "impala::default.offsets"
    }
  }
}
```
In Envelope the process of deserializing raw messages into structured rows is known as translation.
Envelope provides seven translator implementations.
One of these, the Protobuf translator, deserializes Protocol Buffers objects using the proto3 library. The translator reads the incoming Protobuf objects using a supplied `Descriptor` file as the schema. The resulting DataFrame also uses the supplied `Descriptor` as its schema. The Protobuf objects themselves can be GZIP compressed.
Note: The `key` parameter of the `translate(byte[] key, byte[] value)` method is ignored; the Protobuf payload should be assigned to the `value` parameter.
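A hypothetical configuration sketch for this translator is shown below. The parameter name used here for the descriptor file (`descriptor.filepath`) is illustrative only; check the configurations documentation for the actual keys supported by this translator.

```
...
translator {
  type = protobuf
  # Illustrative parameter name; see the configurations documentation
  descriptor.filepath = "/path/to/messages.desc"
}
...
```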
Protobuf fields are mapped to the following Spark SQL types:

| Field Type | DataType | Field Type | DataType |
|---|---|---|---|
| `double` | `DoubleType` | `float` | `FloatType` |
| `int32` | `IntegerType` | `int64` | `LongType` |
| `uint32` | `IntegerType` | `uint64` | `LongType` |
| `sint32` | `IntegerType` | `sint64` | `LongType` |
| `fixed32` | `IntegerType` | `fixed64` | `LongType` |
| `sfixed32` | `IntegerType` | `sfixed64` | `LongType` |
| `bool` | `BooleanType` | `bytes` | `BinaryType` |
| `string` | `StringType` | `enum` | `StringType` |
| `map` | `MapType` | `message` | `StructType` |
If the field is a `repeated` field, it will be wrapped within an `ArrayType`.
If the field is a `oneof`, the child fields are flattened into discrete columns in the resulting `Row`.
Note: The following field types are not supported: `any`, `group`. In addition, service declarations are ignored.
Note: The translator ignores all unknown fields.
In cases where Envelope does not provide an input or translator for a required data source, a custom class can be developed and referenced in the Envelope pipeline.
To create a batch input, implement the `BatchInput` interface; to create a stream input, implement the `StreamInput` interface. Translators must implement the `Translator` interface. With the implemented class compiled into its own jar file, the input or translator can be referenced in the pipeline by using its fully qualified class name (or alias, see below) as the input `type`, and the jar can be provided to the Envelope application using the `--jars` argument when calling `spark-submit`.
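For example, a custom input compiled into its own jar (class and jar names hypothetical) could be referenced by its fully qualified class name:

```
...
input {
  # Fully qualified class name of the custom input (hypothetical)
  type = com.example.envelope.MyCustomInput
}
...
```

and then provided to the application at submission time:

```
spark-submit --master yarn --deploy-mode client \
  --jars my-custom-input.jar \
  envelope-<version>.jar application.conf
```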
To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the `ProvidesAlias` interface. Next, place the implementation’s fully qualified class name in a `META-INF/services/com.cloudera.labs.envelope.input.Input` or `META-INF/services/com.cloudera.labs.envelope.input.translator.Translator` file on the class path; the usual method is to package the file with your JAR.
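As a sketch, for a hypothetical custom input class `com.example.envelope.MyCustomInput` that implements `ProvidesAlias`, the service file packaged with the jar would contain the class name:

```
# File: META-INF/services/com.cloudera.labs.envelope.input.Input
com.example.envelope.MyCustomInput
```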