From 05bfa6b016cbd1072b38ebc520014479dcb24afb Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Fri, 16 Jan 2015 17:49:19 -0800 Subject: [PATCH 1/6] Updated shark to shark 0.9.1 launched from the spark 1.2 ec2 scripts --- runner/prepare_benchmark.py | 23 +++++++---------------- runner/run_query.py | 6 +++--- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index b71bb75..a3d6e56 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -182,34 +182,34 @@ def ssh_shark(command): if not opts.skip_s3_import: print "=== IMPORTING BENCHMARK DATA FROM S3 ===" try: - ssh_shark("/root/ephemeral-hdfs/bin/hdfs dfs -mkdir /user/shark/benchmark") + ssh_shark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/shark/benchmark") except Exception: pass # Folder may already exist add_aws_credentials(opts.shark_host, "root", opts.shark_identity_file, - "/root/mapreduce/conf/core-site.xml", opts.aws_key_id, opts.aws_key) + "/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key) - ssh_shark("/root/mapreduce/bin/start-mapred.sh") + ssh_shark("/root/ephemeral-hdfs/bin/start-mapred.sh") ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \ "/user/shark/benchmark/rankings/" % (opts.file_format, opts.data_prefix)) ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \ "/user/shark/benchmark/uservisits/" % ( opts.file_format, opts.data_prefix)) ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \ "/user/shark/benchmark/crawl/" % (opts.file_format, opts.data_prefix)) # Scratch table used for JVM warmup ssh_shark( - "/root/mapreduce/bin/hadoop distcp /user/shark/benchmark/rankings " \ + "/root/ephemeral-hdfs/bin/hadoop distcp /user/shark/benchmark/rankings " \ "/user/shark/benchmark/scratch" ) @@ -241,15 +241,6 @@ def ssh_shark(command): "/root/url_count.py") ssh_shark("/root/spark-ec2/copy-dir /root/url_count.py") - ssh_shark(""" - mv shark shark-back; - git clone https://github.com/ahirreddy/shark.git -b branch-0.8; - cp shark-back/conf/shark-env.sh shark/conf/shark-env.sh; - cd shark; - sbt/sbt assembly; - /root/spark-ec2/copy-dir --delete /root/shark; - """) - ssh_shark( "/root/shark/bin/shark -e \"DROP TABLE IF EXISTS rankings; " \ "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ diff --git a/runner/run_query.py b/runner/run_query.py index 8c7407b..3d18320 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -307,11 +307,11 @@ def ssh_shark(command): slaves = map(str.strip, open(local_slaves_file).readlines()) print "Restarting standalone scheduler..." - ssh_shark("/root/spark/bin/stop-all.sh") + ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") ensure_spark_stopped_on_slaves(slaves) time.sleep(30) - ssh_shark("/root/spark/bin/stop-all.sh") - ssh_shark("/root/spark/bin/start-all.sh") + ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") + ssh_shark("/root/ephemeral-hdfs/bin/start-all.sh") time.sleep(10) # Two modes here: Shark Mem and Shark Disk. 
If using Shark disk clear buffer From 26362482382749f37d733bcc5f9d8d030dc913c5 Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Sat, 17 Jan 2015 14:51:19 -0800 Subject: [PATCH 2/6] Added Spark SQL to the Big Data Benchmark --- runner/prepare_benchmark.py | 125 ++++++++++++++++++++-- runner/run_query.py | 206 +++++++++++++++++++++++++++++++++--- 2 files changed, 310 insertions(+), 21 deletions(-) diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index a3d6e56..e0f0e87 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -44,10 +44,12 @@ def parse_args(): parser.add_option("-m", "--impala", action="store_true", default=False, help="Whether to include Impala") - parser.add_option("-s", "--shark", action="store_true", default=False, - help="Whether to include Shark") + parser.add_option("-s", "--spark", action="store_true", default=False, + help="Whether to include Spark SQL") parser.add_option("-r", "--redshift", action="store_true", default=False, help="Whether to include Redshift") + parser.add_option("--shark", action="store_true", default=False, + help="Whether to include Shark") parser.add_option("--hive", action="store_true", default=False, help="Whether to include Hive") parser.add_option("--hive-tez", action="store_true", default=False, @@ -57,10 +59,12 @@ def parse_args(): parser.add_option("-a", "--impala-host", help="Hostname of Impala state store node") - parser.add_option("-b", "--shark-host", - help="Hostname of Shark master node") + parser.add_option("-b", "--spark-host", + help="Hostname of Spark master node") parser.add_option("-c", "--redshift-host", help="Hostname of Redshift ODBC endpoint") + parser.add_option("--shark-host", + help="Hostname of Shark master node") parser.add_option("--hive-host", help="Hostname of Hive master node") parser.add_option("--hive-slaves", @@ -68,7 +72,9 @@ def parse_args(): parser.add_option("-x", "--impala-identity-file", help="SSH private key file to use for logging into Impala node") - parser.add_option("-y", "--shark-identity-file", + parser.add_option("-y", "--spark-identity-file", + help="SSH private key file to use for logging into Spark node") + parser.add_option("--shark-identity-file", help="SSH private key file to use for logging into Shark node") parser.add_option("--hive-identity-file", help="SSH private key file to use for logging into Hive node") @@ -95,7 +101,7 @@ def parse_args(): (opts, args) = parser.parse_args() - if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh): + if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh): parser.print_help() sys.exit(1) @@ -112,6 +118,14 @@ def parse_args(): print >> stderr, "Impala requires identity file, hostname, and AWS creds" sys.exit(1) + if opts.spark and (opts.spark_identity_file is None or + opts.spark_host is None or + opts.aws_key_id is None or + opts.aws_key is None): + print >> stderr, \ + "Spark SQL requires identity file, shark hostname, and AWS credentials" + sys.exit(1) + if opts.shark and (opts.shark_identity_file is None or opts.shark_host is None or opts.aws_key_id is None or @@ -174,6 +188,99 @@ def add_aws_credentials(remote_host, remote_user, identity_file, out.close() scp_to(remote_host, identity_file, remote_user, local_xml, remote_xml_file) +def prepare_spark_dataset(opts): + def ssh_spark(command): + command = "source /root/.bash_profile; %s" % command + ssh(opts.spark_host, "root", opts.spark_identity_file, 
command) + + if not opts.skip_s3_import: + print "=== IMPORTING BENCHMARK DATA FROM S3 ===" + try: + ssh_spark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/spark/benchmark") + except Exception: + pass # Folder may already exist + + add_aws_credentials(opts.spark_host, "root", opts.spark_identity_file, + "/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key) + + ssh_spark("/root/ephemeral-hdfs/bin/start-mapred.sh") + + ssh_spark( + "/root/ephemeral-hdfs/bin/hadoop distcp " \ + "s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \ + "/user/spark/benchmark/rankings/" % (opts.file_format, opts.data_prefix)) + + ssh_spark( + "/root/ephemeral-hdfs/bin/hadoop distcp " \ + "s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \ + "/user/spark/benchmark/uservisits/" % ( + opts.file_format, opts.data_prefix)) + + ssh_spark( + "/root/ephemeral-hdfs/bin/hadoop distcp " \ + "s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \ + "/user/spark/benchmark/crawl/" % (opts.file_format, opts.data_prefix)) + + # Scratch table used for JVM warmup + ssh_spark( + "/root/ephemeral-hdfs/bin/hadoop distcp /user/spark/benchmark/rankings " \ + "/user/spark/benchmark/scratch" + ) + + print "=== CREATING HIVE TABLES FOR BENCHMARK ===" + hive_site = ''' + + + fs.default.name + hdfs://NAMENODE:9000 + + + fs.defaultFS + hdfs://NAMENODE:9000 + + + mapred.job.tracker + NONE + + + mapreduce.framework.name + NONE + + + '''.replace("NAMENODE", opts.spark_host).replace('\n', '') + + ssh_spark('echo "%s" > ~/ephemeral-hdfs/conf/hive-site.xml' % hive_site) + + scp_to(opts.spark_host, opts.spark_identity_file, "root", "udf/url_count.py", + "/root/url_count.py") + ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py") + + ssh_spark( + "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS rankings; " \ + "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ + "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";\"") + + ssh_spark( + "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS scratch; " \ + "CREATE EXTERNAL TABLE scratch (pageURL STRING, pageRank INT, " \ + "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/scratch\\\";\"") + + ssh_spark( + "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS uservisits; " \ + "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ + "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \ + "languageCode STRING,searchWord STRING,duration INT ) " \ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/uservisits\\\";\"") + + ssh_spark("/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS documents; " \ + "CREATE EXTERNAL TABLE documents (line STRING) STORED AS TEXTFILE " \ + "LOCATION \\\"/user/spark/benchmark/crawl\\\";\"") + + print "=== FINISHED CREATING BENCHMARK DATA ===" + def prepare_shark_dataset(opts): def ssh_shark(command): command = "source /root/.bash_profile; %s" % command @@ -581,10 +688,12 @@ def main(): if opts.impala: prepare_impala_dataset(opts) - if opts.shark: - prepare_shark_dataset(opts) + if opts.spark: + prepare_spark_dataset(opts) if opts.redshift: prepare_redshift_dataset(opts) + if opts.shark: + prepare_shark_dataset(opts) if opts.hive: prepare_hive_dataset(opts) if opts.hive_tez: diff --git a/runner/run_query.py b/runner/run_query.py index 3d18320..7238f68 100644 --- 
a/runner/run_query.py +++ b/runner/run_query.py @@ -171,10 +171,12 @@ def parse_args(): parser.add_option("-m", "--impala", action="store_true", default=False, help="Whether to include Impala") - parser.add_option("-s", "--shark", action="store_true", default=False, - help="Whether to include Shark") + parser.add_option("-s", "--spark", action="store_true", default=False, + help="Whether to include Spark SQL") parser.add_option("-r", "--redshift", action="store_true", default=False, help="Whether to include Redshift") + parser.add_option("--shark", action="store_true", default=False, + help="Whether to include Shark") parser.add_option("--hive", action="store_true", default=False, help="Whether to include Hive") parser.add_option("--tez", action="store_true", default=False, @@ -182,7 +184,9 @@ def parse_args(): parser.add_option("--hive-cdh", action="store_true", default=False, help="Hive on CDH cluster") - parser.add_option("-g", "--shark-no-cache", action="store_true", + parser.add_option("-g", "--spark-no-cache", action="store_true", + default=False, help="Disable caching in Spark SQL") + parser.add_option("--shark-no-cache", action="store_true", default=False, help="Disable caching in Shark") parser.add_option("--impala-use-hive", action="store_true", default=False, help="Use Hive for query executio on Impala nodes") @@ -193,10 +197,12 @@ def parse_args(): parser.add_option("-a", "--impala-hosts", help="Hostnames of Impala nodes (comma seperated)") - parser.add_option("-b", "--shark-host", - help="Hostname of Shark master node") + parser.add_option("-b", "--spark-host", + help="Hostname of Spark master node") parser.add_option("-c", "--redshift-host", help="Hostname of Redshift ODBC endpoint") + parser.add_option("--shark-host", + help="Hostname of Shark master node") parser.add_option("--hive-host", help="Hostname of Hive master node") parser.add_option("--hive-slaves", @@ -204,7 +210,9 @@ def parse_args(): parser.add_option("-x", "--impala-identity-file", help="SSH private key file to use for logging into Impala node") - parser.add_option("-y", "--shark-identity-file", + parser.add_option("-y", "--spark-identity-file", + help="SSH private key file to use for logging into Spark node") + parser.add_option("--shark-identity-file", help="SSH private key file to use for logging into Shark node") parser.add_option("--hive-identity-file", help="SSH private key file to use for logging into Hive node") @@ -225,7 +233,7 @@ def parse_args(): (opts, args) = parser.parse_args() - if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): + if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): parser.print_help() sys.exit(1) @@ -234,6 +242,12 @@ def parse_args(): print >> stderr, "Impala requires identity file and hostname" sys.exit(1) + if opts.spark and (opts.spark_identity_file is None or + opts.spark_host is None): + print >> stderr, \ + "Spark requires identity file and hostname" + sys.exit(1) + if opts.shark and (opts.shark_identity_file is None or opts.shark_host is None): print >> stderr, \ @@ -281,6 +295,150 @@ def scp_from(host, identity_file, username, remote_file, local_file): "scp -q -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" % (identity_file, username, host, remote_file, local_file), shell=True) +def run_spark_benchmark(opts): + def ssh_spark(command): + command = "source /root/.bash_profile; %s" % command + ssh(opts.spark_host, "root", opts.spark_identity_file, command) + + local_clean_query = CLEAN_QUERY + 
local_query_map = QUERY_MAP + + prefix = str(time.time()).split(".")[0] + query_file_name = "%s_workload.sh" % prefix + slaves_file_name = "%s_slaves" % prefix + local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name) + local_slaves_file = os.path.join(LOCAL_TMP_DIR, slaves_file_name) + query_file = open(local_query_file, 'w') + remote_result_file = "/mnt/%s_results" % prefix + remote_tmp_file = "/mnt/%s_out" % prefix + remote_query_file = "/mnt/%s" % query_file_name + + runner = "/root/spark/bin/spark-sql" + + print "Getting Slave List" + scp_from(opts.spark_host, opts.spark_identity_file, "root", + "/root/spark-ec2/slaves", local_slaves_file) + slaves = map(str.strip, open(local_slaves_file).readlines()) + + print "Restarting standalone scheduler..." + ssh_spark("/root/ephemeral-hdfs/bin/stop-all.sh") + ensure_spark_stopped_on_slaves(slaves) + time.sleep(30) + ssh_spark("/root/ephemeral-hdfs/bin/stop-all.sh") + ssh_spark("/root/ephemeral-hdfs/bin/start-all.sh") + time.sleep(10) + + # Two modes here: Spark SQL Mem and Spark SQL Disk. If using Spark SQL disk use + # uncached tables. If using Spark SQL Mem, used cached tables. + + query_list = "set spark.sql.shuffle.partitions = %s;" % opts.reduce_tasks + + # Throw away query for JVM warmup + query_list += "SELECT COUNT(*) FROM scratch;" + + # Create cached queries for Spark SQL Mem + if not opts.spark_no_cache: + + # Set up cached tables + if '4' in opts.query_num: + # Query 4 uses entirely different tables + query_list += """ + CACHE TABLE documents; + SELECT COUNT(1) FROM documents; + """ + else: + query_list += """ + CACHE TABLE uservisits; + CACHE TABLE rankings; + SELECT COUNT(1) FROM uservisits; + SELECT COUNT(1) FROM rankings; + """ + else: + # Uncache tables if necessary + if '4' in opts.query_num: + # Query 4 uses entirely different tables + query_list += """ + UNCACHE TABLE documents; + """ + else: + query_list += """ + UNCACHE TABLE uservisits; + UNCACHE TABLE rankings; + """ + + + # Warm up for Query 1 + if '1' in opts.query_num: + query_list += "DROP TABLE IF EXISTS warmup;" + query_list += "CREATE TABLE warmup AS SELECT pageURL, pageRank FROM scratch WHERE pageRank > 1000;" + + if '4' not in opts.query_num: + query_list += local_clean_query + query_list += local_query_map[opts.query_num][0] + + query_list = re.sub("\s\s+", " ", query_list.replace('\n', ' ')) + + print "\nQuery:" + print query_list.replace(';', ";\n") + + query_file.write( + "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file)) + + query_file.write( + "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % ( + remote_tmp_file, remote_result_file)) + + query_file.close() + + print "Copying files to Spark" + scp_to(opts.spark_host, opts.spark_identity_file, "root", local_query_file, + remote_query_file) + ssh_spark("chmod 775 %s" % remote_query_file) + + # Run benchmark + print "Running remote benchmark..." + + # Collect results + results = [] + contents = [] + + for i in range(opts.num_trials): + print "Stopping Executors on Slaves....." 
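# Editor's note: an illustrative, self-contained sketch (not part of the patch) of the
# timing convention the trial loop below relies on.  It assumes the CLI emits lines of
# the form "Time taken: 12.34 seconds"; the helper names are hypothetical.  Query 4 runs
# as two parts, so its result is the sum of the two part times (the loop below picks
# indices 1 and 3 of the last four reported times); every other query takes only the
# last reported time.
import re

def extract_times(output_lines):
    """Pull every reported runtime, in seconds, out of the captured CLI output."""
    times = []
    for line in output_lines:
        match = re.search(r"Time taken: ([0-9.]+) seconds", line)
        if match:
            times.append(float(match.group(1)))
    return times

def trial_result(all_times, query_num):
    """Reduce one trial's list of times to a single result, mirroring the loop below."""
    if "4" in query_num:
        last_four = all_times[-4:]
        return last_four[1] + last_four[3]
    return all_times[-1]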
+ ensure_spark_stopped_on_slaves(slaves) + print "Query %s : Trial %i" % (opts.query_num, i+1) + ssh_spark("%s" % remote_query_file) + local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) + scp_from(opts.spark_host, opts.spark_identity_file, "root", + "/mnt/%s_results" % prefix, local_results_file) + content = open(local_results_file).readlines() + all_times = map(lambda x: float(x.split(": ")[1].split(" ")[0]), content) + + if '4' in opts.query_num: + query_times = all_times[-4:] + part_a = query_times[1] + part_b = query_times[3] + print "Parts: %s, %s" % (part_a, part_b) + result = float(part_a) + float(part_b) + else: + result = all_times[-1] # Only want time of last query + + print "Result: ", result + print "Raw Times: ", content + + results.append(result) + contents.append(content) + + # Clean-up + #ssh_shark("rm /mnt/%s*" % prefix) + print "Clean Up...." + ssh_spark("rm /mnt/%s_results" % prefix) + os.remove(local_results_file) + + os.remove(local_slaves_file) + os.remove(local_query_file) + + return results, contents + def run_shark_benchmark(opts): def ssh_shark(command): command = "source /root/.bash_profile; %s" % command @@ -308,7 +466,7 @@ def ssh_shark(command): print "Restarting standalone scheduler..." ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") - ensure_spark_stopped_on_slaves(slaves) + ensure_shark_stopped_on_slaves(slaves) time.sleep(30) ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") ssh_shark("/root/ephemeral-hdfs/bin/start-all.sh") @@ -386,7 +544,7 @@ def convert_to_cached(query): for i in range(opts.num_trials): print "Stopping Executors on Slaves....." - ensure_spark_stopped_on_slaves(slaves) + ensure_shark_stopped_on_slaves(slaves) print "Query %s : Trial %i" % (opts.query_num, i+1) ssh_shark("%s" % remote_query_file) local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) @@ -768,6 +926,20 @@ def ssh_ret_code(host, user, id_file, cmd): return e.returncode def ensure_spark_stopped_on_slaves(slaves): + stop = False + while not stop: + cmd = "jps | grep ExecutorBackend" + ret_vals = map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves) + print ret_vals + if 0 in ret_vals: + print "Spark is still running on some slaves... 
sleeping" + cmd = "jps | grep ExecutorBackend | cut -d \" \" -f 1 | xargs -rn1 kill -9" + map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves) + time.sleep(2) + else: + stop = True + +def ensure_shark_stopped_on_slaves(slaves): stop = False while not stop: cmd = "jps | grep ExecutorBackend" @@ -789,8 +961,10 @@ def main(): if opts.impala: results, contents = run_impala_benchmark(opts) + if opts.spark: + results, contents = run_spark_benchmark(opts) if opts.shark: - results, contents = run_shark_benchmark(opts) + results, contents = run_shark_benchmark(opts) if opts.redshift: results = run_redshift_benchmark(opts) if opts.hive: @@ -803,10 +977,16 @@ def main(): fname = "impala_disk" else: fname = "impala_mem" - elif opts.shark and opts.shark_no_cache: - fname = "shark_disk" + elif opts.spark: + if opts.spark_no_cache: + fname = "spark_disk" + else: + fname = "spark_mem" elif opts.shark: - fname = "shark_mem" + if opts.shark_no_cache: + fname = "shark_disk" + else: + fname = "shark_mem" elif opts.redshift: fname = "redshift" elif opts.hive: From 2c34ab94c513757b035c73f4956327f919111237 Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Tue, 20 Jan 2015 14:55:20 -0800 Subject: [PATCH 3/6] Made spark sql use the thrift jdbc instead of cli --- runner/prepare_benchmark.py | 32 +++++++++++++++++++++----------- runner/run_query.py | 33 ++++++--------------------------- 2 files changed, 27 insertions(+), 38 deletions(-) diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index e0f0e87..ed8e131 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -255,29 +255,39 @@ def ssh_spark(command): "/root/url_count.py") ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py") - ssh_spark( - "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS rankings; " \ + ssh_spark("/root/spark/sbin/start-thriftserver.sh") + + #TODO: Should keep checking to see if the JDBC server has started yet + print "Sleeping for 30 seconds so the jdbc server can start" + time.sleep(30) + + def beeline(query): + ssh_spark("/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 -n root -e \"%s\"" % query) + + beeline("DROP TABLE IF EXISTS rankings") + beeline( "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ - "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";\"") + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";") - ssh_spark( - "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS scratch; " \ + beeline("DROP TABLE IF EXISTS scratch;") + beeline( "CREATE EXTERNAL TABLE scratch (pageURL STRING, pageRank INT, " \ "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ - "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/scratch\\\";\"") + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/scratch\\\";") - ssh_spark( - "/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS uservisits; " \ + beeline("DROP TABLE IF EXISTS uservisits;") + beeline( "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \ "languageCode STRING,searchWord STRING,duration INT ) " \ "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ - "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/uservisits\\\";\"") + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/uservisits\\\";") - ssh_spark("/root/spark/bin/spark-sql -e \"DROP TABLE IF EXISTS 
documents; " \ + beeline("DROP TABLE IF EXISTS documents;") + beeline( "CREATE EXTERNAL TABLE documents (line STRING) STORED AS TEXTFILE " \ - "LOCATION \\\"/user/spark/benchmark/crawl\\\";\"") + "LOCATION \\\"/user/spark/benchmark/crawl\\\";") print "=== FINISHED CREATING BENCHMARK DATA ===" diff --git a/runner/run_query.py b/runner/run_query.py index 7238f68..c6fdea1 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -190,8 +190,8 @@ def parse_args(): default=False, help="Disable caching in Shark") parser.add_option("--impala-use-hive", action="store_true", default=False, help="Use Hive for query executio on Impala nodes") - parser.add_option("-t", "--reduce-tasks", type="int", default=150, - help="Number of reduce tasks in Shark") + parser.add_option("-t", "--reduce-tasks", type="int", default=200, + help="Number of reduce tasks in Shark & Spark SQL") parser.add_option("-z", "--clear-buffer-cache", action="store_true", default=False, help="Clear disk buffer cache between query runs") @@ -305,28 +305,13 @@ def ssh_spark(command): prefix = str(time.time()).split(".")[0] query_file_name = "%s_workload.sh" % prefix - slaves_file_name = "%s_slaves" % prefix local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name) - local_slaves_file = os.path.join(LOCAL_TMP_DIR, slaves_file_name) query_file = open(local_query_file, 'w') remote_result_file = "/mnt/%s_results" % prefix remote_tmp_file = "/mnt/%s_out" % prefix remote_query_file = "/mnt/%s" % query_file_name - runner = "/root/spark/bin/spark-sql" - - print "Getting Slave List" - scp_from(opts.spark_host, opts.spark_identity_file, "root", - "/root/spark-ec2/slaves", local_slaves_file) - slaves = map(str.strip, open(local_slaves_file).readlines()) - - print "Restarting standalone scheduler..." - ssh_spark("/root/ephemeral-hdfs/bin/stop-all.sh") - ensure_spark_stopped_on_slaves(slaves) - time.sleep(30) - ssh_spark("/root/ephemeral-hdfs/bin/stop-all.sh") - ssh_spark("/root/ephemeral-hdfs/bin/start-all.sh") - time.sleep(10) + runner = "/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 -n root" # Two modes here: Spark SQL Mem and Spark SQL Disk. If using Spark SQL disk use # uncached tables. If using Spark SQL Mem, used cached tables. @@ -344,14 +329,11 @@ def ssh_spark(command): # Query 4 uses entirely different tables query_list += """ CACHE TABLE documents; - SELECT COUNT(1) FROM documents; """ else: query_list += """ CACHE TABLE uservisits; CACHE TABLE rankings; - SELECT COUNT(1) FROM uservisits; - SELECT COUNT(1) FROM rankings; """ else: # Uncache tables if necessary @@ -382,10 +364,10 @@ def ssh_spark(command): print query_list.replace(';', ";\n") query_file.write( - "%s -e '%s' > %s 2>&1\n" % (runner, query_list, remote_tmp_file)) + "%s %s > %s 2>&1\n" % (runner, " ".join("-e '%s'" % q.strip() for q in query_list.split(";") if q.strip()), remote_tmp_file)) query_file.write( - "cat %s | grep Time | grep -v INFO |grep -v MapReduce >> %s\n" % ( + "cat %s | grep seconds >> %s\n" % ( remote_tmp_file, remote_result_file)) query_file.close() @@ -403,15 +385,13 @@ def ssh_spark(command): contents = [] for i in range(opts.num_trials): - print "Stopping Executors on Slaves....." 
- ensure_spark_stopped_on_slaves(slaves) print "Query %s : Trial %i" % (opts.query_num, i+1) ssh_spark("%s" % remote_query_file) local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) scp_from(opts.spark_host, opts.spark_identity_file, "root", "/mnt/%s_results" % prefix, local_results_file) content = open(local_results_file).readlines() - all_times = map(lambda x: float(x.split(": ")[1].split(" ")[0]), content) + all_times = [float(x.split("(")[1].split(" ")[0]) for x in content] if '4' in opts.query_num: query_times = all_times[-4:] @@ -434,7 +414,6 @@ def ssh_spark(command): ssh_spark("rm /mnt/%s_results" % prefix) os.remove(local_results_file) - os.remove(local_slaves_file) os.remove(local_query_file) return results, contents From b7efcb2641f29464b363d9e230d4c5ae8495b13d Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Mon, 26 Jan 2015 13:25:24 -0800 Subject: [PATCH 4/6] More bug fixes and added parquet conversion --- runner/parquet/convert_to_parquet.py | 8 +++++ runner/prepare_benchmark.py | 52 +++++++++++++++++++++------- runner/run_query.py | 52 ++++++---------------------- 3 files changed, 58 insertions(+), 54 deletions(-) create mode 100644 runner/parquet/convert_to_parquet.py diff --git a/runner/parquet/convert_to_parquet.py b/runner/parquet/convert_to_parquet.py new file mode 100644 index 0000000..29f0d0b --- /dev/null +++ b/runner/parquet/convert_to_parquet.py @@ -0,0 +1,8 @@ +from pyspark import SparkContext +from pyspark.sql import HiveContext + +sc = SparkContext(appName = "Parquet Converter") +hiveContext = HiveContext(sc) +hiveContext.table("rankings").saveAsParquetFile("/user/spark/benchmark/rankings-parquet") +hiveContext.table("uservisits").saveAsParquetFile("/user/spark/benchmark/uservisits-parquet") +hiveContext.table("documents").saveAsParquetFile("/user/spark/benchmark/documents-parquet") diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index ed8e131..76fb4db 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -90,6 +90,8 @@ def parse_args(): parser.add_option("-f", "--file-format", default="sequence-snappy", help="File format to copy (text, text-deflate, "\ "sequence, or sequence-snappy)") + parser.add_option("--executor-memory", type="string", default="24G", + help="How much executor memory spark sql nodes should use") parser.add_option("-d", "--aws-key-id", help="Access key ID for AWS") @@ -98,6 +100,8 @@ def parse_args(): parser.add_option("--skip-s3-import", action="store_true", default=False, help="Assumes s3 data is already loaded") + parser.add_option("--parquet", action="store_true", default=False, + help="Convert benchmark data to parquet") (opts, args) = parser.parse_args() @@ -221,12 +225,6 @@ def ssh_spark(command): "s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \ "/user/spark/benchmark/crawl/" % (opts.file_format, opts.data_prefix)) - # Scratch table used for JVM warmup - ssh_spark( - "/root/ephemeral-hdfs/bin/hadoop distcp /user/spark/benchmark/rankings " \ - "/user/spark/benchmark/scratch" - ) - print "=== CREATING HIVE TABLES FOR BENCHMARK ===" hive_site = ''' @@ -255,7 +253,7 @@ def ssh_spark(command): "/root/url_count.py") ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py") - ssh_spark("/root/spark/sbin/start-thriftserver.sh") + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s") % opts.executor_memory #TODO: Should keep checking to see if the JDBC server has started yet print "Sleeping for 30 seconds so the jdbc server can start" @@ -270,12 +268,6 @@ def 
beeline(query): "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";") - beeline("DROP TABLE IF EXISTS scratch;") - beeline( - "CREATE EXTERNAL TABLE scratch (pageURL STRING, pageRank INT, " \ - "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ - "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/scratch\\\";") - beeline("DROP TABLE IF EXISTS uservisits;") beeline( "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ @@ -289,6 +281,40 @@ def beeline(query): "CREATE EXTERNAL TABLE documents (line STRING) STORED AS TEXTFILE " \ "LOCATION \\\"/user/spark/benchmark/crawl\\\";") + if opts.parquet: + ssh_spark("/root/spark/sbin/stop-thriftserver.sh") + + print "Sleeping for 30 seconds so the jdbc server can stop" + time.sleep(30) + + scp_to(opts.spark_host, opts.spark_identity_file, "root", "parquet/convert_to_parquet.py", + "/root/convert_to_parquet.py") + ssh_spark("/root/spark/bin/spark-submit --master spark://%s:7077 /root/convert_to_parquet.py" % opts.spark_host) + + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s") % opts.executor_memory + + print "Sleeping for 30 seconds so the jdbc server can start" + time.sleep(30) + + beeline("DROP TABLE IF EXISTS rankings") + beeline( + "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ + "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/rankings-parquet\\\";") + + beeline("DROP TABLE IF EXISTS uservisits;") + beeline( + "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ + "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \ + "languageCode STRING,searchWord STRING,duration INT ) " \ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/uservisits-parquet\\\";") + + beeline("DROP TABLE IF EXISTS documents;") + beeline( + "CREATE EXTERNAL TABLE documents (line STRING) STORED AS PARQUET " \ + "LOCATION \\\"/user/spark/benchmark/documents-parquet\\\";") + print "=== FINISHED CREATING BENCHMARK DATA ===" def prepare_shark_dataset(opts): diff --git a/runner/run_query.py b/runner/run_query.py index c6fdea1..6291146 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -226,6 +226,8 @@ def parse_args(): help="Number of trials to run for this query") parser.add_option("--prefix", type="string", default="", help="Prefix result files with this string") + parser.add_option("--shark-mem", type="string", default="24g", + help="How much executor memory shark nodes should use") parser.add_option("-q", "--query-num", default="1a", help="Which query to run in benchmark: " \ @@ -316,10 +318,7 @@ def ssh_spark(command): # Two modes here: Spark SQL Mem and Spark SQL Disk. If using Spark SQL disk use # uncached tables. If using Spark SQL Mem, used cached tables. 
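# Editor's note: both the prepare and query scripts sleep a fixed 30 seconds after
# (re)starting the Thrift JDBC server, and patch 3 leaves a TODO to poll for readiness
# instead.  A minimal sketch of that polling, assuming the server listens on the
# HiveServer2 port 10000 used by the beeline commands above; wait_for_thrift_server is
# a hypothetical helper, not part of the benchmark scripts.
import socket
import time

def wait_for_thrift_server(host, port=10000, timeout=300, interval=5):
    """Block until a TCP connection to host:port succeeds, or raise after timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(interval)
        try:
            sock.connect((host, port))
            return
        except (socket.error, socket.timeout):
            time.sleep(interval)
        finally:
            sock.close()
    raise RuntimeError("Thrift JDBC server did not come up on %s:%d" % (host, port))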
- query_list = "set spark.sql.shuffle.partitions = %s;" % opts.reduce_tasks - - # Throw away query for JVM warmup - query_list += "SELECT COUNT(*) FROM scratch;" + query_list = "set spark.sql.codegen=true; set spark.sql.shuffle.partitions = %s;" % opts.reduce_tasks # Create cached queries for Spark SQL Mem if not opts.spark_no_cache: @@ -329,35 +328,24 @@ def ssh_spark(command): # Query 4 uses entirely different tables query_list += """ CACHE TABLE documents; + SELECT COUNT(*) FROM documents """ else: query_list += """ CACHE TABLE uservisits; CACHE TABLE rankings; + SELECT COUNT(*) FROM uservisits; + SELECT COUNT(*) FROM rankings; """ - else: - # Uncache tables if necessary - if '4' in opts.query_num: - # Query 4 uses entirely different tables - query_list += """ - UNCACHE TABLE documents; - """ - else: - query_list += """ - UNCACHE TABLE uservisits; - UNCACHE TABLE rankings; - """ - - - # Warm up for Query 1 - if '1' in opts.query_num: - query_list += "DROP TABLE IF EXISTS warmup;" - query_list += "CREATE TABLE warmup AS SELECT pageURL, pageRank FROM scratch WHERE pageRank > 1000;" if '4' not in opts.query_num: query_list += local_clean_query query_list += local_query_map[opts.query_num][0] + # Store the result only in mem + if not opts.spark_no_cache: + query_list = query_list.replace("CREATE TABLE", "CACHE TABLE") + query_list = re.sub("\s\s+", " ", query_list.replace('\n', ' ')) print "\nQuery:" @@ -428,28 +416,13 @@ def ssh_shark(command): prefix = str(time.time()).split(".")[0] query_file_name = "%s_workload.sh" % prefix - slaves_file_name = "%s_slaves" % prefix local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name) - local_slaves_file = os.path.join(LOCAL_TMP_DIR, slaves_file_name) query_file = open(local_query_file, 'w') remote_result_file = "/mnt/%s_results" % prefix remote_tmp_file = "/mnt/%s_out" % prefix remote_query_file = "/mnt/%s" % query_file_name - runner = "/root/shark/bin/shark-withinfo" - - print "Getting Slave List" - scp_from(opts.shark_host, opts.shark_identity_file, "root", - "/root/spark-ec2/slaves", local_slaves_file) - slaves = map(str.strip, open(local_slaves_file).readlines()) - - print "Restarting standalone scheduler..." - ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") - ensure_shark_stopped_on_slaves(slaves) - time.sleep(30) - ssh_shark("/root/ephemeral-hdfs/bin/stop-all.sh") - ssh_shark("/root/ephemeral-hdfs/bin/start-all.sh") - time.sleep(10) + runner = "export SPARK_MEM=%s; /root/shark/bin/shark" % opts.shark_mem # Two modes here: Shark Mem and Shark Disk. If using Shark disk clear buffer # cache in-between each query. If using Shark Mem, used cached tables. @@ -522,8 +495,6 @@ def convert_to_cached(query): contents = [] for i in range(opts.num_trials): - print "Stopping Executors on Slaves....." 
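# Editor's note: an illustrative sketch (not part of the patch) of the query prelude
# assembled above for the cached (Spark SQL Mem) mode: session settings first, then
# CACHE TABLE plus a COUNT(*) per table so the cache is actually materialized before
# the timed query runs.  build_cached_prelude is a hypothetical helper name.
def build_cached_prelude(reduce_tasks, query_num, use_cache):
    prelude = ("set spark.sql.codegen=true; "
               "set spark.sql.shuffle.partitions = %s;" % reduce_tasks)
    if use_cache:
        # Query 4 reads only the documents table; the other queries read these two.
        tables = ["documents"] if "4" in query_num else ["uservisits", "rankings"]
        for table in tables:
            prelude += " CACHE TABLE %s; SELECT COUNT(*) FROM %s;" % (table, table)
    return prelude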
- ensure_shark_stopped_on_slaves(slaves) print "Query %s : Trial %i" % (opts.query_num, i+1) ssh_shark("%s" % remote_query_file) local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) @@ -553,7 +524,6 @@ def convert_to_cached(query): ssh_shark("rm /mnt/%s_results" % prefix) os.remove(local_results_file) - os.remove(local_slaves_file) os.remove(local_query_file) return results, contents From b44c1f924e580b0e161924453d105e31f0d6de8e Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Tue, 27 Jan 2015 13:43:05 -0800 Subject: [PATCH 5/6] Bug fix in executor memory string interpolation --- runner/prepare_benchmark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index 76fb4db..8405619 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -253,7 +253,7 @@ def ssh_spark(command): "/root/url_count.py") ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py") - ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s") % opts.executor_memory + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory) #TODO: Should keep checking to see if the JDBC server has started yet print "Sleeping for 30 seconds so the jdbc server can start" @@ -291,7 +291,7 @@ def beeline(query): "/root/convert_to_parquet.py") ssh_spark("/root/spark/bin/spark-submit --master spark://%s:7077 /root/convert_to_parquet.py" % opts.spark_host) - ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s") % opts.executor_memory + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory) print "Sleeping for 30 seconds so the jdbc server can start" time.sleep(30) From 9bab0be2e86e36fbc3bd097f202e6f19ed7a8e4e Mon Sep 17 00:00:00 2001 From: Tomer Kaftan Date: Mon, 23 Feb 2015 10:55:01 -0800 Subject: [PATCH 6/6] Updated impala support to 2.1.1 and added Impala Parquet --- runner/prepare_benchmark.py | 21 ++++++++++++++++++--- runner/run_query.py | 15 +++++++-------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py index 8405619..e5c29b9 100644 --- a/runner/prepare_benchmark.py +++ b/runner/prepare_benchmark.py @@ -59,6 +59,8 @@ def parse_args(): parser.add_option("-a", "--impala-host", help="Hostname of Impala state store node") + parser.add_option("--impala-data-node", + help="Hostname of an Impala data node. 
Required for Impala conversion to Parquet") parser.add_option("-b", "--spark-host", help="Hostname of Spark master node") parser.add_option("-c", "--redshift-host", @@ -412,7 +414,7 @@ def ssh_shark(command): def prepare_impala_dataset(opts): def ssh_impala(command): - ssh(opts.impala_host, "ubuntu", opts.impala_identity_file, command) + ssh(opts.impala_host, "root", opts.impala_identity_file, command) if not opts.skip_s3_import: print "=== IMPORTING BENCHMARK FROM S3 ===" @@ -424,9 +426,9 @@ def ssh_impala(command): ssh_impala("sudo chmod 777 /etc/hadoop/conf/hdfs-site.xml") ssh_impala("sudo chmod 777 /etc/hadoop/conf/core-site.xml") - add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file, + add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file, "/etc/hadoop/conf/hdfs-site.xml", opts.aws_key_id, opts.aws_key) - add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file, + add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file, "/etc/hadoop/conf/core-site.xml", opts.aws_key_id, opts.aws_key) ssh_impala( @@ -463,6 +465,19 @@ def ssh_impala(command): "TERMINATED BY \\\"\\001\\\" " \ "STORED AS SEQUENCEFILE LOCATION \\\"/tmp/benchmark/scratch\\\";\"") + if opts.parquet: + print "=== CONVERTING TABLES TO PARQUET ===" + + ssh(opts.impala_data_node, "root", opts.impala_identity_file, + "impala-shell -r -q \"CREATE TABLE rankings_parquet STORED AS PARQUET AS SELECT * FROM rankings; " \ + "DROP TABLE rankings; " \ + "ALTER TABLE rankings_parquet RENAME TO rankings;\"") + + ssh(opts.impala_data_node, "root", opts.impala_identity_file, + "impala-shell -r -q \"CREATE TABLE uservisits_parquet STORED AS PARQUET AS SELECT * FROM uservisits; " \ + "DROP TABLE uservisits; " \ + "ALTER TABLE uservisits_parquet RENAME TO uservisits;\"") + print "=== FINISHED CREATING BENCHMARK DATA ===" def prepare_hive_dataset(opts): diff --git a/runner/run_query.py b/runner/run_query.py index 6291146..f218407 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -328,7 +328,7 @@ def ssh_spark(command): # Query 4 uses entirely different tables query_list += """ CACHE TABLE documents; - SELECT COUNT(*) FROM documents + SELECT COUNT(*) FROM documents; """ else: query_list += """ @@ -531,10 +531,10 @@ def convert_to_cached(query): def run_impala_benchmark(opts): impala_host = opts.impala_hosts[0] def ssh_impala(command): - ssh(impala_host, "ubuntu", opts.impala_identity_file, command) + ssh(impala_host, "root", opts.impala_identity_file, command) def clear_buffer_cache_impala(host): - ssh(host, "ubuntu", opts.impala_identity_file, + ssh(host, "root", opts.impala_identity_file, "sudo bash -c \"sync && echo 3 > /proc/sys/vm/drop_caches\"") runner = "impala-shell -r -q" @@ -548,7 +548,7 @@ def clear_buffer_cache_impala(host): remote_tmp_file = "/tmp/%s_tmp" % prefix remote_result_file = "/tmp/%s_results" % prefix - query_file.write("hive -e '%s'\n" % IMPALA_MAP[opts.query_num]) + query_file.write("%s '%s'\n" % (runner, IMPALA_MAP[opts.query_num])) query = QUERY_MAP[opts.query_num][1] if '3c' in opts.query_num: @@ -560,7 +560,6 @@ def clear_buffer_cache_impala(host): # Populate the full buffer cache if running Impala + cached if (not opts.impala_use_hive) and (not opts.clear_buffer_cache): - query = "set mem_limit=68g;" + query query = "select count(*) from rankings;" + query query = "select count(*) from uservisits;" + query @@ -572,12 +571,12 @@ def clear_buffer_cache_impala(host): "%s '%s%s' > %s 2>&1;\n" % (runner, connect_stmt, query, 
remote_tmp_file)) query_file.write("cat %s |egrep 'Inserted|Time' |grep -v MapReduce >> %s;\n" % ( remote_tmp_file, remote_result_file)) - query_file.write("hive -e '%s';\n" % CLEAN_QUERY) + query_file.write("%s '%s';\n" % (runner, CLEAN_QUERY)) query_file.close() remote_query_file = "/tmp/%s" % query_file_name print >> stderr, "Copying files to Impala" - scp_to(impala_host, opts.impala_identity_file, "ubuntu", + scp_to(impala_host, opts.impala_identity_file, "root", local_query_file, remote_query_file) ssh_impala("chmod 775 %s" % remote_query_file) @@ -593,7 +592,7 @@ def clear_buffer_cache_impala(host): # Collect results local_result_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix) - scp_from(impala_host, opts.impala_identity_file, "ubuntu", + scp_from(impala_host, opts.impala_identity_file, "root", remote_result_file, local_result_file) contents = open(local_result_file).readlines()
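
Patch 6 converts the existing Impala tables to Parquet in place with a CREATE TABLE ... AS SELECT, a DROP, and a RENAME, issued through impala-shell on a data node. The helper below is an illustrative sketch of that command string only; build_parquet_ctas is not part of the benchmark scripts.

def build_parquet_ctas(table):
    # Mirrors the CTAS / DROP / RENAME sequence patch 6 runs via impala-shell.
    return ("impala-shell -r -q \""
            "CREATE TABLE %(t)s_parquet STORED AS PARQUET AS SELECT * FROM %(t)s; "
            "DROP TABLE %(t)s; "
            "ALTER TABLE %(t)s_parquet RENAME TO %(t)s;\"" % {"t": table})

# Example:
#   ssh(opts.impala_data_node, "root", opts.impala_identity_file, build_parquet_ctas("rankings"))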