Lr rework #488

Open · wants to merge 3 commits into master
1 change: 1 addition & 0 deletions bin/functions/hibench_prop_env_mapping.py
@@ -93,6 +93,7 @@
# For Logistic Regression
NUM_EXAMPLES_LR="hibench.lr.examples",
NUM_FEATURES_LR="hibench.lr.features",
AGGREGATION_DEPTH_LR="hibench.lr.agg.depth",
# For ALS
NUM_USERS="hibench.als.users",
NUM_PRODUCTS="hibench.als.products",
2 changes: 1 addition & 1 deletion bin/workloads/ml/lr/spark/run.sh
@@ -26,7 +26,7 @@ rmr_hdfs $OUTPUT_HDFS || true

SIZE=`dir_size $INPUT_HDFS`
START_TIME=`timestamp`
run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS}
run_spark_job com.intel.hibench.sparkbench.ml.LogisticRegression ${INPUT_HDFS} $NUM_FEATURES_LR $AGGREGATION_DEPTH_LR
END_TIME=`timestamp`

gen_report ${START_TIME} ${END_TIME} ${SIZE}
4 changes: 3 additions & 1 deletion conf/workloads/ml/lr.conf
@@ -11,10 +11,12 @@ hibench.lr.gigantic.features 500000
hibench.lr.bigdata.examples 10000
hibench.lr.bigdata.features 700000


hibench.lr.agg.depth 2
hibench.lr.examples ${hibench.lr.${hibench.scale.profile}.examples}
hibench.lr.features ${hibench.lr.${hibench.scale.profile}.features}
hibench.lr.partitions ${hibench.default.map.parallelism}

hibench.workload.input ${hibench.hdfs.data.dir}/LR/Input
hibench.workload.output ${hibench.hdfs.data.dir}/LR/Output
spark.rpc.message.maxSize 2000
spark.driver.maxResultSize 4g
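
For context, the ${hibench.lr.${hibench.scale.profile}.*} placeholders above are resolved from the active scale profile at runtime, while the new hibench.lr.agg.depth applies to every profile. As an illustration only, using the values visible in this hunk, with hibench.scale.profile set to bigdata the workload would see:

hibench.lr.examples     10000      # from hibench.lr.bigdata.examples
hibench.lr.features     700000     # from hibench.lr.bigdata.features
hibench.lr.agg.depth    2          # profile-independent; new in this PR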
LogisticRegression.scala
@@ -20,47 +20,35 @@ package com.intel.hibench.sparkbench.ml

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.sql.SparkSession

object LogisticRegression {

def main(args: Array[String]): Unit = {
var inputPath = ""

if (args.length == 1) {
var numFeatures = 0
var aggDepth = 0
if (args.length == 3) {
inputPath = args(0)
numFeatures = args(1).toInt
aggDepth = if (args(2).toInt < 2) 2 else args(2).toInt
}

val conf = new SparkConf()
.setAppName("LogisticRegressionWithLBFGS")
val sc = new SparkContext(conf)

// $example on$
// Load training data in LIBSVM format.
val data: RDD[LabeledPoint] = sc.objectFile(inputPath)

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val spark = SparkSession.builder.appName("LogisticRegression").getOrCreate()
val df = spark.read.format("libsvm")
.option("numFeatures", numFeatures)
.load(inputPath)
// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training)

// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}

val accuracy = predictionAndLabels.filter(x => x._1 == x._2).count().toDouble / predictionAndLabels.count()
println(s"Accuracy = $accuracy")

sc.stop()
val model = new LogisticRegression()
.setMaxIter(30)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// uncomment the setting below for Spark 2.1 or above
// .setAggregationDepth(aggDepth)
.fit(df)
println(s"training complete!")
spark.stop()
}
}
// scalastyle:on println
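
The commented-out .setAggregationDepth(aggDepth) call is the only consumer of the new aggDepth argument. Below is a minimal sketch of how it could be wired up on Spark 2.1 or above, together with an accuracy check comparable to the one the removed RDD-based code printed; the train/test split and the evaluator are illustrative assumptions, not part of this change.

// Illustrative only: enable the aggregation depth on Spark 2.1+ and report accuracy.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val Array(train, test) = df.randomSplit(Array(0.6, 0.4), seed = 11L)

val model = new LogisticRegression()
  .setMaxIter(30)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setAggregationDepth(aggDepth) // available since Spark 2.1
  .fit(train)

// MulticlassClassificationEvaluator uses the default "label"/"prediction" columns
// produced by model.transform.
val accuracy = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")
  .evaluate(model.transform(test))
println(s"Accuracy = $accuracy")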
LogisticRegressionDataGenerator.scala
@@ -21,52 +21,47 @@ import com.intel.hibench.sparkbench.common.IOCommon

import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.sql.SparkSession

/**
* :: DeveloperApi ::
* Generate test data for LogisticRegression. This class chooses positive labels
* with probability `probOne` and scales features for positive examples by `eps`.
*/
object LogisticRegressionDataGenerator {

/**
* Generate an RDD containing test data for LogisticRegression.
* Generate a DataFrame containing test data for LogisticRegression.
*
* @param sc SparkContext to use for creating the RDD.
* @param nexamples Number of examples that will be contained in the RDD.
* @param nexamples Number of examples that will be contained in the data.
* @param nfeatures Number of features to generate for each example.
* @param eps Epsilon factor by which positive examples are scaled.
* @param nparts Number of partitions of the generated RDD. Default value is 2.
* @param nparts Number of partitions of the generated DataFrame. Default value is 2.
* @param probOne Probability that a label is 1 (and not 0). Default value is 0.5.
*/
def generateLogisticRDD(
sc: SparkContext,

val spark = SparkSession.builder.appName("LogisticRegressionDataGenerator").getOrCreate()
val sc = spark.sparkContext
def generateLogisticDF(
nexamples: Int,
nfeatures: Int,
eps: Double,
nparts: Int = 2,
probOne: Double = 0.5): RDD[LabeledPoint] = {
val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
val rnd = new Random(42 + idx)

val y = if (idx % 2 == 0) 0.0 else 1.0
val x = Array.fill[Double](nfeatures) {
rnd.nextGaussian() + (y * eps)
}
LabeledPoint(y, Vectors.dense(x))
probOne: Double = 0.5) = {
val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
val rnd = new Random(42 + idx)
val y = if (idx % 2 == 0) 0.0 else 1.0
val x = Array.fill[Double](nfeatures) {
rnd.nextGaussian() + (y * eps)
}
LabeledPoint(y, Vectors.dense(x))
}
spark.createDataFrame(data)
}
data
}

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("LogisticRegressionDataGenerator")
val sc = new SparkContext(conf)

var outputPath = ""
var numExamples: Int = 200000
var numFeatures: Int = 20
@@ -79,20 +74,15 @@ object LogisticRegressionDataGenerator {
outputPath = args(0)
numExamples = args(1).toInt
numFeatures = args(2).toInt
println(s"Output Path: $outputPath")
println(s"Num of Examples: $numExamples")
println(s"Num of Features: $numFeatures")
println(s"Output Path: $outputPath, Num of Examples: $numExamples, Num of Features: $numFeatures")
} else {
System.err.println(
s"Usage: $LogisticRegressionDataGenerator <OUTPUT_PATH> <NUM_EXAMPLES> <NUM_FEATURES>"
)
System.exit(1)
}

val data = generateLogisticRDD(sc, numExamples, numFeatures, eps, numPartitions)

data.saveAsObjectFile(outputPath)

sc.stop()
val df = generateLogisticDF(numExamples, numFeatures, eps, numPartitions)
df.write.format("libsvm").save(outputPath)
spark.stop()
}
}
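
Since the generator now writes libsvm output instead of an object file, the workload side can read it straight back with the DataFrame reader. A small round-trip check, assuming a placeholder path and feature count (in the benchmark these come from run.sh as ${INPUT_HDFS} and $NUM_FEATURES_LR):

// Illustrative only: load the generated data the same way the reworked
// LogisticRegression job does and print basic shape information.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("LRDataCheck").getOrCreate()
val df = spark.read.format("libsvm")
  .option("numFeatures", "20")       // must match <NUM_FEATURES> given to the generator
  .load("hdfs:///HiBench/LR/Input")  // placeholder; resolved from hibench.workload.input
println(s"Loaded ${df.count()} rows, schema: ${df.schema.simpleString}")
spark.stop()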