Skip to content

Commit

Permalink
Merge branch 'risingwavelabs:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
agiron123 authored Apr 25, 2024
2 parents 3e25370 + 5618388 commit 6d58915
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 80 deletions.
9 changes: 8 additions & 1 deletion .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,11 @@ backfill
backfills
MVs
Nats
Psycopg
Psycopg
Datadog
Hasura
Liquibase
EMQX
HiveMQ
MQTT
RabbitMQ
74 changes: 74 additions & 0 deletions docs/guides/sink-to-mqtt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
id: sink-to-mqtt
title: Sink data from RisingWave to MQTT
description: Sink data from RisingWave to MQTT.
slug: /sink-to-mqtt
---
This guide describes how to sink data from RisingWave to the MQTT topic using the MQTT sink connector in RisingWave.

The [Message Queuing Telemetry Transport](https://mqtt.org/) (MQTT) protocol is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

## Prerequisites

Before sinking data from RisingWave to an MQTT topic, please ensure the following:

- The RisingWave cluster is running.
- An MQTT broker is running and accessible from your RisingWave cluster.
- Create an MQTT topic that you want to sink data to.
- You have permission to publish data to the MQTT topic.

For example, we have an `iot_sensor_data` table in RisingWave that stores data from various IoT devices at a given timestamp, including temperature and humidity readings, along with a status field indicating whether the device is in a normal or abnormal state. For more information to learn about MQTT and get started with it, refer to the [MQTT guide](https://mqtt.org/getting-started/).
### Syntax
To sink data from RisingWave to an MQTT topic, create a sink using the syntax below:

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='mqtt',
url = '<your MQTT server>:<port>',
topic = '<topic>',
qos = '<qos_level>',
type = '<append-only>'
username = '<your user name>',
password = '<your password>')
FORMAT PLAIN ENCODE data_encode -- Format options: plain (encode BYTES and JSON) (
force_append_only='true',
);
```
This query sets up an MQTT sink `mqtt_sink` to forward data from `iot_sensor_data` to an MQTT server. It configures the MQTT connector, server URL, target topic, data type, message retention, quality of service, and JSON encoding.

```sql
CREATE SINK mqtt_sink
FROM iot_sensor_data
WITH
(
connector='mqtt',
url='tcp://mqtt-server',
topic= 'sink_iot_data',
type = 'append-only',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);
```
After the sink is created, you will continuously consume the data in the MQTT topic from RisingWave in append-only mode.

### Parameters

| Field | Notes |
|--------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `url` | Required. The URL of the broker to connect to, e.g., `tcp://localhost`. Must be prefixed with `tcp://`, `mqtt://`, `ssl://`, or `mqtts://` to denote the protocol. `mqtts://` and `ssl://` use native certificates if no CA is specified. |
| `qos` | Optional. The quality of service for publishing messages. Defaults to `at_most_once`. Options include `at_most_once`, `at_least_once`, or `exactly_once`. |
| `username` | Optional. Username for the MQTT broker. |
| `password` | Optional. Password for the MQTT broker. |
| `client_prefix` | Optional. Prefix for the MQTT client ID. Defaults to "risingwave". |
| `clean_start` | Optional. Determines if all states from queues are removed when the client disconnects. If true, the broker clears all client states upon disconnect; if false, the broker retains the client state and resumes pending operations upon reconnection. |
| `inflight_messages`| Optional. Maximum number of inflight messages. Defaults to 100. |
| `tls.client_cert` | Optional. Path to the client's certificate file (PEM) or a string with the certificate content. Required for client authentication. Can use `fs://` prefix for file paths. |
| `tls.client_key` | Optional. Path to the client's private key file (PEM) or a string with the private key content. Required for client authentication. Can use `fs://` prefix for file paths. |
| `topic` | Required. The topic name to subscribe or publish to. Can include wildcard topics, e.g., `/topic/#`. |
| `retain` | Optional. Whether the message should be retained by the broker. |
| `r#type` | Required. Type identifier. |

2 changes: 1 addition & 1 deletion docs/guides/sink-to-redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ FORMAT data_format ENCODE data_encode [ (

| Parameter Names | Description |
| --------------- | ---------------------------------------------------------------------- |
|redis.url | Required. The address of the Redis database. |
|redis.url | Required. Choose either the Redis cluster address or a non-cluster Redis address. If the address is a cluster address, it should be in the form of a JSON array, like `redis.url= '["redis://redis-server:6379/"]'`. If the address is a non-cluster address, it should be in the form of a string, like `redis.url= 'redis://redis-server:6379/'`.|
|primary_key| Required. The primary keys of the sink. If necessary, use ',' to delimit the primary key columns. |

## FORMAT and ENCODE options
Expand Down
14 changes: 11 additions & 3 deletions docs/rw-integration-summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ RisingWave can sink data to Kafka. This allows indirectly sinking data to any sy
|Broker or streaming service| Source | Sink |
|---|---|---|
|Apache Kafka | Available. See [Ingest from Kafka](/ingest/ingest-from-kafka.md) for details. | Available. See [Sink to Kafka](/guides/create-sink-kafka.md) for details.| |
| Confluent Cloud | Available. See [Ingest from Confluent Cloud](/guides/confluent-kafka-source.md) for details. | |
| Amazon MSK | Available. See [Ingest from Amazon MSK](/guides/connector-amazon-msk.md) for details. | |
|Redpanda | Available. See [Ingest from Redpanda](/ingest/ingest-from-redpanda.md) for details. |Available. See [Sink to Kafka](/guides/create-sink-kafka.md) for details.|
|Apache Pulsar|Available. See [Ingest from Pulsar](/ingest/ingest-from-pulsar.md) for details. | Available. See [Sink to Pulsar](/guides/sink-to-pulsar.md) for details.|
| Confluent Cloud | Available. See [Ingest from Confluent Cloud](/guides/confluent-kafka-source.md) for details. | |
|DataStax Astra Streaming| Available. See [Ingest data from DataStax Astra Streaming](/guides/connector-astra-streaming.md) for details. | Researching <voteNotify note="astra_streaming_sink" />|
|StreamNative Cloud| Available| Researching <voteNotify note="streamnative_cloud_sink" />|
|EMQX|Available. See [Ingest from MQTT brokers](/ingest/ingest-from-mqtt.md) for details.| In progress <voteNotify note="emqx" /> |
|Google Pub/Sub|Available. See [Ingest from Google Pub/Sub](/ingest/ingest-from-google-pubsub.md) for details.| In progress <voteNotify note="google_pubsub_sink" /> |
|HiveMQ|Available. See [Ingest from MQTT brokers](/ingest/ingest-from-mqtt.md) for details.| In progress <voteNotify note="hivemq" /> |
|Kinesis Data Streams|Available. See [Ingest from Kinesis](/ingest/ingest-from-kinesis.md) for details.|Available. See [Sink to Kinesis](/guides/sink-to-aws-kinesis.md) for details.|
|RabbitMQ|Researching <voteNotify note="rabbitmq_source" />|Researching <voteNotify note="rabbitmq_sink" />|
|Redpanda | Available. See [Ingest from Redpanda](/ingest/ingest-from-redpanda.md) for details. |Available. See [Sink to Kafka](/guides/create-sink-kafka.md) for details.|
|StreamNative Cloud| Available| Researching <voteNotify note="streamnative_cloud_sink" />|
|NATS / NATS JetStream | Available. See [Ingest from NATS JetStream](/ingest/ingest-from-nats.md) for details. | Available. See [Sink to NATS](/guides/sink-to-nats.md) for details.|

## ETL/ELT and data integration
Expand Down Expand Up @@ -72,6 +76,7 @@ RisingWave can sink data to Kafka. This allows indirectly sinking data to any sy
|AWS RDS (Postgres)| Available. See [Ingest from PostgreSQL CDC](/guides/ingest-from-postgres-cdc.md) for details. |Available. See [Sink to PostgreSQL](/guides/sink-to-postgres.md) for details.|
|AWS Aurora (Postgres)| In progress <voteNotify note="aurora_pg_source" />|Researching <voteNotify note="aurora_pg_sink" />|
|Citus Data| Available. See [Ingest from Citus CDC](/guides/ingest-from-citus-cdc.md). | Researching <voteNotify note="citus_sink" />|
|Neon| Available. See [Ingest from Neon CDC](/guides/ingest-from-neon-cdc.md). | Researching <voteNotify note="neon_sink" />|

### MySQL

Expand Down Expand Up @@ -133,5 +138,8 @@ RisingWave can sink data to Kafka. This allows indirectly sinking data to any sy
|System | |Availability |
|---|---|---|
|Alluxio| |Researching <voteNotify note="alluxio" />|
|Datadog| |Researching <voteNotify note="datadog" />|
|Google Cloud Storage (GCS)| | Available as source. For details, see [Ingest from Google Cloud Storage](/ingest/ingest-from-gcs.md).|
|Hasura| |Researching <voteNotify note="hasura" />|
|Liquibase| |Researching <voteNotify note="liquibase" />|
|Supabase| |Available. For details, see [Empower Supabase with stream processing capabilities](/guides/supabase-integration.md).|
25 changes: 2 additions & 23 deletions docs/sql/commands/sql-drop-function.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,14 @@ Use the `DROP FUNCTION` command to remove an existing [user-defined function (UD
## Syntax

```sql
DROP FUNCTION function_name [ ( argument_type [, ...] ) ] ;
DROP FUNCTION [ IF EXISTS ] function_name [ ( argument_type [, ...] ) ] ;
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Sequence(
rr.Terminal('DROP FUNCTION'),
rr.NonTerminal('function_name'),
rr.Optional(
rr.Sequence(
rr.Terminal('('),
rr.OneOrMore(
rr.NonTerminal('argument_type', 'skip'),
','
),
rr.Terminal(')'),
),
),
rr.Terminal(';'),
)
);

<drawer SVG={svg} />

| Parameter or clause | Description |
|-------------------------------|-------------------------------------------------------|
| *function_name* | Name of the UDF you want to drop. |
| ( *argument_type* [ , ... ] ) | Optional: Argument types of the function.<br/>Specify the argument types when the name of the function you want to drop isn't unique within the schema. |
|IF EXISTS| Do not return an error if the specified function does not exist. A notice is issued in this case. |

## Usage

Expand Down
2 changes: 1 addition & 1 deletion docusaurus.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ const config = {
"runllm-server-address": "https://api.runllm.com",
"runllm-assistant-id": "29",
"runllm-position": "TOP_RIGHT",
"runllm-keyboard-shortcut": "Mod+k",
"runllm-keyboard-shortcut": "Mod+l",
"runllm-theme-color": "#005EEC",
"runllm-slack-community-url": "https://risingwave-community.slack.com/join/shared_invite/zt-2abrj3cbo-xnT_xn3_jd9piiM3vNPVdw",
"runllm-name": "RisingWave",
Expand Down
10 changes: 5 additions & 5 deletions sidebarCloud.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ module.exports = {
id: "organization-sso",
label: "Single Sign-On (SSO)",
},
// {
// type: "doc",
// id: "organization-service-account",
// label: "Service account & API key",
// },
{
type: "doc",
id: "organization-service-account",
label: "Service account & API key",
},
],
},
{
Expand Down
5 changes: 5 additions & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ const sidebars = {
label: "Google BigQuery",
id: "guides/sink-to-bigquery",
},
{
type: "doc",
label: "MQTT",
id: "guides/sink-to-mqtt",
},
{
type: "doc",
label: "MySQL",
Expand Down
25 changes: 2 additions & 23 deletions versioned_docs/version-1.7/sql/commands/sql-drop-function.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,14 @@ Use the `DROP FUNCTION` command to remove an existing [user-defined function (UD
## Syntax

```sql
DROP FUNCTION function_name [ ( argument_type [, ...] ) ] ;
DROP FUNCTION [ IF EXISTS ] function_name [ ( argument_type [, ...] ) ] ;
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Sequence(
rr.Terminal('DROP FUNCTION'),
rr.NonTerminal('function_name'),
rr.Optional(
rr.Sequence(
rr.Terminal('('),
rr.OneOrMore(
rr.NonTerminal('argument_type', 'skip'),
','
),
rr.Terminal(')'),
),
),
rr.Terminal(';'),
)
);

<drawer SVG={svg} />

| Parameter or clause | Description |
|-------------------------------|-------------------------------------------------------|
| *function_name* | Name of the UDF you want to drop. |
| ( *argument_type* [ , ... ] ) | Optional: Argument types of the function.<br/>Specify the argument types when the name of the function you want to drop isn't unique within the schema. |
|IF EXISTS| Do not return an error if the specified function does not exist. A notice is issued in this case. |

## Usage

Expand Down
25 changes: 2 additions & 23 deletions versioned_docs/version-1.8/sql/commands/sql-drop-function.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,14 @@ Use the `DROP FUNCTION` command to remove an existing [user-defined function (UD
## Syntax

```sql
DROP FUNCTION function_name [ ( argument_type [, ...] ) ] ;
DROP FUNCTION [ IF EXISTS ] function_name [ ( argument_type [, ...] ) ] ;
```

import rr from '@theme/RailroadDiagram'

export const svg = rr.Diagram(
rr.Sequence(
rr.Terminal('DROP FUNCTION'),
rr.NonTerminal('function_name'),
rr.Optional(
rr.Sequence(
rr.Terminal('('),
rr.OneOrMore(
rr.NonTerminal('argument_type', 'skip'),
','
),
rr.Terminal(')'),
),
),
rr.Terminal(';'),
)
);

<drawer SVG={svg} />

| Parameter or clause | Description |
|-------------------------------|-------------------------------------------------------|
| *function_name* | Name of the UDF you want to drop. |
| ( *argument_type* [ , ... ] ) | Optional: Argument types of the function.<br/>Specify the argument types when the name of the function you want to drop isn't unique within the schema. |
|IF EXISTS| Do not return an error if the specified function does not exist. A notice is issued in this case. |

## Usage

Expand Down

0 comments on commit 6d58915

Please sign in to comment.