Spark SQL support #13

Open
wants to merge 6 commits into base: master
8 changes: 8 additions & 0 deletions runner/parquet/convert_to_parquet.py
@@ -0,0 +1,8 @@
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName = "Parquet Converter")
hiveContext = HiveContext(sc)
hiveContext.table("rankings").saveAsParquetFile("/user/spark/benchmark/rankings-parquet")
hiveContext.table("uservisits").saveAsParquetFile("/user/spark/benchmark/uservisits-parquet")
hiveContext.table("documents").saveAsParquetFile("/user/spark/benchmark/documents-parquet")
205 changes: 178 additions & 27 deletions runner/prepare_benchmark.py
@@ -44,10 +44,12 @@ def parse_args():

parser.add_option("-m", "--impala", action="store_true", default=False,
help="Whether to include Impala")
parser.add_option("-s", "--shark", action="store_true", default=False,
help="Whether to include Shark")
parser.add_option("-s", "--spark", action="store_true", default=False,
help="Whether to include Spark SQL")
parser.add_option("-r", "--redshift", action="store_true", default=False,
help="Whether to include Redshift")
parser.add_option("--shark", action="store_true", default=False,
help="Whether to include Shark")
parser.add_option("--hive", action="store_true", default=False,
help="Whether to include Hive")
parser.add_option("--hive-tez", action="store_true", default=False,
@@ -57,18 +59,24 @@ def parse_args():

parser.add_option("-a", "--impala-host",
help="Hostname of Impala state store node")
parser.add_option("-b", "--shark-host",
help="Hostname of Shark master node")
parser.add_option("--impala-data-node",
help="Hostname of an Impala data node. Required for Impala conversion to Parquet")
parser.add_option("-b", "--spark-host",
help="Hostname of Spark master node")
parser.add_option("-c", "--redshift-host",
help="Hostname of Redshift ODBC endpoint")
parser.add_option("--shark-host",
help="Hostname of Shark master node")
parser.add_option("--hive-host",
help="Hostname of Hive master node")
parser.add_option("--hive-slaves",
help="Comma separated list of Hive slaves")

parser.add_option("-x", "--impala-identity-file",
help="SSH private key file to use for logging into Impala node")
parser.add_option("-y", "--shark-identity-file",
parser.add_option("-y", "--spark-identity-file",
help="SSH private key file to use for logging into Spark node")
parser.add_option("--shark-identity-file",
help="SSH private key file to use for logging into Shark node")
parser.add_option("--hive-identity-file",
help="SSH private key file to use for logging into Hive node")
@@ -84,6 +92,8 @@ def parse_args():
parser.add_option("-f", "--file-format", default="sequence-snappy",
help="File format to copy (text, text-deflate, "\
"sequence, or sequence-snappy)")
parser.add_option("--executor-memory", type="string", default="24G",
help="How much executor memory Spark SQL nodes should use")

parser.add_option("-d", "--aws-key-id",
help="Access key ID for AWS")
@@ -92,10 +102,12 @@ def parse_args():

parser.add_option("--skip-s3-import", action="store_true", default=False,
help="Assumes s3 data is already loaded")
parser.add_option("--parquet", action="store_true", default=False,
help="Convert benchmark data to parquet")

(opts, args) = parser.parse_args()

if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
parser.print_help()
sys.exit(1)

@@ -112,6 +124,14 @@ def parse_args():
print >> stderr, "Impala requires identity file, hostname, and AWS creds"
sys.exit(1)

if opts.spark and (opts.spark_identity_file is None or
opts.spark_host is None or
opts.aws_key_id is None or
opts.aws_key is None):
print >> stderr, \
"Spark SQL requires identity file, Spark hostname, and AWS credentials"
sys.exit(1)

if opts.shark and (opts.shark_identity_file is None or
opts.shark_host is None or
opts.aws_key_id is None or
@@ -174,6 +194,131 @@ def add_aws_credentials(remote_host, remote_user, identity_file,
out.close()
scp_to(remote_host, identity_file, remote_user, local_xml, remote_xml_file)

def prepare_spark_dataset(opts):
def ssh_spark(command):
command = "source /root/.bash_profile; %s" % command
ssh(opts.spark_host, "root", opts.spark_identity_file, command)

if not opts.skip_s3_import:
print "=== IMPORTING BENCHMARK DATA FROM S3 ==="
try:
ssh_spark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/spark/benchmark")
except Exception:
pass # Folder may already exist

add_aws_credentials(opts.spark_host, "root", opts.spark_identity_file,
"/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key)

ssh_spark("/root/ephemeral-hdfs/bin/start-mapred.sh")

ssh_spark(
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \
"/user/spark/benchmark/rankings/" % (opts.file_format, opts.data_prefix))

ssh_spark(
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \
"/user/spark/benchmark/uservisits/" % (
opts.file_format, opts.data_prefix))

ssh_spark(
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \
"/user/spark/benchmark/crawl/" % (opts.file_format, opts.data_prefix))

print "=== CREATING HIVE TABLES FOR BENCHMARK ==="
hive_site = '''
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://NAMENODE:9000</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://NAMENODE:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>NONE</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>NONE</value>
</property>
</configuration>
'''.replace("NAMENODE", opts.spark_host).replace('\n', '')

ssh_spark('echo "%s" > ~/ephemeral-hdfs/conf/hive-site.xml' % hive_site)

scp_to(opts.spark_host, opts.spark_identity_file, "root", "udf/url_count.py",
"/root/url_count.py")
ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py")

ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory)

#TODO: Should keep checking to see if the JDBC server has started yet
print "Sleeping for 30 seconds so the jdbc server can start"
time.sleep(30)
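The TODO above could be addressed by polling the Thrift JDBC server rather than sleeping for a fixed 30 seconds. A rough sketch (not part of this PR), assuming ssh_spark raises an exception on a non-zero remote exit status and that beeline exits non-zero while the server is still starting:

def wait_for_thriftserver(retries=30, interval=10):
    # Retry a trivial Beeline query until the JDBC server accepts connections.
    for _ in range(retries):
        try:
            ssh_spark("/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 "
                      "-n root -e \"SHOW TABLES;\"")
            return
        except Exception:
            time.sleep(interval)
    raise Exception("Thrift JDBC server did not start")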

def beeline(query):
ssh_spark("/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 -n root -e \"%s\"" % query)

beeline("DROP TABLE IF EXISTS rankings")
beeline(
"CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \
"avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";")

beeline("DROP TABLE IF EXISTS uservisits;")
beeline(
"CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \
"visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \
"languageCode STRING,searchWord STRING,duration INT ) " \
"ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/uservisits\\\";")

beeline("DROP TABLE IF EXISTS documents;")
beeline(
"CREATE EXTERNAL TABLE documents (line STRING) STORED AS TEXTFILE " \
"LOCATION \\\"/user/spark/benchmark/crawl\\\";")

if opts.parquet:
ssh_spark("/root/spark/sbin/stop-thriftserver.sh")

print "Sleeping for 30 seconds so the jdbc server can stop"
time.sleep(30)

scp_to(opts.spark_host, opts.spark_identity_file, "root", "parquet/convert_to_parquet.py",
"/root/convert_to_parquet.py")
ssh_spark("/root/spark/bin/spark-submit --master spark://%s:7077 /root/convert_to_parquet.py" % opts.spark_host)

ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory)

print "Sleeping for 30 seconds so the jdbc server can start"
time.sleep(30)

beeline("DROP TABLE IF EXISTS rankings")
beeline(
"CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \
"avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/rankings-parquet\\\";")

beeline("DROP TABLE IF EXISTS uservisits;")
beeline(
"CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \
"visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \
"languageCode STRING,searchWord STRING,duration INT ) " \
"ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \
"STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/uservisits-parquet\\\";")

beeline("DROP TABLE IF EXISTS documents;")
beeline(
"CREATE EXTERNAL TABLE documents (line STRING) STORED AS PARQUET " \
"LOCATION \\\"/user/spark/benchmark/documents-parquet\\\";")

print "=== FINISHED CREATING BENCHMARK DATA ==="

def prepare_shark_dataset(opts):
def ssh_shark(command):
command = "source /root/.bash_profile; %s" % command
@@ -182,34 +327,34 @@ def ssh_shark(command):
if not opts.skip_s3_import:
print "=== IMPORTING BENCHMARK DATA FROM S3 ==="
try:
ssh_shark("/root/ephemeral-hdfs/bin/hdfs dfs -mkdir /user/shark/benchmark")
ssh_shark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/shark/benchmark")
except Exception:
pass # Folder may already exist

add_aws_credentials(opts.shark_host, "root", opts.shark_identity_file,
"/root/mapreduce/conf/core-site.xml", opts.aws_key_id, opts.aws_key)
"/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key)

ssh_shark("/root/mapreduce/bin/start-mapred.sh")
ssh_shark("/root/ephemeral-hdfs/bin/start-mapred.sh")

ssh_shark(
"/root/mapreduce/bin/hadoop distcp " \
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \
"/user/shark/benchmark/rankings/" % (opts.file_format, opts.data_prefix))

ssh_shark(
"/root/mapreduce/bin/hadoop distcp " \
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \
"/user/shark/benchmark/uservisits/" % (
opts.file_format, opts.data_prefix))

ssh_shark(
"/root/mapreduce/bin/hadoop distcp " \
"/root/ephemeral-hdfs/bin/hadoop distcp " \
"s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \
"/user/shark/benchmark/crawl/" % (opts.file_format, opts.data_prefix))

# Scratch table used for JVM warmup
ssh_shark(
"/root/mapreduce/bin/hadoop distcp /user/shark/benchmark/rankings " \
"/root/ephemeral-hdfs/bin/hadoop distcp /user/shark/benchmark/rankings " \
"/user/shark/benchmark/scratch"
)

@@ -241,15 +386,6 @@ def ssh_shark(command):
"/root/url_count.py")
ssh_shark("/root/spark-ec2/copy-dir /root/url_count.py")

ssh_shark("""
mv shark shark-back;
git clone https://github.com/ahirreddy/shark.git -b branch-0.8;
cp shark-back/conf/shark-env.sh shark/conf/shark-env.sh;
cd shark;
sbt/sbt assembly;
/root/spark-ec2/copy-dir --delete /root/shark;
""")

ssh_shark(
"/root/shark/bin/shark -e \"DROP TABLE IF EXISTS rankings; " \
"CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \
@@ -278,7 +414,7 @@ def ssh_shark(command):

def prepare_impala_dataset(opts):
def ssh_impala(command):
ssh(opts.impala_host, "ubuntu", opts.impala_identity_file, command)
ssh(opts.impala_host, "root", opts.impala_identity_file, command)

if not opts.skip_s3_import:
print "=== IMPORTING BENCHMARK FROM S3 ==="
@@ -290,9 +426,9 @@ def ssh_impala(command):
ssh_impala("sudo chmod 777 /etc/hadoop/conf/hdfs-site.xml")
ssh_impala("sudo chmod 777 /etc/hadoop/conf/core-site.xml")

add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file,
add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file,
"/etc/hadoop/conf/hdfs-site.xml", opts.aws_key_id, opts.aws_key)
add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file,
add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file,
"/etc/hadoop/conf/core-site.xml", opts.aws_key_id, opts.aws_key)

ssh_impala(
@@ -329,6 +465,19 @@ def ssh_impala(command):
"TERMINATED BY \\\"\\001\\\" " \
"STORED AS SEQUENCEFILE LOCATION \\\"/tmp/benchmark/scratch\\\";\"")

if opts.parquet:
print "=== CONVERTING TABLES TO PARQUET ==="

ssh(opts.impala_data_node, "root", opts.impala_identity_file,
"impala-shell -r -q \"CREATE TABLE rankings_parquet STORED AS PARQUET AS SELECT * FROM rankings; " \
"DROP TABLE rankings; " \
"ALTER TABLE rankings_parquet RENAME TO rankings;\"")

ssh(opts.impala_data_node, "root", opts.impala_identity_file,
"impala-shell -r -q \"CREATE TABLE uservisits_parquet STORED AS PARQUET AS SELECT * FROM uservisits; " \
"DROP TABLE uservisits; " \
"ALTER TABLE uservisits_parquet RENAME TO uservisits;\"")

print "=== FINISHED CREATING BENCHMARK DATA ==="

def prepare_hive_dataset(opts):
@@ -590,10 +739,12 @@ def main():

if opts.impala:
prepare_impala_dataset(opts)
if opts.shark:
prepare_shark_dataset(opts)
if opts.spark:
prepare_spark_dataset(opts)
if opts.redshift:
prepare_redshift_dataset(opts)
if opts.shark:
prepare_shark_dataset(opts)
if opts.hive:
prepare_hive_dataset(opts)
if opts.hive_tez: