Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 105 additions & 4 deletions integrations/destinations/redis.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Before sinking data from RisingWave to Redis, please ensure the following:
* The Redis database you want to sink to is accessible from RisingWave.
* Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.

## syntax
## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
Expand All @@ -41,20 +41,32 @@ FORMAT data_format ENCODE data_encode [ (

<Note>

These options should be set in `FORMAT data_format ENCODE data_encode (key = 'value')`, instead of the `WITH` clause
These options should be set in `FORMAT data_format ENCODE data_encode (key = 'value')`, instead of the `WITH` clause.
</Note>

| Field | Notes |
| :------------------ | :-------------------------------------------------- |
| data\_format | Data format. Allowed formats:<ul><li> `PLAIN`: Output data with insert operations.</li><li> `UPSERT`: Output data as a changelog stream. </li></ul> |
| data\_encode | Data encoding. Supported encodings: <ul><li>`JSON`:<ul><li>`date`: number of days since the Common Era (CE).</li></ul><ul><li>`interval`: `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S` format string.</li></ul><ul><li>`time without time zone`: number of milliseconds past the last midnight.</li></ul><ul><li>`timestamp`: number of milliseconds since the Epoch.</li></ul></li><li>`TEMPLATE`: converts data to the string specified by `key_format`/`value_format`.</li></ul> |
| force\_append\_only | If true, forces the sink to be `PLAIN` (also known as append-only), even if it cannot be. |
| key\_format | Required if `data_encode` is `TEMPLATE`. Specify the format for the key as a string. |
| value\_format | Required if `data_encode` is `TEMPLATE`. Specify the format for the value as a string. |
| key\_format | Specify the format for the key as a string. |
| value\_format | Specify the format for the value as a string. |
| key\_encode | Optional. <ul><li>When specified, the key encode can only be `TEXT`, and the primary key should be one and only one of the following types: `varchar`, `bool`, `smallint`, `int`, and `bigint`;</li><li>When absent, both key and value will use the same setting of `ENCODE data_encode ( ... )`.</li></ul> |
| redis_value_type | Optional. Controls how data is written to Redis. For details, see [examples](#example) below.<ul><li>`string` (Default): Key-value storage. `key_format` and `value_format` are required. </li><li>`pubsub`: Publishes messages to Redis Pub/Sub channels. `value_format` is required. </li><li>`geospatial`: Stores data using Redis' geospatial index. `key_format` is required.</li></ul> |
| channel | Optional. Required if `redis_value_type = 'pubsub'` and `channel_column` is not set. The name of the Redis Pub/Sub channel to publish messages to. |
| channel_column | Optional. Required if `redis_value_type = 'pubsub'` and `channel` is not set. The values from this column will be used as the names of the Redis Pub/Sub channels to publish messages to. Must be type of `VARCHAR`. |
| longitude | Optional. Required if `redis_value_type = 'geospatial'`. Contains the longitude value. Must be of type `FLOAT`, `REAL`, or `VARCHAR`. |
| latitude | Optional. Required if `redis_value_type = 'geospatial'`. Contains the latitude value. Must be of type `FLOAT`, `REAL`, or `VARCHAR`. |
| member | Optional. Required when `redis_value_type = 'geospatial'`. Contains the member names for the geospatial set. Must be of type `VARCHAR` and part of the primary key. |

## Example

This section provides examples for sinking data from RisingWave to Redis, covering key-value storage, geospatial data storage, and Pub/Sub messaging.

### Key-value storage

This approach is suitable when you want to use Redis as a fast data store or cache, storing data as key-value pairs. Specify `redis_value_type = 'string'` or omit this parameter, as `string` is the default.

Assume we create a materialized view, `bhv_mv`, from a source.

```sql
Expand Down Expand Up @@ -94,3 +106,92 @@ FROM bhv_mv WITH (
value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'
);
```

### Geospatial data storage

This approach allows RisingWave to store geospatial data (longitude, latitude, member) in Redis. Make sure to specify `FORMAT UPSERT ENCODE TEMPLATE`, `redis_value_type = 'geospatial'`, `key_format`, `longitude`, `latitude`, and `member`.

Assume we have a table `t1`:

```sql
CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);
```

We can sink geospatial data to Redis:

```sql
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='geospatial',
longitude = 'v1',
latitude = 'v2',
member = 'v3',
key_format = 'abcd3:{v4}'
);
```

Now insert the geospatial data into the table `t1`:

```sql
INSERT INTO t1 VALUES (1.1, 1.1, 'test','test');
```

You can retrieve the geospatial information from Redis:

```
GEOPOS abcd3:test test
------
1) 1) "1.10000044298171997"
2) "1.10000000482478555"
```

For more information, see [Geospatial indexing](https://redis.io/glossary/geospatial-indexing/).

### Pub/Sub messaging

This approach allows RisingWave to publish messages to Redis channels. Make sure to specify `FORMAT UPSERT ENCODE TEMPLATE`, `redis_value_type = 'pubsub'`, and `value_format`. At least one of `channel` or `channel_column` must also be specified.

Assume we have a table `t1`:

```sql
CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);
```

We can sink data from `t1` as messages to a predefined Redis Pub/Sub channel:

```sql
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub',
channel= 'test123',
value_format = 'abcd3:{v4}'
);
```

Alternatively, you can publish to a dynamic channel using `channel_column`:

```sql
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub',
channel_column = 'v3',
value_format = 'abcd3:{v4}'
);
```


Loading