@wzhero1 commented Dec 22, 2025

Purpose

This PR adds support for accessing system fields (specifically sequence_number) in AuditLogTable and BinlogTable.

Currently, downstream consumers often need the sequence_number to ensure strict ordering or to implement exactly-once processing when reading from audit logs or binlogs. However, these system tables previously did not expose this field.

This change implements KeyValueSystemFieldsRecordReader to enable the projection of system fields for KeyValue records, allowing users to query sequence_number directly from audit log and binlog tables.

Tests

  • Added IncrementalReadSystemFieldsTest to verify functionality.
  • Updated AuditLogTableTest and BinlogTableTest to cover sequence_number read cases.

API and Format

  • No change to storage format.
  • No public API change.

Documentation

  • docs/content/concepts/system-tables.md needs to be updated.

@wzhero1 force-pushed the auditlog-support-seqnum branch 9 times, most recently from ed453a8 to eae1a70 (December 25, 2025 03:40)
@wzhero1 force-pushed the auditlog-support-seqnum branch from eae1a70 to 7ab955a (December 29, 2025 02:23)
Contributor

@yunfengzhou-hub left a comment

Thanks for the PR! Left some comments as below.


// Join system fields first, then physical fields
// Natural order: [system fields...] + [physical fields...]
joinedRow.replace(systemFieldsRow, value);

Contributor

I'm a little concerned about the correctness risk of object reuse. In this project, JoinedRow#replace is mainly used in two ways:

  • new JoinedRow().replace: the instance is not reused.
  • joinedRow.replace followed immediately by a serializer or writer, which means the row is not modified again after it is reused.

But here, the joined row is emitted by the source to downstream operators, which might modify the row in place, so KeyValueSystemFieldsRecordReader and those operators could affect each other.

You can refer to online blog posts and Stack Overflow questions about the correctness risks of Flink's object reuse mechanism, and check whether it is acceptable to enable object reuse here.

Author

Great catch! You're right - downstream operator behavior is unpredictable and could mutate the row in-place. I've changed it to new JoinedRow(). The minor overhead of object allocation is acceptable to ensure correctness and avoid potential interference between KeyValueSystemFieldsRecordReader and downstream operators.
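
For readers following this thread, here is a minimal sketch of the aliasing hazard discussed above. PairRow and ObjectReuseDemo are hypothetical stand-ins, not Paimon's JoinedRow or the reader in this PR; the sketch only assumes that a consumer may hold on to a row after the producer moves to the next record.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reusable two-part row; not Paimon's JoinedRow. */
final class PairRow {
    Object left;
    Object right;

    /** Overwrites both parts in place and returns the same instance. */
    PairRow replace(Object left, Object right) {
        this.left = left;
        this.right = right;
        return this;
    }
}

final class ObjectReuseDemo {
    /** Reuses one PairRow for every record, so all buffered references alias it. */
    static List<PairRow> readWithReuse(long[] seqs, String[] values) {
        PairRow reused = new PairRow();
        List<PairRow> out = new ArrayList<>();
        for (int i = 0; i < seqs.length; i++) {
            out.add(reused.replace(seqs[i], values[i]));
        }
        return out;
    }

    /** Allocates a fresh PairRow per record, so each buffered row keeps its own values. */
    static List<PairRow> readWithoutReuse(long[] seqs, String[] values) {
        List<PairRow> out = new ArrayList<>();
        for (int i = 0; i < seqs.length; i++) {
            out.add(new PairRow().replace(seqs[i], values[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] seqs = {1L, 2L};
        String[] values = {"a", "b"};
        // With reuse, the first buffered row was overwritten by the second record.
        System.out.println(readWithReuse(seqs, values).get(0).left);
        // Without reuse, the first buffered row still holds the first record.
        System.out.println(readWithoutReuse(seqs, values).get(0).left);
    }
}
```

Buffering is only one way to trigger the problem; a downstream operator that mutates the row in place runs into the same shared instance.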

@Nullable int[] projection) {
if (systemFieldExtractors.isEmpty()) {
// No system fields, use the default unwrap logic
return KeyValueTableRead.unwrap(reader);

Contributor

It might be better to merge this method into KeyValueTableRead#unwrap. Since it only wraps that method, extending the original method would likely be better for extensibility.

Author

Done. I've moved the logic into KeyValueTableRead.java. The method structure is now:

  • unwrap(RecordReader<KeyValue> reader, List<SpecialFieldExtractor> specialFieldExtractors, @Nullable int[] projection) as a public method for extensibility
  • unwrap(RecordReader<KeyValue> reader) as a private method for internal use
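
The two-overload structure above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in KeyValueTableRead: SimpleKv and UnwrapSketch are hypothetical, the sketch works on a single record rather than a RecordReader, and it assumes the projection indexes into the joined [special fields...] + [physical fields...] layout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified record; not Paimon's KeyValue. */
final class SimpleKv {
    final long sequenceNumber;
    final Object[] value;

    SimpleKv(long sequenceNumber, Object... value) {
        this.sequenceNumber = sequenceNumber;
        this.value = value;
    }
}

final class UnwrapSketch {
    /** Default path: no special fields requested, emit the physical fields only. */
    static Object[] unwrap(SimpleKv kv) {
        return kv.value.clone();
    }

    /**
     * Extended path: emits [special fields...] + [physical fields...], then applies an
     * optional projection over that joined layout. A null projection keeps the natural order.
     */
    static Object[] unwrap(
            SimpleKv kv, List<Function<SimpleKv, Object>> extractors, int[] projection) {
        if (extractors.isEmpty() && projection == null) {
            return unwrap(kv);
        }
        List<Object> joined = new ArrayList<>();
        for (Function<SimpleKv, Object> extractor : extractors) {
            joined.add(extractor.apply(kv));
        }
        joined.addAll(Arrays.asList(kv.value));
        if (projection == null) {
            return joined.toArray();
        }
        Object[] projected = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            projected[i] = joined.get(projection[i]);
        }
        return projected;
    }

    public static void main(String[] args) {
        Function<SimpleKv, Object> seq = kv -> kv.sequenceNumber;
        SimpleKv kv = new SimpleKv(100002L, 999);
        // Natural order: sequence number first, then the physical id column.
        System.out.println(Arrays.toString(unwrap(kv, Collections.singletonList(seq), null)));
    }
}
```

Each call allocates a fresh output array, in line with the decision earlier in this thread to avoid reusing the joined row.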

*
* <p>Extracts the sequence number from KeyValue metadata.
*/
SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();

Contributor

Suggested change
- SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
+ SystemFieldExtractor SEQUENCE_NUMBER = KeyValue::sequenceNumber;

Author

Done.

* future implementation where level information would need to be tracked through the read
* path.
*/
SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information needs to be propagated

Contributor

If this system field cannot be extracted for now, I would prefer to throw an exception directly in such cases, instead of giving users the false impression that it works when they only get null values.

Author

It now throws an UnsupportedOperationException.

* scenarios.
*/
SystemFieldExtractor ROW_ID =
kv -> null; // ROW_ID is computed from file metadata, not available in KeyValue

Contributor

Same as above

Author

It now throws an UnsupportedOperationException.
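
As a rough sketch of the outcome of the last three threads (a method reference for the sequence number, and a fail-fast extractor for fields that cannot be read yet), assuming hypothetical DemoKv and DemoFieldExtractor types rather than the actual classes in this PR, and an illustrative error message:

```java
/** Hypothetical, simplified record; not Paimon's KeyValue. */
final class DemoKv {
    private final long sequenceNumber;

    DemoKv(long sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }

    long sequenceNumber() {
        return sequenceNumber;
    }
}

/** Hypothetical extractor interface, modeled on the snippets under review. */
@FunctionalInterface
interface DemoFieldExtractor {
    Object extract(DemoKv kv);

    /** Method reference instead of a lambda, as suggested in the review. */
    DemoFieldExtractor SEQUENCE_NUMBER = DemoKv::sequenceNumber;

    /** Fails fast instead of silently returning null for an unsupported field. */
    DemoFieldExtractor LEVEL =
            kv -> {
                throw new UnsupportedOperationException(
                        "LEVEL cannot be extracted from a key-value record yet");
            };
}

final class ExtractorDemo {
    public static void main(String[] args) {
        DemoKv kv = new DemoKv(100002L);
        System.out.println(DemoFieldExtractor.SEQUENCE_NUMBER.extract(kv));
        try {
            DemoFieldExtractor.LEVEL.extract(kv);
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported: " + e.getMessage());
        }
    }
}
```

Throwing makes an unsupported projection visible at the first record, whereas a null-returning extractor would produce a column that looks valid but is always empty.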

checkAnswer(
sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 'tag2') ORDER BY id"),
- Seq(Row("-D", 999)))
+ Seq(Row("-D", 100002L, 999)))

Contributor

100002L is the value of id column, rather than sequence number. Not sure whether it is correct here.

Author

999 is the value of id; 100002L is the sequence number.

}

/** Creates a table with changelog producer enabled. */
private FileStoreTable createChangelogTable(String tableName) throws Exception {

Contributor

This method is only used in one place. So compared with introducing this method, it might be better to preserve the original code structure and commit history.

Author

While it's currently used in one place, I prefer keeping this helper method for single responsibility and better extensibility. It encapsulates the changelog table creation logic cleanly, making future test additions easier without duplicating configuration code.

import java.util.Map;

/**
* A decorator for {@link RecordReader} that injects system fields into the output rows for

Contributor

In the decorator pattern, the wrapped type and the output type should be the same, but here the wrapped type is RecordReader<KeyValue> and the output type is RecordReader<InternalRow>. It would be more accurate to call this class a wrapper rather than a decorator.

Author

Changed to "wrapper".

*/
@Nullable
public static SystemFieldExtractor getExtractor(String fieldName) {
return EXTRACTOR_REGISTRY.get(fieldName);

Contributor

This method only checks field names. It might be better to also check field data types.

Author

For special fields (like _ROWKIND, _SEQUENCE_NUMBER), field name matching is sufficient.
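
To make the trade-off in this thread concrete, here is a hedged sketch that contrasts a name-only lookup with the stricter name-plus-type check the reviewer proposed. ExtractorRegistrySketch is hypothetical, and mapping _SEQUENCE_NUMBER to Long is an assumption made for illustration; the PR keeps name matching only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry sketch; not the EXTRACTOR_REGISTRY in this PR. */
final class ExtractorRegistrySketch {
    /** Special field name mapped to the assumed Java type of the extracted value. */
    private static final Map<String, Class<?>> EXPECTED_TYPES = new HashMap<>();

    static {
        EXPECTED_TYPES.put("_SEQUENCE_NUMBER", Long.class);
    }

    /** Name-only check: any field with a registered name is treated as a special field. */
    static boolean isSpecialField(String fieldName) {
        return EXPECTED_TYPES.containsKey(fieldName);
    }

    /** Stricter check: the name must be registered and the declared type must match. */
    static boolean isSpecialField(String fieldName, Class<?> declaredType) {
        return declaredType.equals(EXPECTED_TYPES.get(fieldName));
    }

    public static void main(String[] args) {
        // A user column that happens to share the name but has a different type
        // passes the name-only check and fails the stricter one.
        System.out.println(isSpecialField("_SEQUENCE_NUMBER"));
        System.out.println(isSpecialField("_SEQUENCE_NUMBER", String.class));
    }
}
```

The stricter variant only matters if a table can contain a user-defined column whose name collides with a special field name, which is the case the author considers sufficiently ruled out.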

* <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy concatenation of
* system fields and physical fields, then {@link ProjectedRow} for zero-copy field reordering.
*/
public class KeyValueSystemFieldsRecordReader implements RecordReader<InternalRow> {

Contributor

To be consistent with the existing SpecialFields class, it might be better to name them KeyValueSpecialFieldsRecordReader and SpecialFieldExtractor.

Author

Good suggestion. I have renamed all occurrences: system -> special.
