release v1.3.0
tgpfeiffer committed Jun 29, 2015
1 parent af80705 commit fe4df9e
Showing 78 changed files with 13,029 additions and 1,232 deletions.
68 changes: 68 additions & 0 deletions Changelog.md
@@ -0,0 +1,68 @@
Changelog
=========

1.3.0
-----

### New Features

* Cascaded Processing

* The concept of a user-defined "stream" was introduced.
Similar to `CREATE VIEW` in SQL, `CREATE STREAM name FROM SELECT ...`
    allows the user to create a stream holding the results of a SELECT
    query over some input stream (a sketch combining these statements
    follows this list).
* In particular, `ANALYZE` results can be added to a stream in a new
column and used further down in the processing pipeline.
* A user can define custom functions in JavaScript and use them in
queries using the `CREATE FUNCTION` statement.
* Multiple data sources can be defined and used one after another
for updating/analyzing a model.

* Trigger-Based Action

* A user can also define functions without a return value using
    `CREATE TRIGGER FUNCTION` and attach them as triggers on a stream
using `CREATE TRIGGER`. This can be used to act based on the contents
of a stream, in particular analysis results.

* Time-Series Analysis using Sliding Windows

* To analyze time-series data, sliding windows over an input stream
(based on either item count or an embedded timestamp) can be computed
    and the data in each window can be aggregated using provided functions
such as standard deviation or histogram.
* The results of this aggregation can be used like any other data
stream.

* Other

* It is now possible to do feature extraction using user-defined
functions.
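
To make the above concrete, below is a minimal sketch of how these statements can be combined, reusing the `shogun` data source from the README. All identifiers, the JavaScript bodies, and the exact trigger and sliding-window grammar shown here are illustrative assumptions rather than the authoritative syntax; consult the JubaQL documentation for the precise forms.

    CREATE FUNCTION nameLength(name string) RETURNS numeric LANGUAGE JavaScript AS $$ return name.length; $$
    CREATE STREAM long_names FROM SELECT label, name FROM shogun WHERE nameLength(name) > 2
    CREATE TRIGGER FUNCTION logName(name string) LANGUAGE JavaScript AS $$ /* side effect only, e.g. logging */ $$
    CREATE TRIGGER log_long_names ON long_names FOR EACH ROW EXECUTE logName(name)
    CREATE STREAM name_stats FROM SLIDING WINDOW (SIZE 10 ADVANCE 1 TUPLES) OVER long_names WITH stddev(nameLength(name)) AS len_stddev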

### Breaking Changes

* `CREATE DATASOURCE`

  * A schema should now generally be provided, because in many cases
    schema inference will lead to errors whenever an empty data batch
    is encountered.

* `CREATE MODEL`

* The syntax for specifying the label/id column has changed from
`model_name WITH (label: "class", datum: "name")` to
`model_name (label: class) AS ...`
* Feature converters are not specified in the JSON configuration any
    more, but instead with a `column WITH converter` syntax (see the
    before/after example following this list).

* `UPDATE MODEL`

  * The statement now only establishes the connection between stream and
    model; processing does not start until `START PROCESSING` is issued.
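
As a concrete before/after, here is the classifier model definition from the README, first in the old 1.2.0 syntax and then in the new 1.3.0 syntax (the converter part of the configuration JSON is shortened to `{...}` here):

    CREATE CLASSIFIER MODEL test WITH (label: "label", datum: "name") config = '{"method": "AROW", "converter": {...}, "parameter": {"regularization_weight" : 1.0}}'

becomes

    CREATE CLASSIFIER MODEL test (label: label) AS name WITH unigram CONFIG '{"method": "AROW", "parameter": {"regularization_weight" : 1.0}}'
    UPDATE MODEL test USING train FROM shogun
    START PROCESSING shogun

where `UPDATE MODEL` only wires the data source to the model and `START PROCESSING` actually starts processing.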

1.2.0
-----

This is the first public release. See the documentation for features and usage information.
34 changes: 19 additions & 15 deletions README.md
@@ -6,12 +6,12 @@ How to get started with JubaQL

### Development Setup

* Get a Hadoop-enabled version of Spark 1.1.1:
`wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop2.4.tgz`
* Get a Hadoop-enabled version of Spark 1.2.2:
`wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.2-bin-hadoop2.4.tgz`
and unpack it somewhere:
`tar -xzf spark-1.1.1-bin-hadoop2.4.tgz && export SPARK_DIST="$(pwd)/spark-1.1.1-bin-hadoop2.4/"`
`tar -xzf spark-1.2.2-bin-hadoop2.4.tgz && export SPARK_DIST="$(pwd)/spark-1.2.2-bin-hadoop2.4/"`
* Install Jubatus.
* Get JubaQLClient and JubaQLServer (consists of JubaQLProcessor and JubaQLGateway):
* Get JubaQL-Client and JubaQL-Server:
`git clone https://github.com/jubatus/jubaql-client.git`
`git clone https://github.com/jubatus/jubaql-server.git`
* Build the JubaQL components:
@@ -22,16 +22,17 @@ How to get started with JubaQL
* JubaQLGateway:
`cd jubaql-server/gateway && sbt assembly && cd ../..`
* Start the JubaQLGateway:
`cd jubaql-server && java -Dspark.distribution="$SPARK_DIST" -Djubaql.processor.fatjar=processor/target/scala-2.10/jubaql-processor-assembly-1.2.0.jar -jar gateway/target/scala-2.10/jubaql-gateway-assembly-1.2.0.jar -i 127.0.0.1`
`cd jubaql-server && java -Dspark.distribution="$SPARK_DIST" -Djubaql.processor.fatjar=processor/target/scala-2.10/jubaql-processor-assembly-1.3.0.jar -jar gateway/target/scala-2.10/jubaql-gateway-assembly-1.3.0.jar -i 127.0.0.1`
* In a different shell, start the JubaQLClient:
`./jubaql-client/target/start`
* You will see the prompt `jubaql>` in the shell and you will in fact be able to type your commands there, but until the JubaQLProcessor is up and running correctly, you will get the message "Unexpected response status: 503".
* You will see the prompt `jubaql>` in the shell and can already type commands there, but until the JubaQLProcessor is up and running correctly, you will see the message: "This session has not been registered. Wait a second."

In order to test that your setup is working correctly, you can do a simple classification using the data from the [shogun example](https://github.com/jubatus/jubatus-example/tree/master/shogun). Run the following JubaQL commands in the client:

* `CREATE CLASSIFIER MODEL test WITH (label: "label", datum: "name") config = '{"method": "AROW","converter": { "num_filter_types": {}, "num_filter_rules": [], "string_filter_types": {}, "string_filter_rules": [], "num_types": {}, "num_rules": [],"string_types": {"unigram": { "method": "ngram", "char_num": "1" }},"string_rules": [{ "key": "*", "type": "unigram", "sample_weight": "bin", "global_weight": "bin" } ]},"parameter": {"regularization_weight" : 1.0}}'`
* `CREATE CLASSIFIER MODEL test (label: label) AS name WITH unigram CONFIG '{"method": "AROW", "parameter": {"regularization_weight" : 1.0}}'`
* `CREATE DATASOURCE shogun (label string, name string) FROM (STORAGE: "file://data/shogun_data.json")`
* `UPDATE MODEL test USING train FROM shogun`
* `START PROCESSING shogun`
* `ANALYZE '{"name": "慶喜"}' BY MODEL test USING classify`
* `SHUTDOWN`

@@ -42,34 +43,37 @@ The JSON returned by the `ANALYZE` statement should indicate that the label "徳川" has the highest score.
* Set up a Hadoop cluster with YARN and HDFS in place.
* Install Jubatus on all cluster nodes.
* Get JubaQL and compile it as described above. (This time, Jubatus is not required locally.)
* Install the [Jubatus on YARN](https://github.com/jubatus/jubatus-on-yarn) libraries in HDFS as described in [the instructions](https://github.com/jubatus/jubatus-on-yarn/blob/master/document/%E3%83%93%E3%83%AB%E3%83%89%E3%83%BB%E5%88%A9%E7%94%A8%E6%89%8B%E9%A0%86%E6%9B%B8.md#%E5%AE%9F%E8%A1%8C%E3%81%AB%E5%BF%85%E8%A6%81%E3%81%AA%E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB%E3%81%AE%E6%BA%96%E5%82%99). Make sure that the HDFS directory `/jubatus-on-yarn/application-master/jubaconfig/` exists and is writeable by the user running the JubaQLProcessor application.
* Install the [Jubatus on YARN](https://github.com/jubatus/jubatus-on-yarn) libraries in HDFS as described in [the instructions](https://github.com/jubatus/jubatus-on-yarn/blob/master/document/instruction.md#required-files). Make sure that the HDFS directory `/jubatus-on-yarn/application-master/jubaconfig/` exists and is writeable by the user running the JubaQLProcessor application.
* To test the setup, also copy the file `shogun-data.json` from the JubaQL source tree's `data/` directory to `/jubatus-on-yarn/sample/shogun_data.json` in HDFS.
* Copy the files `core-site.xml`, `yarn-site.xml`, `hdfs-site.xml` containing your Hadoop setup description from one of your cluster nodes to some directory and point the environment variable `HADOOP_CONF_DIR` to that directory.
* Get your local computer's IP address that points towards the cluster. On Linux, given the IP address of one of your cluster nodes, this should be possible with something like:
`export MY_IP=$(ip route get 12.34.56.78 | grep -Po 'src \K.+')`
Make sure that this IP address can be connected to from the cluster nodes and no firewall rules etc. are blocking access.
* Get the addresses of your Zookeeper nodes and concatenate their `host:port` locations with a comma:
`export MY_ZOOKEEPER=zk1:2181,zk2:2181`
* Locate a temporary directory in HDFS that Spark can use for checkpointing:
`export CHECKPOINT=hdfs:///tmp/spark`
* Start the JubaQLGateway:
`cd jubaql-server`
`java -Drun.mode=production -Djubaql.zookeeper=$MY_ZOOKEEPER -Dspark.distribution="$SPARK_DIST" -Djubaql.processor.fatjar=processor/target/scala-2.10/jubaql-processor-assembly-1.2.0.jar -jar gateway/target/scala-2.10/jubaql-gateway-assembly-1.2.0.jar -i $MY_IP`
`cd jubaql-server`
`java -Drun.mode=production -Djubaql.checkpointdir=$CHECKPOINT -Djubaql.zookeeper=$MY_ZOOKEEPER -Dspark.distribution="$SPARK_DIST" -Djubaql.processor.fatjar=processor/target/scala-2.10/jubaql-processor-assembly-1.3.0.jar -jar gateway/target/scala-2.10/jubaql-gateway-assembly-1.3.0.jar -i $MY_IP`
* In a different shell, start the JubaQLClient:
`./jubaql-client/target/start`
* You will see the prompt `jubaql>` in the shell and you will in fact be able to type your commands there, but until the JubaQLProcessor is up and running correctly, you will get the message "Unexpected response status: 503".
* You will see the prompt `jubaql>` in the shell and can already type commands there, but until the JubaQLProcessor is up and running correctly, you will see the message: "This session has not been registered. Wait a second."

In order to test that your setup is working correctly, you can do a simple classification using the `shogun-data.json` file you copied to HDFS before. Run the following JubaQL commands in the client:

* `CREATE CLASSIFIER MODEL test WITH (label: "label", datum: "name") config = '{"method": "AROW","converter": { "num_filter_types": {}, "num_filter_rules": [], "string_filter_types": {}, "string_filter_rules": [], "num_types": {}, "num_rules": [],"string_types": {"unigram": { "method": "ngram", "char_num": "1" }},"string_rules": [{ "key": "*", "type": "unigram", "sample_weight": "bin", "global_weight": "bin" } ]},"parameter": {"regularization_weight" : 1.0}}'`
* `CREATE CLASSIFIER MODEL test (label: label) AS name WITH unigram CONFIG '{"method": "AROW", "parameter": {"regularization_weight" : 1.0}}'`
* `CREATE DATASOURCE shogun (label string, name string) FROM (STORAGE: "hdfs:///jubatus-on-yarn/sample/shogun_data.json")`
* `UPDATE MODEL test USING train FROM shogun`
* `START PROCESSING shogun`
* `ANALYZE '{"name": "慶喜"}' BY MODEL test USING classify`
* `SHUTDOWN`

The JSON returned by the `ANALYZE` statement should indicate that the label "徳川" has the highest score.
The JSON returned by the `ANALYZE` statement should indicate that the label "徳川" has the highest score. Note that the score may differ from the one obtained in the development setup, since multiple Jubatus instances are used for training.

Note:
* When the JubaQLProcessor is started using `spark-submit` as outlined above, it will first upload the `spark-assembly-1.1.1-hadoop2.4.0.jar` and `jubaql-processor-assembly-1.2.0.jar` to the cluster and add them to HDFS, from where they will be downloaded by each executor.
* It is possible to skip the upload of the Spark libraries by copying the Spark jar file to HDFS manually and adding the parameter `-Dspark.yarn.jar=hdfs:///path/to/spark-assembly-1.1.1-hadoop2.4.0.jar` when starting the JubaQLGateway.

* When the JubaQLProcessor is started, the files `spark-assembly-1.2.2-hadoop2.4.0.jar` and `jubaql-processor-assembly-1.3.0.jar` are first uploaded to the cluster and added to HDFS, from where each executor downloads them. The upload of the Spark libraries can be skipped by copying the Spark jar file to HDFS manually and adding the parameter `-Dspark.yarn.jar=hdfs:///path/to/spark-assembly-1.2.2-hadoop2.4.0.jar` when starting the JubaQLGateway (see the example command after this list).
* In theory, it is also possible to do the same for the JubaQLProcessor application jar file. However, at the moment we rely on extracting a `log4j.xml` file from that jar locally before upload, so there is no support for also storing that file in HDFS, yet.
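
For example, the production gateway start command from above with the Spark jar already uploaded to HDFS looks as follows (the HDFS path is a placeholder):

    java -Drun.mode=production -Djubaql.checkpointdir=$CHECKPOINT -Djubaql.zookeeper=$MY_ZOOKEEPER -Dspark.distribution="$SPARK_DIST" -Dspark.yarn.jar=hdfs:///path/to/spark-assembly-1.2.2-hadoop2.4.0.jar -Djubaql.processor.fatjar=processor/target/scala-2.10/jubaql-processor-assembly-1.3.0.jar -jar gateway/target/scala-2.10/jubaql-gateway-assembly-1.3.0.jar -i $MY_IP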

### Run on YARN with remote gateway
2 changes: 1 addition & 1 deletion gateway/build.sbt
@@ -1,6 +1,6 @@
name := "JubaQL Gateway"

version := "1.2.0"
version := "1.3.0"

// use an older version than necessary to use the same set of dependencies
// across projects
@@ -39,7 +39,8 @@ import java.nio.file.{StandardCopyOption, Files}
@io.netty.channel.ChannelHandler.Sharable
class GatewayPlan(ipAddress: String, port: Int,
envpForProcessor: Array[String], runMode: RunMode,
sparkDistribution: String, fatjar: String)
sparkDistribution: String, fatjar: String,
checkpointDir: String)
extends cycle.Plan
/* With cycle.SynchronousExecution, there is a group of N (16?) threads
(named "nioEventLoopGroup-5-*") that will process N requests in
@@ -77,7 +78,7 @@ class GatewayPlan(ipAddress: String, port: Int,
*/
val tmpLog4jPath: String = try {
val jar = new JarFile(new File(fatjar))
val log4jFile = jar.getEntry("log4j.xml")
val log4jFile = jar.getEntry("log4j-spark-submit.xml")
val log4jIs = jar.getInputStream(log4jFile)
val tmpFile = File.createTempFile("log4j", ".xml")
Files.copy(log4jIs, tmpFile.toPath, StandardCopyOption.REPLACE_EXISTING)
@@ -88,7 +89,7 @@ class GatewayPlan(ipAddress: String, port: Int,
logger.error("failed to create temporary log4j.xml copy: " + e.getMessage)
throw e
}
logger.debug("extracted log4j.xml file to %s".format(tmpLog4jPath))
logger.debug("extracted log4j-spark-submit.xml file to %s".format(tmpLog4jPath))

val errorMsgContentType = ContentType("text/plain; charset=utf-8")

@@ -131,7 +132,8 @@ class GatewayPlan(ipAddress: String, port: Int,
// double-escaped on their way to the Spark driver and probably never end
// up there.
cmd.update(6, "spark.driver.extraJavaOptions=-Drun.mode=production " +
s"-Djubaql.zookeeper=$zookeeper") // --conf
s"-Djubaql.zookeeper=$zookeeper " +
s"-Djubaql.checkpointdir=$checkpointDir") // --conf
// also specify the location of the Spark jar file, if given
val sparkJarParams = sparkJar match {
case Some(url) => "--conf" :: s"spark.yarn.jar=$url" :: Nil
@@ -152,7 +154,7 @@ class GatewayPlan(ipAddress: String, port: Int,
val isr = new InputStreamReader(is)
val br = new BufferedReader(isr)
var line: String = br.readLine()
while (line != null && line.trim != "yarnAppState: RUNNING") {
while (line != null && !line.trim.contains("state: RUNNING")) {
if (line.contains("Exception")) {
logger.error(line)
throw new RuntimeException("could not start spark-submit")
@@ -167,6 +169,7 @@ class GatewayPlan(ipAddress: String, port: Int,
case RunMode.Development(numThreads) =>
cmd.update(4, s"local[$numThreads]") // --master
cmd.update(6, "run.mode=development") // --conf
cmd.insertAll(7, Seq("--conf", s"jubaql.checkpointdir=$checkpointDir"))
logger.debug("executing: " + cmd.mkString(" "))

Try {
@@ -80,17 +80,13 @@ object JubaQLGateway extends LazyLogging {
}
logger.info("Starting in run mode %s".format(runMode))

val sparkDistribution: String = System.getProperty("spark.distribution")
if (sparkDistribution == null || sparkDistribution.trim.isEmpty) {
System.err.println("No spark.distribution property")
System.exit(1)
}
val fatjar: String = System.getProperty("jubaql.processor.fatjar")
if (fatjar == null || fatjar.trim.isEmpty) {
System.err.println("No jubaql.processor.fatjar")
System.exit(1)
}
val plan = new GatewayPlan(ipAddress, port, envp, runMode, sparkDistribution, fatjar)
val sparkDistribution: String = getPropertyOrExitIfEmpty("spark.distribution")
val fatjar: String = getPropertyOrExitIfEmpty("jubaql.processor.fatjar")
val checkpointDir = getCheckpointDir(runMode)
val plan = new GatewayPlan(ipAddress, port, envp, runMode,
sparkDistribution = sparkDistribution,
fatjar = fatjar,
checkpointDir = checkpointDir)
val nettyServer = unfiltered.netty.Server.http(port).plan(plan)
logger.info("JubaQLGateway starting")
nettyServer.run()
@@ -114,6 +110,29 @@

parser.parse(args, CommandlineOptions())
}

private def getPropertyOrExitIfEmpty(name: String): String = {
val prop = scala.util.Properties.propOrElse(name, "")
if (prop.trim.isEmpty) {
System.err.println(s"No ${name} property")
System.exit(1)
}
prop
}

private def getCheckpointDir(runMode: RunMode): String = {
val dir = scala.util.Properties.propOrElse("jubaql.checkpointdir", "")
if (dir.trim.isEmpty) {
runMode match {
case RunMode.Production(_, _, _, _) =>
"hdfs:///tmp/spark"
case RunMode.Development(_) =>
"file:///tmp/spark"
}
} else {
dir
}
}
}

case class CommandlineOptions(ip: String = "", port: Int = JubaQLGateway.defaultPort)
Binary file modified gateway/src/test/resources/processor-logfile.jar
@@ -22,7 +22,9 @@ trait GatewayServer extends BeforeAndAfterAll {

protected val plan = new GatewayPlan("example.com", 1234,
Array(), RunMode.Test,
"", "src/test/resources/processor-logfile.jar")
sparkDistribution = "",
fatjar = "src/test/resources/processor-logfile.jar",
checkpointDir = "file:///tmp/spark")
protected val server = unfiltered.netty.Server.http(9877).plan(plan)

override protected def beforeAll() = {
22 changes: 22 additions & 0 deletions increase-version.sh
@@ -0,0 +1,22 @@
#!/bin/bash

if [ $# -ne 1 ]; then
echo "Usage: increase-version.sh toversion"
exit 1
fi

OLDVERSION=$(grep "version := " processor/build.sbt | sed 's/[^"]*"\([^"]*\).*/\1/')
NEWVERSION=$1

echo "Bumping version from $OLDVERSION to $NEWVERSION ..."

sed -i "s/$OLDVERSION/$NEWVERSION/g" */build.sbt

sed -i "s/$OLDVERSION/$NEWVERSION/g" README.md

echo "Checking for old occurences of $OLDVERSION ..."

grep -F -R "$OLDVERSION" */src
grep -F --directories=skip "$OLDVERSION" */*
grep -F --directories=skip "$OLDVERSION" *
