Since Envelope configures standard Spark applications, we can take advantage of all of the in-built security mechanisms that Spark provides (see Spark Security). However, some of these mechanisms require additional configuration from the user. Below, we outline how to use Envelope with secure external systems in both batch and streaming modes, and in both YARN client and YARN cluster modes. The steps in this guide are only required for clusters secured with Kerberos authentication.
If your application is expected to run for less than 7 days (or whatever the maximum token renewal lifetime of your
cluster is), the following configurations are required.
A common prerequisite to all modes is to have acquired a Kerberos TGT via kinit.
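For example (assuming a principal of [email protected] and a keytab named user.kt, the names used elsewhere in this guide; substitute your own), the ticket can be obtained interactively or from a keytab:

# Obtain a TGT interactively (prompts for the password)
kinit [email protected]

# Or obtain a TGT non-interactively from a keytab
kinit -kt user.kt [email protected]

# Confirm the ticket has been granted
klist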
- HDFS: Handled automatically by Spark; no extra configuration required.
- Hive: Handled automatically by Spark; no extra configuration required.
- Kafka: See Connecting to Secure Kafka below.
- HBase: To obtain a security token for HBase, the HBase configuration for the secure cluster and the HBase libraries must be on the classpath of both the driver and the executor. The easiest way to achieve this is via something like the following:
export EXTRA_CP=/etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar

spark-submit \
  --driver-class-path ${EXTRA_CP} \
  --conf spark.executor.extraClassPath=${EXTRA_CP} \
  envelope-*.jar envelope_app.conf
Note: CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope. CDH5 clusters may also require the environment variable SPARK_KAFKA_VERSION to be set to 0.10.
- Kudu: If using a bulk planner (append, delete, overwrite, upsert), there is no extra configuration required to work with secure Kudu clusters; the underlying KuduContext automatically handles the security. If using random planners (bitemporal, history, eventtimeupsert) with an unsecured Kudu but with an otherwise secured Hadoop cluster (hadoop.security.authentication=kerberos), you must additionally specify secure = false in the output section of the step configuration. For example:

output {
  type = kudu
  secure = false
  connection = "REPLACEME:7051"
  table.name = "impala::default.fix_orderhistory"
}
When running in YARN cluster mode, some additional configuration is needed.
- HBase: We need to ensure the HBase configuration is on the classpath, but since Spark runs things slightly differently in cluster mode, we need to specify the classpath via an environment variable as follows:
HADOOP_CONF_DIR=/etc/hbase/conf:/etc/hive/conf:/etc/spark2/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \
spark-submit \
  --deploy-mode cluster \
  --files jaas.conf,envelope_app.conf,kafka.kt \
  --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
  --conf spark.executor.extraClassPath=/etc/hbase/conf \
  envelope-*.jar envelope_app.conf
For an example of a JAAS file, see Connecting to Secure Kafka below.
- Kudu: Kudu requires a keytab to operate correctly in cluster mode. Additionally, if using a random planner, secure = true is required in the step config, as in client mode. An example submission would be:

spark-submit \
  --deploy-mode cluster \
  --files envelope_app.conf \
  --keytab user.kt \    (1)
  --principal [email protected] \    (2)
  envelope-*.jar envelope_app.conf
(1) Or whatever you have called the keytab file.
(2) The principal to use in the keytab file.
Connecting to Secure Kafka

To access a secure Kafka cluster we must supply both a keytab and a JAAS configuration file, and add configurations to the Kafka input/output to enable security. We need to add the following configurations to the input or output sections of our Envelope steps which use Kafka:
parameter {
  security.protocol = SASL_PLAINTEXT
  sasl.kerberos.service.name = kafka    (1)
}
(1) Or whatever principal your Kafka brokers are running with.
We need to supply a JAAS configuration file to both driver and executor. For example:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache=false
  keyTab="kafka.kt"    (1)
  principal="[email protected]";    (2)
};
(1) Or whatever you have called the keytab file.
(2) The principal to use in the keytab file.
We can then submit the job with something like the following:
export SPARK_KAFKA_VERSION=0.10

spark-submit \
  --files jaas.conf,kafka.kt \
  --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
  envelope-*.jar envelope_app.conf
If reading from Kafka secured by Sentry, you need to ensure the user in the supplied keytab has been granted a Sentry role with the following (sketched in the example after this list):
- read access to the topics in the step
- access to the consumer group specified in the input group.id
- access to Spark's extra consumer group, spark-executor-{group.id}
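As an illustrative sketch only: the commands below use the kafka-sentry shell with a role and group name (envelope_role, envelope_group) invented for this example, and the topic and consumer group used in the configuration later in this guide; verify the exact flags and privilege strings against your Sentry documentation.

# Hypothetical grants matching the list above (syntax may vary by Sentry version)
kafka-sentry -cr -r envelope_role
kafka-sentry -arg -r envelope_role -g envelope_group
kafka-sentry -gpr -r envelope_role -p "Host=*->Topic=fixk->action=read"
kafka-sentry -gpr -r envelope_role -p "Host=*->Consumergroup=fixk-group->action=read"
kafka-sentry -gpr -r envelope_role -p "Host=*->Consumergroup=spark-executor-fixk-group->action=read"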
If you need to specify both --keytab and supply a keytab in --files, you can use the same file, but you need to symlink one file with a different name to prevent Spark from refusing to upload the same file twice.
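A minimal sketch of this pattern, reusing the file and principal names from this guide:

# Make the same keytab available under a second name for --files
ln -s user.kt kafka.kt

spark-submit \
  --keytab user.kt \
  --principal [email protected] \
  --files jaas.conf,kafka.kt \
  envelope-*.jar envelope_app.conf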
Long-running applications (that is, applications expected to run longer than the shortest token renewal lifetime) require the addition of a keytab and principal to the Spark submission command. Spark can use these to obtain new tokens as and when required.
For example, a streaming application which reads from secure Kafka and writes to secure HBase would be launched with something like the following:
ln -s user.kt kafka.kt

export SPARK_KAFKA_VERSION=0.10    (1)
export PRINCNAME=REPLACEME

HADOOP_CONF_DIR=/etc/hbase/conf:/etc/hive/conf:/etc/spark2/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \
spark-submit \
  --keytab user.kt \
  --principal ${PRINCNAME} \
  --files jaas.conf,envelope_app.conf,kafka.kt \
  --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
  --conf spark.executor.extraClassPath=/etc/hbase/conf \
  envelope-*.jar envelope_app.conf
(1) Only required if 0.8 is the default Kafka version.
If you refer to the same external system in multiple locations in your Envelope
configuration file, it can be tedious to repeat the connection and security configuration
multiple times. For these scenarios we can use a separate environment config file and include it in
our main config file. For example, if we have an env.conf
file with the following:
env {
  kafka {
    brokers = "ip-172-31-61-61.ec2.internal:9092,ip-172-31-61-62.ec2.internal:9092,ip-172-31-61-63.ec2.internal:9092"
    parameter {
      security.protocol = SASL_PLAINTEXT
      sasl.kerberos.service.name = kafka
    }
  }
  kudu {
    connection = "ip-172-31-61-61.ec2.internal:7051,ip-172-31-61-62.ec2.internal:7051,ip-172-31-61-63.ec2.internal:7051"
    security {
      renew-interval = 1d
    }
  }
}
We can use this in our main configuration as in the following example:
application {
  name = FIX Envelope example
  batch.milliseconds = 5000
  executors = 1
  executor.cores = 4
  executor.memory = 4G
  spark.conf {
    spark.streaming.kafka.consumer.cache.enabled = false
  }
}

include file("env.conf")    (1)

steps {
  fix {
    input = ${env.kafka} {    (2)
      type = kafka
      topics = [fixk]
      group.id = fixk-group
      encoding = string
      translator {
        type = kvp
        delimiter.kvp = "\u0001"
        delimiter.field = "="
        schema {
          type = flat
          field.names = [6,10,11,14,17,20,21,35,37,38,39,40,54,55,60,150,151]
          field.types = [double,string,string,int,string,int,int,string,string,int,int,int,int,string,long,int,int]
        }
      }
    }
  }
  messagetypes {
    input = ${env.kudu} {    (3)
      type = kudu
      table.name = "impala::default.fix_messagetypes"
      hint.small = true
    }
  }
  newordersingle {
    dependencies = [fix, messagetypes]
    deriver {
      type = sql
      query.literal = """
        SELECT `11` AS clordid, `35` AS msgtype, msgtypedesc, `21` AS handlinst, `55` AS symbol,
        `54` AS side, `60` AS transacttime, `38` AS orderqty, `40` AS ordtype, `10` AS checksum
        FROM fix f LEFT OUTER JOIN messagetypes mt ON f.`35` = mt.msgtype
        WHERE msgtype = 'D'"""
    }
    planner {
      type = upsert
    }
    output = ${env.kudu} {    (3)
      type = kudu
      table.name = "impala::default.fix_newordersingle"
    }
  }
  orderhistory {
    dependencies = [fix]
    deriver {
      type = sql
      query.literal = """
        SELECT `11` AS clordid, `55` AS symbol, `38` AS orderqty, NVL(`151`, `38`) AS leavesqty,
        NVL(`14`, 0) AS cumqty, `6` AS avgpx, `60` AS transacttime
        FROM fix"""
    }
    partitioner {
      type = uuid
    }
    planner {
      type = history
      carry.forward.when.null = true
      fields.key = [clordid]
      fields.timestamp = [transacttime]
      fields.values = [symbol,orderqty,leavesqty,cumqty]
      field.last.updated = lastupdated
      fields.effective.from = [startdate]
      fields.effective.to = [enddate]
      field.current.flag = currentflag
      time.model {
        event.type = longmillis
        last.updated.type = stringdatetime
      }
    }
    output = ${env.kudu} {    (3)
      type = kudu
      table.name = "impala::default.fix_orderhistory"
    }
  }
}
(1) Include the environment file
(2) Merge in the Kafka configuration
(3) Merge in the Kudu configuration
Note that in cluster mode, you need to ship both config files to the driver via --files.
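For example, a minimal cluster-mode submission shipping both files might look like this:

spark-submit \
  --deploy-mode cluster \
  --files env.conf,envelope_app.conf \
  envelope-*.jar envelope_app.conf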
Spark provides a pluggable mechanism for obtaining tokens for external systems. Unfortunately, this API has not proven stable between Spark 2.x releases, so Envelope provides its own API for developers to use for obtaining and refreshing tokens.
In Envelope’s system, a central TokenStoreManager
in the driver process is responsible for
obtaining new tokens from inputs and outputs which implement a TokenProvider
interface. Providers
are registered with the manager automatically at startup if their Input/Output implementation
supplies one.
The manager runs a background thread which periodically checks with each provider whether a new token is required and writes out a credentials file to the application’s staging directory on HDFS.
In the executors, Input and Output implementations can request tokens from a TokenStoreListener
which
periodically checks HDFS for new tokens. It is up to the implementer of each Input or Output to deal
with the new tokens appropriately.
For an example of its usage, see the KuduTokenProvider
and KuduConnectionManager
classes in the
envelope-kudu
module.
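As a self-contained illustration of the division of responsibilities described above, the following Java sketch uses invented names (TokenProviderSketch, TokenStoreSketch) and an in-memory map in place of the HDFS credentials file; it is not the actual Envelope API, and the real signatures are in the classes referenced above.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TokenFlowSketch {

  // Driver side: what an input/output would supply so a central manager can mint tokens.
  // Hypothetical shape only; the real interface lives in Envelope.
  interface TokenProviderSketch {
    String alias();                          // key the token is stored under
    byte[] obtainToken() throws Exception;   // fetch a fresh token from the external system
    long renewIntervalMillis();              // how often the manager should refresh it
  }

  // Executor side: where an input/output looks up the latest token by alias.
  // Envelope writes a credentials file to the application's HDFS staging directory and
  // executors poll it; an in-memory map stands in for that here.
  static class TokenStoreSketch {
    private final ConcurrentMap<String, byte[]> tokens = new ConcurrentHashMap<>();

    void update(String alias, byte[] token) {
      tokens.put(alias, token);
    }

    byte[] getToken(String alias) {
      return tokens.get(alias);
    }
  }

  public static void main(String[] args) throws Exception {
    TokenProviderSketch provider = new TokenProviderSketch() {
      @Override public String alias() { return "kudu"; }
      @Override public byte[] obtainToken() { return "opaque-token-bytes".getBytes(); }
      @Override public long renewIntervalMillis() { return 24L * 60 * 60 * 1000; }
    };

    TokenStoreSketch store = new TokenStoreSketch();

    // Driver: periodically obtain a fresh token and publish it
    store.update(provider.alias(), provider.obtainToken());

    // Executor: pick up the latest token when connecting to the external system
    byte[] current = store.getToken("kudu");
    System.out.println("Token bytes available: " + current.length);
  }
}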