From dd557d504ee9b34f9c991ce1b3f0fe7099239469 Mon Sep 17 00:00:00 2001
From: Yüce Tekol
Date: Mon, 29 Jul 2024 18:51:45 +0300
Subject: [PATCH] Added Feast streaming features tutorial (#1201)

---
 .../pages/streaming-features-with-feast.adoc | 363 ++++++++++++++++++
 1 file changed, 363 insertions(+)
 create mode 100644 docs/modules/integrate/pages/streaming-features-with-feast.adoc

diff --git a/docs/modules/integrate/pages/streaming-features-with-feast.adoc b/docs/modules/integrate/pages/streaming-features-with-feast.adoc
new file mode 100644
index 000000000..74d5119db
--- /dev/null
+++ b/docs/modules/integrate/pages/streaming-features-with-feast.adoc
@@ -0,0 +1,363 @@
= Feast Streaming Features
:description: This tutorial will get you started with streaming features using the Hazelcast integration with Feast.

{description}

== What You'll Learn

You will set up an offline feature store with PostgreSQL and an online feature store with Hazelcast.
You will then update the online feature store in real time, using a Jet job that processes transactions streaming through a Kafka topic.

== Before you Begin

You need the following before starting the tutorial:

* Hazelcast CLC - link:https://docs.hazelcast.com/clc/latest/install-clc[Installation instructions]
* A recent version of Docker and Docker Compose

To set up your project, complete the following steps:

. Create the sample project using the following command:
+
[source,shell]
----
clc project create -t feast-streaming-demo
----

. Switch to the project directory:
+
[source,shell]
----
cd feast-streaming-demo
----

. In the project directory, start the containers.
Note that this command runs the containers in the foreground, so keep this terminal open:
+
[source,shell]
----
docker-compose up
----

. In a new terminal, log in to the demo container:
+
[source,shell]
----
docker-compose exec demo /bin/bash
----

To set up Feast, complete the following steps:

. The Feast project is in the `feature_repo` directory.
You can view the Feast configuration using the following command:
+
[source,shell]
----
cat feature_repo/feature_store.yaml
----
+
[source,yaml]
----
project: feast_streaming

registry: /home/sam/feast/data/registry.db

provider: local

online_store:
  type: hazelcast
  cluster_name: dev
  cluster_members: ["hazelcast:5701"]

offline_store:
  type: file

entity_key_serialization_version: 2
----

. The feature views are defined in `features.py`.
Run the following command to see its contents:
+
[source,shell]
----
cat feature_repo/features.py
----
+
[source,python]
----
from datetime import timedelta
from feast import FeatureView, Entity, ValueType, Field
from feast.data_source import PushSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import PostgreSQLSource
from feast.types import Int32

user_entity = Entity(
    name="user_id",
    description="A user that has executed a transaction or received a transaction",
    value_type=ValueType.STRING
)

user_transaction_count_7d_source = PushSource(
    name="user_transaction_count_7d",
    batch_source=PostgreSQLSource(
        table="user_transaction_count_7d",
        timestamp_field="feature_timestamp"),
)

user_transaction_count_7d_stream_fv = FeatureView(
    schema=[
        Field(name="transaction_count_7d", dtype=Int32),
    ],
    name="user_transaction_count_7d",
    entities=[user_entity],
    ttl=timedelta(weeks=1),
    online=True,
    source=user_transaction_count_7d_source,
)
----

. Before you can use the features, you must apply the feature definitions using the following command:
+
[source,shell]
----
feast -c feature_repo apply
----
+
Outputs:
+
[source,output]
----
Deploying infrastructure for user_transaction_count_7d
----

. At this point, you are ready to start the feature server.
The command below runs the feature server in the background and discards its log output, so that you can use the same terminal for running other commands:
+
[source,shell]
----
feast -c feature_repo serve -h 0.0.0.0 -p 6566 --no-access-log 2> /dev/null &
----
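+
Optionally, you can verify that the feature server is up by pushing a single handwritten row to the `user_transaction_count_7d` push source; this is the same HTTP endpoint the Jet job will call later.
The request below is a minimal sketch: it assumes the column names defined in `features.py`, and the count value is an arbitrary example:
+
[source,shell]
----
curl -X POST \
  "http://localhost:6566/push" \
  -d '{
    "push_source_name": "user_transaction_count_7d",
    "df": {
      "user_id": ["EBJD80665876768751"],
      "transaction_count_7d": [1],
      "feature_timestamp": ["2024-07-29T07:00:00"]
    },
    "to": "online"
  }'
----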

== Feature Transformation Using Jet

You will create a Jet job that reads transactions from a Kafka topic, computes the number of transactions per user over a sliding seven-day window, and populates the Hazelcast online store.

. Before creating the Jet job, you may want to see how the job is built by running the following command:
+
[source,shell]
----
cat jet/streaming_features/src/main/java/com/example/Main.java
----
+
[source,java]
----
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.WindowDefinition;
import com.hazelcast.map.impl.MapEntrySimple;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Date;
import java.util.Map;
import java.util.Properties;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class Main {
    private final static long MONITORING_INTERVAL_7_DAYS = DAYS.toMillis(7);
    private final static long REPORTING_INTERVAL = SECONDS.toMillis(1);

    public static Pipeline createPipeline(String feastBaseUrl, String kafkaBaseUrl) {
        var mapper = new ObjectMapper();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaBaseUrl);
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("auto.offset.reset", "earliest");

        StreamSource<Map.Entry<String, String>> kafkaSource = KafkaSources.kafka(props, "transaction");
        Pipeline pipeline = Pipeline.create();
        pipeline
            .readFrom(kafkaSource)
            .withNativeTimestamps(0)
            .map(item -> mapper.readValue(item.getValue(), Transaction.class))
            .groupingKey(Transaction::getAcct_num)
            .window(sliding(MONITORING_INTERVAL_7_DAYS, REPORTING_INTERVAL))
            .aggregate(counting())
            .map(item -> {
                var userId = item.getKey();
                // set the current datetime
                var timestamp = new Date();
                var utc = new UserTransactionCount7d(userId, item.getValue(), timestamp);
                return (Map.Entry<String, UserTransactionCount7d>) new MapEntrySimple<>(userId, utc);
            })
            .map(item -> mapper.writeValueAsString(item.getValue()))
            .writeTo(FeastSinks.push(feastBaseUrl, "user_transaction_count_7d"));
        return pipeline;
    }

    public static void main(String[] args) {
        var feastBaseUrl = "http://localhost:6566";
        var kafkaBaseUrl = "localhost:9092";
        if (args.length >= 1) {
            feastBaseUrl = args[0];
        }
        if (args.length >= 2) {
            kafkaBaseUrl = args[1];
        }
        Pipeline pipeline = createPipeline(feastBaseUrl, kafkaBaseUrl);
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline);
    }
}
----
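+
The pipeline writes to the feature server through `FeastSinks.push`, a custom sink defined elsewhere in the project.
Conceptually, it POSTs each JSON item to the feature server's `/push` endpoint.
The following is a minimal sketch of such a sink, built with Jet's `SinkBuilder` and Java's built-in HTTP client; the payload framing here is an assumption, so check the project source for the actual implementation:
+
[source,java]
----
package com.example;

import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class FeastSinks {
    public static Sink<String> push(String feastBaseUrl, String pushSourceName) {
        return SinkBuilder
                // create one HTTP client per processor
                .sinkBuilder("feastPush", ctx -> HttpClient.newHttpClient())
                .<String>receiveFn((client, json) -> {
                    // wrap the serialized feature row in a push request
                    // targeting the online store (framing is an assumption)
                    String body = "{\"push_source_name\":\"" + pushSourceName
                            + "\",\"df\":" + json + ",\"to\":\"online\"}";
                    HttpRequest request = HttpRequest.newBuilder()
                            .uri(URI.create(feastBaseUrl + "/push"))
                            .header("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString(body))
                            .build();
                    client.send(request, HttpResponse.BodyHandlers.discarding());
                })
                .build();
    }
}
----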

. You must compile the Java code that creates the Jet job.
We provide an easy-to-use script to do that from inside the demo container:
+
[source,shell]
----
run build_jet streaming_features
----

. You can now create and run the Jet job.
The job requires the addresses of the feature server and the Kafka broker:
+
[source,shell]
----
clc job submit --name transform_features build/jet/streaming_features/libs/*.jar http://demo:6566 kafka:19092
----

. You can list the jobs and verify that the job was submitted successfully and is running:
+
[source,shell]
----
clc job list
----
+
Outputs:
+
[source,output]
----
------------------------------------------------------------------------------------------------
 Job ID              | Name               | Status  | Submitted           | Completed
------------------------------------------------------------------------------------------------
 0c13-9428-92c4-0001 | transform_features | RUNNING | 2024-07-29 07:18:53 | -
----

. Running the Jet job creates an IMap in the Hazelcast cluster that corresponds to the `user_transaction_count_7d` feature view.
You can list it using the following command:
+
[source,shell]
----
clc object list map
----
+
Outputs:
+
[source,output]
----
-------------------------------------------
 Object Name
-------------------------------------------
 feast_streaming_user_transaction_count_7d
-------------------------------------------
 OK Returned 1 row(s).
----

. Inspect the contents of the feature IMap to see the data written by Feast:
+
[source,shell]
----
clc map -n feast_streaming_user_transaction_count_7d entry-set | head -10
----

. You can retrieve features from the feature server in a human-readable format:
+
[source,shell]
----
curl -X POST \
  "http://localhost:6566/get-online-features" \
  -d '{
    "features": [
      "user_transaction_count_7d:transaction_count_7d"
    ],
    "entities": {
      "user_id": ["EBJD80665876768751", "YVCV56500100273531", "QRQP56813768247223"]
    }
  }' | jq
----
+
Outputs something similar to:
+
[source,output]
----
{
  "metadata": {
    "feature_names": [
      "user_id",
      "transaction_count_7d"
    ]
  },
  "results": [
    {
      "values": [
        "EBJD80665876768751",
        "YVCV56500100273531",
        "QRQP56813768247223"
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "1970-01-01T00:00:00Z",
        "1970-01-01T00:00:00Z",
        "1970-01-01T00:00:00Z"
      ]
    },
    {
      "values": [
        6,
        11,
        11
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "2024-07-29T07:24:00Z",
        "2024-07-29T07:24:00Z",
        "2024-07-29T07:24:00Z"
      ]
    }
  ]
}
----

== Summary

In this tutorial, you learned how to set up a feature engineering project that uses Hazelcast as the online store.
You also learned how to write a Jet job that transforms data and sends it to a Feast feature server.

== See Also

There is more to feature engineering with Hazelcast.

Check out our documentation about Feast:

* xref:integrate:integrate-with-feast.adoc[]
* xref:integrate:feast-config.adoc[]

If you have any questions, suggestions, or feedback, please do not hesitate to reach out to us through https://slack.hazelcast.com/[Hazelcast Community Slack].