diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py
index ad0b21acf..4799dbc6d 100644
--- a/bin/functions/hibench_prop_env_mapping.py
+++ b/bin/functions/hibench_prop_env_mapping.py
@@ -93,6 +93,7 @@
     # For Logistic Regression
     NUM_EXAMPLES_LR="hibench.lr.examples",
     NUM_FEATURES_LR="hibench.lr.features",
+    AGGREGATION_DEPTH_LR="hibench.lr.agg.depth",
     # For ALS
     NUM_USERS="hibench.als.users",
     NUM_PRODUCTS="hibench.als.products",
diff --git a/bin/workloads/ml/lr/spark/run.sh b/bin/workloads/ml/lr/spark/run.sh
index 54ed563e4..5fda8e355 100755
--- a/bin/workloads/ml/lr/spark/run.sh
+++ b/bin/workloads/ml/lr/spark/run.sh
@@ -26,7 +26,7 @@ rmr_hdfs $OUTPUT_HDFS || true
 
 SIZE=`dir_size $INPUT_HDFS`
 START_TIME=`timestamp`
-run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS}
+run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS} $NUM_FEATURES_LR $AGGREGATION_DEPTH_LR
 END_TIME=`timestamp`
 
 gen_report ${START_TIME} ${END_TIME} ${SIZE}
diff --git a/conf/workloads/ml/lr.conf b/conf/workloads/ml/lr.conf
index 9a05e6614..991d5b8d2 100644
--- a/conf/workloads/ml/lr.conf
+++ b/conf/workloads/ml/lr.conf
@@ -11,10 +11,12 @@ hibench.lr.gigantic.features 500000
 hibench.lr.bigdata.examples 10000
 hibench.lr.bigdata.features 700000
-
+hibench.lr.agg.depth 2
 hibench.lr.examples ${hibench.lr.${hibench.scale.profile}.examples}
 hibench.lr.features ${hibench.lr.${hibench.scale.profile}.features}
 hibench.lr.partitions ${hibench.default.map.parallelism}
 
 hibench.workload.input ${hibench.hdfs.data.dir}/LR/Input
 hibench.workload.output ${hibench.hdfs.data.dir}/LR/Output
+spark.rpc.message.maxSize 2000
+spark.driver.maxResultSize 4g
diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
index 673ed2fee..910613e80 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegression.scala
@@ -20,47 +20,35 @@
 package com.intel.hibench.sparkbench.ml
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.classification.LogisticRegression
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.sql.SparkSession
 
 object LogisticRegression {
 
   def main(args: Array[String]): Unit = {
     var inputPath = ""
-
-    if (args.length == 1) {
+    var numFeatures = 0
+    var aggDepth = 0
+    if (args.length == 3) {
       inputPath = args(0)
+      numFeatures = args(1).toInt
+      aggDepth = if (args(2).toInt < 2) 2 else args(2).toInt
     }
-
-    val conf = new SparkConf()
-      .setAppName("LogisticRegressionWithLBFGS")
-    val sc = new SparkContext(conf)
-
-    // $example on$
-    // Load training data in LIBSVM format.
-    val data: RDD[LabeledPoint] = sc.objectFile(inputPath)
-
-    // Split data into training (60%) and test (40%).
-    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
-    val training = splits(0).cache()
-    val test = splits(1)
-
+    val spark = SparkSession.builder.appName("LogisticRegression").getOrCreate()
+    val df = spark.read.format("libsvm")
+      .option("numFeatures", numFeatures)
+      .load(inputPath)
     // Run training algorithm to build the model
-    val model = new LogisticRegressionWithLBFGS()
-      .setNumClasses(10)
-      .run(training)
-
-    // Compute raw scores on the test set.
-    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
-      val prediction = model.predict(features)
-      (prediction, label)
-    }
-
-    val accuracy = predictionAndLabels.filter(x => x._1 == x._2).count().toDouble / predictionAndLabels.count()
-    println(s"Accuracy = $accuracy")
-
-    sc.stop()
+    val model = new LogisticRegression()
+      .setMaxIter(30)
+      .setRegParam(0.3)
+      .setElasticNetParam(0.8)
+// uncomment below config for spark 2.1 or above
+//      .setAggregationDepth(aggDepth)
+      .fit(df)
+    println(s"training complete!")
+    spark.stop()
   }
 }
 // scalastyle:on println
diff --git a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
index f9d408fa0..f483f5477 100644
--- a/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
+++ b/sparkbench/ml/src/main/scala/com/intel/sparkbench/ml/LogisticRegressionDataGenerator.scala
@@ -21,11 +21,10 @@ import com.intel.hibench.sparkbench.common.IOCommon
 
 import scala.util.Random
 
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.sql.SparkSession
 
 /**
  * :: DeveloperApi ::
@@ -33,40 +32,36 @@ import org.apache.spark.rdd.RDD
  * with probability `probOne` and scales features for positive examples by `eps`.
  */
 object LogisticRegressionDataGenerator {
-
   /**
-   * Generate an RDD containing test data for LogisticRegression.
+   * Generate a DataFrame containing test data for LogisticRegression.
    *
-   * @param sc SparkContext to use for creating the RDD.
-   * @param nexamples Number of examples that will be contained in the RDD.
+   * @param nexamples Number of examples that will be contained in the data.
    * @param nfeatures Number of features to generate for each example.
    * @param eps Epsilon factor by which positive examples are scaled.
-   * @param nparts Number of partitions of the generated RDD. Default value is 2.
+   * @param nparts Number of partitions of the generated DataFrame. Default value is 2.
    * @param probOne Probability that a label is 1 (and not 0). Default value is 0.5.
    */
-  def generateLogisticRDD(
-      sc: SparkContext,
+
+  val spark = SparkSession.builder.appName("LogisticRegressionDataGenerator").getOrCreate()
+  val sc = spark.sparkContext
+  def generateLogisticDF(
       nexamples: Int,
       nfeatures: Int,
       eps: Double,
       nparts: Int = 2,
-      probOne: Double = 0.5): RDD[LabeledPoint] = {
-    val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
-      val rnd = new Random(42 + idx)
-
-      val y = if (idx % 2 == 0) 0.0 else 1.0
-      val x = Array.fill[Double](nfeatures) {
-        rnd.nextGaussian() + (y * eps)
-      }
-      LabeledPoint(y, Vectors.dense(x))
+      probOne: Double = 0.5) = {
+    val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
+      val rnd = new Random(42 + idx)
+      val y = if (idx % 2 == 0) 0.0 else 1.0
+      val x = Array.fill[Double](nfeatures) {
+        rnd.nextGaussian() + (y * eps)
+      }
+      LabeledPoint(y, Vectors.dense(x))
+    }
+    spark.createDataFrame(data)
     }
-    data
-  }
 
   def main(args: Array[String]) {
-    val conf = new SparkConf().setAppName("LogisticRegressionDataGenerator")
-    val sc = new SparkContext(conf)
-
     var outputPath = ""
     var numExamples: Int = 200000
     var numFeatures: Int = 20
@@ -79,20 +74,15 @@ object LogisticRegressionDataGenerator {
       outputPath = args(0)
       numExamples = args(1).toInt
      numFeatures = args(2).toInt
-      println(s"Output Path: $outputPath")
-      println(s"Num of Examples: $numExamples")
-      println(s"Num of Features: $numFeatures")
+      println(s"Output Path: $outputPath, Num of Examples: $numExamples, Num of Features: $numFeatures")
     } else {
       System.err.println(
         s"Usage: $LogisticRegressionDataGenerator "
       )
       System.exit(1)
     }
-
-    val data = generateLogisticRDD(sc, numExamples, numFeatures, eps, numPartitions)
-
-    data.saveAsObjectFile(outputPath)
-
-    sc.stop()
+    val df = generateLogisticDF(numExamples, numFeatures, eps, numPartitions)
+    df.write.format("libsvm").save(outputPath)
+    spark.stop()
   }
 }
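
Note, not part of the patch: the workload now parses an aggregation-depth argument, but the .setAggregationDepth(aggDepth) call is left commented out so the code still builds against pre-2.1 Spark. The stand-alone sketch below shows the intended Spark 2.1+ path; the object name, input path, and the literal feature/depth values are hypothetical stand-ins for what run.sh passes via ${INPUT_HDFS}, $NUM_FEATURES_LR, and $AGGREGATION_DEPTH_LR.

// Sketch only (not the patched workload itself), assuming Spark 2.1+.
// Placeholder values stand in for the arguments run.sh supplies.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

object LRAggregationDepthSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("LRAggregationDepthSketch").getOrCreate()

    val inputPath   = "hdfs:///HiBench/LR/Input" // hypothetical, normally ${INPUT_HDFS}
    val numFeatures = 500000                     // normally $NUM_FEATURES_LR
    val aggDepth    = 4                          // normally $AGGREGATION_DEPTH_LR, must be >= 2

    // Declaring numFeatures up front lets the libsvm reader skip the extra
    // pass it would otherwise need to infer the vector size.
    val training = spark.read.format("libsvm")
      .option("numFeatures", numFeatures.toString)
      .load(inputPath)

    val model = new LogisticRegression()
      .setMaxIter(30)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)
      .setAggregationDepth(aggDepth) // treeAggregate depth, available since Spark 2.1
      .fit(training)

    println(s"training complete, ${model.numFeatures} features")
    spark.stop()
  }
}

An aggregation depth above 2 adds intermediate treeAggregate levels so that less data is combined on the driver in any single step, which is presumably why the lr.conf change also raises spark.rpc.message.maxSize and spark.driver.maxResultSize for the very wide LR feature vectors.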