Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ metadataSyncEventTopic=
# Event topic to sync configuration-metadata between separate pulsar clusters on different cloud platforms.
configurationMetadataSyncEventTopic=

# Exclusions for metadata sync between separate clusters on different cloud platforms
# multiple configurations are separated by comma eg. /admin/clusters/.*,/admin/test/.*
metadataSyncEventExclusions=

# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
configurationMetadataStoreUrl=

Expand Down
52 changes: 52 additions & 0 deletions pip/pip-449.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# PIP-449: Improve metadata sync to have exclusions in syncing config across clusters in different environments (on-prem and cloud)

Implementation: https://github.com/apache/pulsar/pull/25035

# Background knowledge

Apache Pulsar is a cloud-native, distributed messaging framework which natively provides geo-replication. Many organizations deploy pulsar instances on-prem and on multiple different cloud providers and at the same time they would like to enable replication between multiple clusters deployed in different cloud providers. Pulsar already provides various proxy options (Pulsar proxy/ enterprise proxy solutions on SNI) to fulfill security requirements when brokers are deployed on different security zones connected with each other.

However, sometimes it's not possible to share metadata-store (global zookeeper) between pulsar clusters deployed on separate cloud provider platforms, and synchronizing configuration metadata (policies) can be a critical path to share tenant/namespace/topic policies between clusters and administrate pulsar policies uniformly across all clusters. Pulsar has metadata sync feature that syncs configurations across clusters in different environments.

The metadata sync feature syncs all the configuration metadata cluster/tenant/namespace/topic policies and has no way to exclude specific config sync. This is needed as a cluster can have different config in on-prem vs cloud and synchronizing such config breaks the geo-replication. This PIP is to enhance metadata sync to exclude cluster config from syncing across clusters in on-prem and cloud.


# Motivation
The pulsar metadata sync feature syncs all the configuration metadata cluster/tenant/namespace/topic policies and has no way to exclude specific config (eg. cluster configuration) from synchronization across clusters. This is crucial for pulsar multi-environment cluster set up where a cluster can have different configuration in on-prem and cloud and synchronizing this configuration can break geo replication across clusters in different environments on-prem and cloud. Also in pulsar set up where there is separate cluster for a high value tenant in on-prem vs a common cluster for all other tenants, this exclusion in metadata sync becomes crucial in synchronizing tenant/namespace/topic policies across onprem and cloud clusters.

![diagram](https://github-production-user-asset-6210df.s3.amazonaws.com/95091480/523150696-462180ad-2dbf-4e80-929f-26134c087691.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20251205%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20251205T202822Z&X-Amz-Expires=300&X-Amz-Signature=3b3ec1d227d1ca5f1283a18488aa21d3c227ad782507ba87e6f821a5e8f5d7ff&X-Amz-SignedHeaders=host)
# Goals

## In Scope

Pulsar metadata sync should exclude synchronization of configuration metadata cluster/tenant/namespace/topic policies across clusters based on the exclusion regex pattern set in the broker config. This helps the cluster admin to set up desired cluster configuration to achieve geo replication across multi-environment cluster in on-prem and cloud while still achieving synchronization of tenant/namespace/topic policies across clusters.

## Out of Scope
None as part of the metadata sync feature.

# High Level Design

Configure regex pattern in `metadataSyncEventExclusions` as part of the broker config on cluster to exclude metadata sync events from synchronizing on the cluster. The destination cluster will receive the event from the source cluster on the metadata sync topic but would not update/delete config from the metadata store if the metadata sync event matches the regex pattern configured as exclusion. A destination cluster would receive all the metadata sync events but ignore them if they match the regex pattern configured as exclusion in the broker config. Multiple comma separated regex patterns can be provided and sync event would be excluded if it matches any of the pattern. IF no exclusions are provided, configuration metadata will sync as before without any exclusions.

# Detailed Design

## Design & Implementation Details

- Add a config `metadataSyncEventExclusions` in the broker config to capture exclusion patterns for the cluster.
- Add a method `private boolean isMetadataEventExcluded(MetadataEvent metadataEvent)` to PulsarMetadataEventSynchronizer.java to match the regex pattern. Return true if the pattern matches the path in the MetadataEvent, false otherwise.
- Update the message listener for the consumer on metadata sync topic to acknowledge the event without processing it if the event is excluded.

### Configuration
- Add a broker config `metadataSyncEventExclusions`
```
# Exclusions for metadata sync between separate clusters on different cloud platforms
# multiple configurations are separated by comma eg. /admin/clusters/.*,/admin/test/.*
metadataSyncEventExclusions=
```

# General Notes

# Links
- Mailing List Discussion Thread - https://lists.apache.org/thread/kbkgy0q6fgt2hfyrqync2gvxsordzv0d
- Mailing List Voting Thread -

Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private String configurationMetadataSyncEventTopic = null;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Metadata events to be excluded from synchronization between Pulsar "
+ "clusters deployed on different cloud platforms."
)
private String metadataSyncEventExclusions = null;

