-
Notifications
You must be signed in to change notification settings - Fork 317
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7b5accf
commit cd14758
Showing
5 changed files
with
358 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
c1,c3,c4,c5,c6,c7,c8 | ||
aaa,11,22,1.2,11.3,1636097290000,2021-07-20 | ||
aaa,12,22,2.2,12.3,1636097390000,2021-08-19 | ||
aa,13,22,3.2,13.3,1636097490000,2021-05-20 | ||
bb,14,22,4.2,14.3,1636097590000,2021-09-23 | ||
bb,15,22,5.2,15.3,1636097690000,2021-03-21 | ||
bb,16,22,6.2,16.3,1636097790000,2021-05-20 | ||
cc,17,22,7.2,17.3,1636097890000,2021-05-26 | ||
dd,18,22,8.2,18.3,1636097990000,2021-06-20 | ||
ee,19,22,9.2,19.3,1636097000000,2021-01-10 | ||
ff,20,22,9.2,19.3,1636098000000,2021-01-10 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
|
||
# 快速上手(集群版) | ||
本教程针对OpenMLDB集群版,会涵盖使用OpemMLDB构建机器学习应用的整个流程,包括离线特征抽取、模型训练,以及在线的数据导入、在线特征抽取、模型预测等步骤。希望通过该教程,读者可以了解通过OpenMLDB,如何快速完成从原始数据处理到模型上线的整个生命周期。 | ||
|
||
|
||
OpenMLDB提供了Java和Python SDKs,该示例中,我们使用Python SDK。 | ||
|
||
为了方便理解,我们会基于Kaggle比赛[Predict Taxi Tour Duration数据集](https://github.com/4paradigm/OpenMLDB/tree/main/demo/predict-taxi-trip-duration-nb/demo/data),来介绍整个流程。数据集和相关代码可以在[这里](https://github.com/4paradigm/OpenMLDB/tree/main/demo/predict-taxi-trip-duration-nb/demo)查看,并且可以根据步骤尝试运行。 | ||
|
||
## 离线 | ||
### 特征抽取 | ||
对于特征抽取,用户首先需要通过对数据集的了解,构建特征抽取的SQL脚本,例如,在Taxi Tour Duration数据集中,我们可以构建如下特征抽取的[SQL](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/fe.sql): | ||
```sql | ||
select trip_duration, passenger_count, | ||
sum(pickup_latitude) over w as vendor_sum_pl, | ||
max(pickup_latitude) over w as vendor_max_pl, | ||
min(pickup_latitude) over w as vendor_min_pl, | ||
avg(pickup_latitude) over w as vendor_avg_pl, | ||
sum(pickup_latitude) over w2 as pc_sum_pl, | ||
max(pickup_latitude) over w2 as pc_max_pl, | ||
min(pickup_latitude) over w2 as pc_min_pl, | ||
avg(pickup_latitude) over w2 as pc_avg_pl , | ||
count(vendor_id) over w2 as pc_cnt, | ||
count(vendor_id) over w as vendor_cnt | ||
from t1 | ||
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW), | ||
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW); | ||
``` | ||
|
||
|
||
通过执行特征抽取的SQL,可以从原数据里提取出我们需要的特征数据集,用来进行训练。 | ||
|
||
对于离线特征抽取的输入数据,可以直接存放在本地文件或者HDFS上面,然后通过Spark执行特征抽取SQL。 | ||
|
||
|
||
Python示例代码如下(完整代码参考[这里](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/openmldb_batch.py)): | ||
|
||
```python | ||
from pyspark.sql import SparkSession | ||
import numpy as np | ||
import pandas as pd | ||
|
||
|
||
def run_batch_sql(sql): | ||
spark = SparkSession.builder.appName("OpenMLDB Demo").getOrCreate() | ||
parquet_predict = "./data/taxi_tour_table_predict_simple.snappy.parquet" | ||
parquet_train = "./data/taxi_tour_table_train_simple.snappy.parquet" | ||
train = spark.read.parquet(parquet_train) | ||
train.createOrReplaceTempView("t1") | ||
train_df = spark.sql(sql) | ||
train_set = train_df.toPandas() | ||
predict = spark.read.parquet(parquet_predict) | ||
predict.createOrReplaceTempView("t1") | ||
predict_df = spark.sql(sql) | ||
predict_set = predict_df.toPandas() | ||
return train_set, predict_set | ||
``` | ||
|
||
|
||
***注意***:需要使用[OpenMLDB Spark Distribution](https://github.com/4paradigm/OpenMLDB/blob/main/docs/en/compile.md#optimized-spark-distribution-for-openmldb-optional) | ||
|
||
|
||
### 模型训练 | ||
通过特征抽取,获得训练和预测数据集,就可以使用标准的模型训练方式,来进行模型训练。 | ||
|
||
|
||
下面示例使用Gradient Boosting Machine (GBM) 进行模型训练,并将训练好的模型保存在`model_path`路径下(完整代码参考[这里](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/train.py)): | ||
```python | ||
import lightgbm as lgb | ||
import openmldb_batch | ||
|
||
|
||
train_set, predict_set = openmldb_batch.run_batch_sql(sql) | ||
y_train = train_set['trip_duration'] | ||
x_train = train_set.drop(columns=['trip_duration']) | ||
y_predict = predict_set['trip_duration'] | ||
x_predict = predict_set.drop(columns=['trip_duration']) | ||
lgb_train = lgb.Dataset(x_train, y_train) | ||
lgb_eval = lgb.Dataset(x_predict, y_predict, reference=lgb_train) | ||
|
||
gbm = lgb.train(params, | ||
lgb_train, | ||
num_boost_round=20, | ||
valid_sets=lgb_eval, | ||
early_stopping_rounds=5) | ||
gbm.save_model(model_path) | ||
``` | ||
|
||
## 在线 | ||
在线服务主要需要两个输入: | ||
- 离线过程中训练好的模型 | ||
- 在线数据集 | ||
|
||
对于在线模型预测,一般我们需要历史数据,来完成特征抽取,主要是因为特征抽取SQL一般是基于一定的时间窗口,需要一段时间的数据做特征统计。 | ||
我们称在线预测需要依赖的这部分历史数据为在线数据集,这部分数据集相对离线训练的数据集会较小,主要为近期的一些历史数据。 | ||
|
||
### 在线数据导入 | ||
在线数据集可以通过类似数据库的导入方式,导入OpenMLDB。 | ||
|
||
下面示例展示如何从csv文件导入数据到OpenMLDB,和标准数据库导入数据类似(完整代码参考[这里](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/import.py)) | ||
|
||
```python | ||
import sqlalchemy as db | ||
|
||
|
||
ddl=""" | ||
create table t1( | ||
id string, | ||
vendor_id int, | ||
pickup_datetime timestamp, | ||
dropoff_datetime timestamp, | ||
passenger_count int, | ||
pickup_longitude double, | ||
pickup_latitude double, | ||
dropoff_longitude double, | ||
dropoff_latitude double, | ||
store_and_fwd_flag string, | ||
trip_duration int, | ||
index(key=vendor_id, ts=pickup_datetime), | ||
index(key=passenger_count, ts=pickup_datetime) | ||
); | ||
""" | ||
|
||
engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') | ||
connection = engine.connect() | ||
# 创建数据库 | ||
connection.execute("create database db_test;") | ||
# 建表 | ||
connection.execute(ddl) | ||
|
||
# 从csv读出数据并插入到表里 | ||
with open('data/taxi_tour_table_train_simple.csv', 'r') as fd: | ||
for line in fd: | ||
row = line.split(',') | ||
insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);"% tuple(row) | ||
connection.execute(insert) | ||
``` | ||
|
||
### 在线特征抽取 | ||
对于在线特征抽取,需要根据输入的数据,以及在线数据集,通过和离线特征相同的特征抽取[SQL](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/fe.sql),获得特征。 | ||
|
||
|
||
示例代码如下: | ||
```python | ||
import sqlalchemy as db | ||
|
||
|
||
engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') | ||
connection = engine.connect() | ||
features = connection.execute(sql, request_data) | ||
``` | ||
|
||
### 模型预测 | ||
根据在线特征抽取结果,以及离线训练好的模型,可以获得预测值,就完成了在线预估的过程。 | ||
|
||
|
||
示例代码如下: | ||
```python | ||
import lightgbm as lgb | ||
|
||
|
||
bst = lgb.Booster(model_path) | ||
duration = bst.predict(feature) | ||
``` | ||
|
||
***注***:实际中,会启动预估服务,通过线上服务的方式,接收在线输入的数据,进行模型预测,然后再把预测结果返回给用户。这里省略了服务上线的过程,不过在[demo](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo)的上手示例中,包含了模型上线的过程。 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
|
||
# QuickStart (Cluster Version) | ||
|
||
This tutorial is targeted at the cluster version of OpenMLDB and it will cover the whole lifecycle of how to build a machine learning application with the help of OpenMLDB, | ||
including feature extraction, model training; online data import, online feature extraction, model prediction, etc. | ||
From this tutorial, readers can understand how to use OpenMLDB to complete the machine learning lifecycle from raw data to model deployment. | ||
|
||
|
||
OpenMLDB provides both Java and Python SDKs. In this tutorial, we will Python SDK. | ||
|
||
In order to better understand the workflow, we use Kaggle Competition [Predict Taxi Tour Duration Dataset](https://github.com/4paradigm/OpenMLDB/tree/main/demo/predict-taxi-trip-duration-nb/demo/data) | ||
to demostrate the whole process. Dataset and source code can be found | ||
[here](https://github.com/4paradigm/OpenMLDB/tree/main/demo/predict-taxi-trip-duration-nb/demo). | ||
|
||
## Offline | ||
### Feature Extraction | ||
In order to do feature extraction, users have to know the data and construct a SQL script. | ||
|
||
For example, for the Taxi Tour Duration Dataset, we can construct the following [SQL](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/fe.sql) for feature extraction: | ||
```sql | ||
select trip_duration, passenger_count, | ||
sum(pickup_latitude) over w as vendor_sum_pl, | ||
max(pickup_latitude) over w as vendor_max_pl, | ||
min(pickup_latitude) over w as vendor_min_pl, | ||
avg(pickup_latitude) over w as vendor_avg_pl, | ||
sum(pickup_latitude) over w2 as pc_sum_pl, | ||
max(pickup_latitude) over w2 as pc_max_pl, | ||
min(pickup_latitude) over w2 as pc_min_pl, | ||
avg(pickup_latitude) over w2 as pc_avg_pl , | ||
count(vendor_id) over w2 as pc_cnt, | ||
count(vendor_id) over w as vendor_cnt | ||
from t1 | ||
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW), | ||
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW); | ||
``` | ||
|
||
After executing the feature extraction SQL, we can extract the features from the raw data, which will be used for model training. | ||
|
||
The SQL can executed in Spark to extract features from the raw dataset, which are stored directly in local filesystem or HDFS. | ||
|
||
Sample Python code is shown as follows(complete code can be found [here](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/openmldb_batch.py)): | ||
|
||
```python | ||
from pyspark.sql import SparkSession | ||
import numpy as np | ||
import pandas as pd | ||
|
||
|
||
def run_batch_sql(sql): | ||
spark = SparkSession.builder.appName("OpenMLDB Demo").getOrCreate() | ||
parquet_predict = "./data/taxi_tour_table_predict_simple.snappy.parquet" | ||
parquet_train = "./data/taxi_tour_table_train_simple.snappy.parquet" | ||
train = spark.read.parquet(parquet_train) | ||
train.createOrReplaceTempView("t1") | ||
train_df = spark.sql(sql) | ||
train_set = train_df.toPandas() | ||
predict = spark.read.parquet(parquet_predict) | ||
predict.createOrReplaceTempView("t1") | ||
predict_df = spark.sql(sql) | ||
predict_set = predict_df.toPandas() | ||
return train_set, predict_set | ||
``` | ||
|
||
|
||
***NOTE***: [OpenMLDB Spark Distribution](https://github.com/4paradigm/OpenMLDB/blob/main/docs/en/compile.md#optimized-spark-distribution-for-openmldb-optional) is used | ||
|
||
|
||
### Model Training | ||
After we get the train and predict datasets from feature extraction, we can use the standard methods to train models. | ||
|
||
|
||
For example, we can use Gradient Boosting Machine (GBM) to train the model and produce a model file(complete source code can be found [here](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/train.py)): | ||
```python | ||
import lightgbm as lgb | ||
import openmldb_batch | ||
|
||
|
||
train_set, predict_set = openmldb_batch.run_batch_sql(sql) | ||
y_train = train_set['trip_duration'] | ||
x_train = train_set.drop(columns=['trip_duration']) | ||
y_predict = predict_set['trip_duration'] | ||
x_predict = predict_set.drop(columns=['trip_duration']) | ||
lgb_train = lgb.Dataset(x_train, y_train) | ||
lgb_eval = lgb.Dataset(x_predict, y_predict, reference=lgb_train) | ||
|
||
gbm = lgb.train(params, | ||
lgb_train, | ||
num_boost_round=20, | ||
valid_sets=lgb_eval, | ||
early_stopping_rounds=5) | ||
gbm.save_model(model_path) | ||
``` | ||
|
||
## Online | ||
Online service requires two inputs: | ||
- the model trained from offline process | ||
- online dataset | ||
|
||
The feature extraction SQL is generally based on time windows. Thus in the online model prediction, we usually need history data to extract features from time ranges of data. | ||
The history data is called online dataset. Online dataset is generally restricted to the recent time range, which is small compared to the offline dataset. | ||
|
||
### Online dataset import | ||
The online dataset can be imported to OpenMLDB in a similar way to the traditional database. | ||
|
||
The following code shows how to import a csv file to OpenMLDB (Comlete code can be found [here](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/import.py)) | ||
|
||
```python | ||
import sqlalchemy as db | ||
|
||
|
||
ddl=""" | ||
create table t1( | ||
id string, | ||
vendor_id int, | ||
pickup_datetime timestamp, | ||
dropoff_datetime timestamp, | ||
passenger_count int, | ||
pickup_longitude double, | ||
pickup_latitude double, | ||
dropoff_longitude double, | ||
dropoff_latitude double, | ||
store_and_fwd_flag string, | ||
trip_duration int, | ||
index(key=vendor_id, ts=pickup_datetime), | ||
index(key=passenger_count, ts=pickup_datetime) | ||
); | ||
""" | ||
|
||
engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') | ||
connection = engine.connect() | ||
# create database | ||
connection.execute("create database db_test;") | ||
# create table | ||
connection.execute(ddl) | ||
|
||
# read data from csv file and insert into table | ||
with open('data/taxi_tour_table_train_simple.csv', 'r') as fd: | ||
for line in fd: | ||
row = line.split(',') | ||
insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);"% tuple(row) | ||
connection.execute(insert) | ||
``` | ||
|
||
### Online feature extraction | ||
Online feature extraction requires both input data and online dataset. The feature extraction SQL is the same as the offline | ||
[SQL](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo/fe.sql). | ||
|
||
|
||
Sample code is as follows: | ||
```python | ||
import sqlalchemy as db | ||
|
||
|
||
engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') | ||
connection = engine.connect() | ||
features = connection.execute(sql, request_data) | ||
``` | ||
|
||
### Model Prediction | ||
Base on the online feature extracted and the model trained from the offline process, we can get the prediction result. | ||
|
||
Sample code is as follows: | ||
```python | ||
import lightgbm as lgb | ||
|
||
|
||
bst = lgb.Booster(model_path) | ||
duration = bst.predict(feature) | ||
``` | ||
|
||
***NOTE***: Generally, in the real deployment,we launch a prediction service, | ||
who will accept request and do the prediction, and then return the results back to the users. | ||
We skip the online deployment step here, but it is included in the [demo tour](https://github.com/4paradigm/OpenMLDB/blob/main/demo/predict-taxi-trip-duration-nb/demo). |
Oops, something went wrong.