website/docs/table-design/table-types/log-table.md (30 additions, 0 deletions)
@@ -114,5 +114,35 @@ In the above example, we set the compression codec to `LZ4_FRAME` and the compre
2. The valid range of `table.log.arrow.compression.zstd.level` is 1 to 22.
:::

## Change Data Feed

Fluss captures row-level inserts on Log Tables, making this **change data** available for downstream consumption via the [`$changelog`](/table-design/virtual-tables.md#changelog-table) virtual table.

### Example

```sql title="Flink SQL"
CREATE TABLE log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
);

INSERT INTO log_table VALUES (1, 100, 50, 'Beijing'), (2, 200, 75, 'Shanghai');

SELECT * FROM log_table$changelog;
```

```
+--------------+-------------+---------------------+----------+---------+--------+----------+
| _change_type | _log_offset | _commit_timestamp | order_id | item_id | amount | address |
+--------------+-------------+---------------------+----------+---------+--------+----------+
| insert | 0 | 2024-01-15 10:30:00 | 1 | 100 | 50 | Beijing |
| insert | 1 | 2024-01-15 10:30:00 | 2 | 200 | 75 | Shanghai |
+--------------+-------------+---------------------+----------+---------+--------+----------+
```
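
Because `$changelog` exposes the change metadata as ordinary columns, a downstream job can project and filter on them directly. The following is a minimal sketch (the offset threshold and the projected columns are illustrative choices, not anything mandated by Fluss) that tails the feed in streaming mode and skips records below a known offset:

```sql title="Flink SQL"
-- Continuously tail the change feed and resume from a known log offset.
SET execution.runtime-mode = streaming;

SELECT _log_offset, _commit_timestamp, order_id, amount
FROM log_table$changelog
WHERE _log_offset >= 1;
```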

For detailed information, see the [Virtual Tables](/table-design/virtual-tables.md) documentation.

## Log Tiering
Log Table supports tiering data to different storage tiers. See [Remote Log](maintenance/tiered-storage/remote-storage.md) for more details.
website/docs/table-design/table-types/pk-table.md (57 additions, 16 deletions)
@@ -88,11 +88,27 @@ The following merge engines are supported:
4. [Aggregation Merge Engine](/table-design/merge-engines/aggregation.md)


## Change Data Feed

Fluss automatically captures row-level changes (inserts, updates, and deletes) on Primary Key Tables, making this **change data** available for downstream consumption. This capability, commonly known as **Change Data Feed**, enables powerful use cases such as:

- **Real-time data synchronization** to downstream systems
- **Incremental ETL pipelines** that process only changed data
- **Audit logging** and compliance tracking
- **Event-driven architectures** with change notifications

### Accessing Change Data via Virtual Tables

Fluss provides two virtual tables to consume change data in different formats:

| Virtual Table | Description |
|---------------|-------------|
| [`$changelog`](/table-design/virtual-tables.md#changelog-table) | Raw changelog stream with `insert`, `update_before`, `update_after`, and `delete` change types |
| [`$binlog`](/table-design/virtual-tables.md#binlog-table) | Binlog format with `before` and `after` row images in a single record |

### Example: Observing Change Data

Consider the following primary key table in Fluss:

```sql title="Flink SQL"
CREATE TABLE T
@@ -118,26 +134,51 @@ INSERT INTO T (k, v1, v2) VALUES (1, 4.0, 'banana');

-- delete the record with primary key k=1
DELETE FROM T WHERE k = 1;
```

#### Using `$changelog`

Query the changelog virtual table to see all row-level changes with metadata:

```sql title="Flink SQL"
SET execution.runtime-mode = streaming;
SELECT * FROM T$changelog;
```

This produces the following output in the Flink SQL CLI:

```
+---------------+-------------+---------------------+---+-----+--------+
| _change_type | _log_offset | _commit_timestamp | k | v1 | v2 |
+---------------+-------------+---------------------+---+-----+--------+
| insert | 0 | 2024-01-15 10:30:00 | 1 | 2.0 | apple |
| update_before | 1 | 2024-01-15 10:35:00 | 1 | 2.0 | apple |
| update_after | 2 | 2024-01-15 10:35:00 | 1 | 4.0 | banana |
| delete | 3 | 2024-01-15 10:40:00 | 1 | 4.0 | banana |
+---------------+-------------+---------------------+---+-----+--------+
4 rows in set
```
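
Because the change type is just a value in `_change_type`, the raw stream can be reduced to only the rows a consumer cares about. Below is a minimal sketch of an incremental pipeline that forwards only the latest row images to a downstream sink; the sink table `T_sink`, its column types, and the `print` connector are illustrative assumptions, not part of Fluss:

```sql title="Flink SQL"
-- Hypothetical sink table; column types are inferred from the example output above.
CREATE TEMPORARY TABLE T_sink (
    k  INT,
    v1 DOUBLE,
    v2 STRING
) WITH ('connector' = 'print');

-- Forward only inserts and post-update images, dropping update_before and delete records.
INSERT INTO T_sink
SELECT k, v1, v2
FROM T$changelog
WHERE _change_type IN ('insert', 'update_after');
```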

#### Using `$binlog`

Query the binlog virtual table to see changes with before and after row images:

```sql title="Flink SQL"
SET execution.runtime-mode = streaming;
SELECT * FROM T$binlog;
```

```
+--------------+-------------+---------------------+---------------------+---------------------+
| _change_type | _log_offset | _commit_timestamp | before | after |
+--------------+-------------+---------------------+---------------------+---------------------+
| insert | 0 | 2024-01-15 10:30:00 | NULL | (1, 2.0, apple) |
| update | 2 | 2024-01-15 10:35:00 | (1, 2.0, apple) | (1, 4.0, banana) |
| delete | 3 | 2024-01-15 10:40:00 | (1, 4.0, banana) | NULL |
+--------------+-------------+---------------------+---------------------+---------------------+
3 rows in set
```
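
If `before` and `after` are exposed as ROW-typed columns whose fields mirror the table schema, as the parenthesized values above suggest (this is an assumption; see the Virtual Tables documentation for the exact schema), individual fields can be read with dot notation, for example to compute how `v1` changed on each update:

```sql title="Flink SQL"
-- Minimal sketch, assuming `before` and `after` are ROW types with fields (k, v1, v2).
SELECT _commit_timestamp,
       after.k AS k,
       after.v1 - before.v1 AS v1_delta
FROM T$binlog
WHERE _change_type = 'update';
```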

For detailed schema information, change types, and startup mode options, see the [Virtual Tables](/table-design/virtual-tables.md) documentation.

## Auto-Increment Column

In Fluss, an auto-increment column automatically generates a unique numeric value for each row, commonly used as a unique identifier for the row.