@FieldContext(
dynamic = true,
doc = "Factory class-name to create topic with custom workflow"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -56,6 +59,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
protected volatile PulsarClientImpl client;
protected volatile Producer<MetadataEvent> producer;
protected volatile Consumer<MetadataEvent> consumer;
private final List<Pattern> exclusionsPatterns;
private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
listeners = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -86,6 +90,35 @@ public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) {
if (!StringUtils.isNotBlank(topicName)) {
log.info("Metadata synchronizer is disabled");
}
String exclusions = pulsar.getConfig().getMetadataSyncEventExclusions();
if (!StringUtils.isBlank(exclusions)) {
this.exclusionsPatterns = List.of();
} else {
this.exclusionsPatterns = Arrays.stream(exclusions.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(exclusion -> compileExclusionPattern(exclusion))
.filter(pattern -> pattern != null)
.collect(Collectors.toUnmodifiableList());
if (!this.exclusionsPatterns.isEmpty()) {
log.info("Metadata synchronizer configured with exclusions: {}", this.exclusionsPatterns);
}
}
}

/*
* Compiles an exclusion string into a regex pattern
* @param exclusion - The exclusion string to compile
* @return Compiled Pattern object
*/
private Pattern compileExclusionPattern(String exclusion) {
try {
return Pattern.compile(exclusion);
} catch (PatternSyntaxException pse) {
log.warn("Invalid exclusion pattern: '{}', error: {}. Pattern will be ignored",
exclusion, pse.getMessage());
}
return null;
}

public void start() throws PulsarServerException {
Expand Down Expand Up @@ -199,7 +232,11 @@ private void startConsumer() {
if (listeners.size() == 0) {
c.acknowledgeAsync(msg);
return;

}
// Applying exclusion at consumer
if (isMetadataEventExcluded(msg.getValue())) {
c.acknowledgeAsync(msg);
return;
}
if (listeners.size() == 1) {
listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
Expand Down Expand Up @@ -323,4 +360,29 @@ private void closeResource(final Supplier<CompletableFuture<Void>> asyncCloseabl
TimeUnit.MILLISECONDS);
});
}

/**
* Checks if a metadata event should be excluded from synchronization based on the configured exclusion patterns.
* Events are excluded if their path matches any of the configured regex patterns.
* Patterns are standard Java regular expressions evaluated against the full event path
*
* @param metadataEvent The metadata event to check
* @return true if the event should be excluded, false otherwise
*/
private boolean isMetadataEventExcluded(MetadataEvent metadataEvent) {
String path = metadataEvent.getPath();
if (path == null || exclusionsPatterns.isEmpty()){
return false;
}
for (Pattern pattern : exclusionsPatterns) {
if (pattern.matcher(path).matches()) {
if (log.isDebugEnabled()) {
log.debug("Excluding metadata event for path: {} (matched pattern: {})",
path, pattern.pattern());
}
return true;
}
}
return false;
}
}
Loading