Added Feast streaming features tutorial (#1201)

docs/modules/integrate/pages/streaming-features-with-feast.adoc (363 additions)

= Feast Streaming Features
:description: This tutorial gets you started with streaming features using the Hazelcast integration with Feast.

{description}

== What You'll Learn

You will set up an offline feature store with PostgreSQL and an online feature store with Hazelcast.
Then, you will update the online store in real time with a Jet job that processes transactions streaming from a Kafka topic.

== Before you Begin

You will need the following ready before starting the tutorial:

* Hazelcast CLC - link:https://docs.hazelcast.com/clc/latest/install-clc[Installation instructions]
* A recent version of Docker and Docker Compose
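
You can optionally confirm that the prerequisites are available before continuing. The commands below are a quick sanity check; the version output will differ on your machine:

[source,shell]
----
clc version
docker-compose --version
----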

To set up your project, complete the following steps:

. Create the sample project using the following command:
+
[source,shell]
----
clc project create -t feast-streaming-demo
----

. Switch to the project directory:
+
[source,shell]
----
cd feast-streaming-demo
----

. In the project directory, start the containers:
+
[source,shell]
----
docker-compose up
----
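+
Optionally, in a second terminal, you can check that the containers are up. The service names in the output depend on the project's `docker-compose.yml`:
+
[source,shell]
----
docker-compose ps
----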

. Log in to the demo container:
+
[source,shell]
----
docker-compose exec demo /bin/bash
----

To set up Feast, complete the following steps:

. The Feast project is in the `feature_repo` directory.
You can view the Feast configuration using the following command:
+
[source,shell]
----
cat feature_repo/feature_store.yaml
----
+
[source,yaml]
----
project: feast_streaming
registry: /home/sam/feast/data/registry.db
provider: local
online_store:
  type: hazelcast
  cluster_name: dev
  cluster_members: ["hazelcast:5701"]
offline_store:
  type: file
entity_key_serialization_version: 2
----

. The feature views are defined in `features.py`.
Run the following command to see its contents:
+
[source,shell]
----
cat feature_repo/features.py
----
+
[source,python]
----
from datetime import timedelta

from feast import FeatureView, Entity, ValueType, Field
from feast.data_source import PushSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import PostgreSQLSource
from feast.types import Int32

user_entity = Entity(
    name="user_id",
    description="A user that has executed a transaction or received a transaction",
    value_type=ValueType.STRING,
)

user_transaction_count_7d_source = PushSource(
    name="user_transaction_count_7d",
    batch_source=PostgreSQLSource(
        table="user_transaction_count_7d",
        timestamp_field="feature_timestamp",
    ),
)

user_transaction_count_7d_stream_fv = FeatureView(
    name="user_transaction_count_7d",
    entities=[user_entity],
    schema=[
        Field(name="transaction_count_7d", dtype=Int32),
    ],
    ttl=timedelta(weeks=1),
    online=True,
    source=user_transaction_count_7d_source,
)
----

. Before you can use the features, you must apply the feature definitions:
+
[source,shell]
----
feast -c feature_repo apply
----
+
Outputs:
+
[source,output]
----
Deploying infrastructure for user_transaction_count_7d
----
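+
To confirm that the feature view was registered, you can list the feature views. The output shown here is indicative and may vary by Feast version:
+
[source,shell]
----
feast -c feature_repo feature-views list
----
+
[source,output]
----
NAME                       ENTITIES     TYPE
user_transaction_count_7d  {'user_id'}  FeatureView
----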

. At this point, you are ready to start the feature server.
The command below redirects the `feast` process's output away from the terminal, since you will use the same terminal for running other commands:
+
[source,shell]
----
feast -c feature_repo serve -h 0.0.0.0 -p 6566 --no-access-log 2> /dev/null &
----
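+
The server now exposes Feast's HTTP API, including the `/push` endpoint that the Jet job uses later in this tutorial. As an illustration only, you could push a single feature row by hand; the values below are made up, and the column names follow the feature view and its batch source:
+
[source,shell]
----
curl -X POST "http://localhost:6566/push" \
  -d '{
    "push_source_name": "user_transaction_count_7d",
    "df": {
      "user_id": ["EBJD80665876768751"],
      "transaction_count_7d": [1],
      "feature_timestamp": ["2024-07-29T07:00:00Z"]
    },
    "to": "online"
  }'
----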

== Feature Transformation Using Jet

You will create a Jet job that reads transactions from a Kafka topic, computes the number of transactions per user over a sliding seven-day window, and populates the Hazelcast online store.

. Before creating the Jet job, you may want to see how the job is defined by running the following command:
+
[source,shell]
----
cat jet/streaming_features/src/main/java/com/example/Main.java
----
+
[source,java]
----
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.map.impl.MapEntrySimple;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Date;
import java.util.Map;
import java.util.Properties;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class Main {

    private final static long MONITORING_INTERVAL_7_DAYS = DAYS.toMillis(7);
    private final static long REPORTING_INTERVAL = SECONDS.toMillis(1);

    public static Pipeline createPipeline(String feastBaseUrl, String kafkaBaseUrl) {
        var mapper = new ObjectMapper();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaBaseUrl);
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("auto.offset.reset", "earliest");
        StreamSource<Map.Entry<String, String>> kafkaSource = KafkaSources.kafka(props, "transaction");

        Pipeline pipeline = Pipeline.create();
        pipeline
                .readFrom(kafkaSource)
                .withNativeTimestamps(0)
                // deserialize the JSON transaction payload
                .map(item -> mapper.readValue(item.getValue(), Transaction.class))
                // count transactions per user in a sliding 7-day window,
                // updated every second
                .groupingKey(Transaction::getAcct_num)
                .window(sliding(MONITORING_INTERVAL_7_DAYS, REPORTING_INTERVAL))
                .aggregate(counting())
                .map(item -> {
                    var userId = item.getKey();
                    // set the current datetime
                    var timestamp = new Date();
                    var utc = new UserTransactionCount7d(userId, item.getValue(), timestamp);
                    return (Map.Entry<String, UserTransactionCount7d>) new MapEntrySimple(userId, utc);
                })
                // serialize the feature row and push it to the Feast feature server
                .map(item -> mapper.writeValueAsString(item.getValue()))
                .writeTo(FeastSinks.push(feastBaseUrl, "user_transaction_count_7d"));
        return pipeline;
    }

    public static void main(String[] args) {
        var feastBaseUrl = "http://localhost:6566";
        var kafkaBaseUrl = "localhost:9092";
        if (args.length >= 1) {
            feastBaseUrl = args[0];
        }
        if (args.length >= 2) {
            kafkaBaseUrl = args[1];
        }
        Pipeline pipeline = createPipeline(feastBaseUrl, kafkaBaseUrl);
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline);
    }
}
----
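+
The pipeline references supporting classes (`Transaction`, `UserTransactionCount7d`, and the `FeastSinks` helper) that ship with the sample project. You can list the sources to explore them; the expectation of matching file names is an assumption based on the class names:
+
[source,shell]
----
ls jet/streaming_features/src/main/java/com/example/
----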

. You must compile the Java code that creates the Jet job.
The project provides a script that does this from inside the demo container:
+
[source,shell]
----
run build_jet streaming_features
----

. You can now create and run the Jet job.
The job requires the addresses of the feature server and the Kafka instance:
+
[source,shell]
----
clc job submit --name transform_features build/jet/streaming_features/libs/*.jar http://demo:6566 kafka:19092
----

. You can list the jobs and verify that the job is running using the following command:
+
[source,shell]
----
clc job list
----
+
Outputs:
+
[source,output]
----
------------------------------------------------------------------------------------------------
 Job ID              | Name               | Status  | Submitted           | Completed
------------------------------------------------------------------------------------------------
 0c13-9428-92c4-0001 | transform_features | RUNNING | 2024-07-29 07:18:53 | -
----

. Running the Jet job created an IMap in the Hazelcast cluster that corresponds to the `user_transaction_count_7d` feature view.
You can list it using the following command:
+
[source,shell]
----
clc object list map
----
+
Outputs:
+
[source,output]
----
-------------------------------------------
 Object Name
-------------------------------------------
 feast_streaming_user_transaction_count_7d
-------------------------------------------
OK Returned 1 row(s).
----

. Inspect the contents of the feature IMap to see the data written by Feast:
+
[source,shell]
----
clc map -n feast_streaming_user_transaction_count_7d entry-set | head -10
----
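+
You can also check how many entries the feature map holds. This assumes your CLC version provides the `size` subcommand for maps:
+
[source,shell]
----
clc map -n feast_streaming_user_transaction_count_7d size
----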

. You can retrieve features from the feature server in a human-readable format:
+
[source,shell]
----
curl -X POST \
  "http://localhost:6566/get-online-features" \
  -d '{
    "features": [
      "user_transaction_count_7d:transaction_count_7d"
    ],
    "entities": {
      "user_id": ["EBJD80665876768751", "YVCV56500100273531", "QRQP56813768247223"]
    }
  }' | jq
----
+
Outputs something similar to:
+
[source,output]
----
{
  "metadata": {
    "feature_names": [
      "user_id",
      "transaction_count_7d"
    ]
  },
  "results": [
    {
      "values": [
        "EBJD80665876768751",
        "YVCV56500100273531",
        "QRQP56813768247223"
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "1970-01-01T00:00:00Z",
        "1970-01-01T00:00:00Z",
        "1970-01-01T00:00:00Z"
      ]
    },
    {
      "values": [
        6,
        11,
        11
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "2024-07-29T07:24:00Z",
        "2024-07-29T07:24:00Z",
        "2024-07-29T07:24:00Z"
      ]
    }
  ]
}
----

== Summary

In this tutorial, you learned how to set up a feature engineering project that uses Hazelcast as the online store.
You also learned how to write a Jet job that transforms data and sends it to a Feast feature server.

== See Also

There is more to feature engineering with Hazelcast.
Check out our documentation about Feast:

* xref:integrate:integrate-with-feast.adoc[]
* xref:integrate:feast-config.adoc[]

If you have any questions, suggestions, or feedback, please do not hesitate to reach out to us through the https://slack.hazelcast.com/[Hazelcast Community Slack].