feat: Add Pulsar auto cluster failover support #196
Open
freeznet wants to merge 7 commits into
Pull request overview
This PR adds client-side Pulsar AutoClusterFailover support to the Spark Pulsar connector, allowing a streaming/batch reader or sink to omit service.url when pulsar.failover.primary.serviceUrl is configured, and enabling per-secondary auth/TLS configuration.
Changes:
- Introduces `PulsarFailoverConfig` to parse/validate `pulsar.failover.*` options and build an `AutoClusterFailover` `ServiceUrlProvider`.
- Wires failover options into client creation and updates option validation so `service.url` can be omitted (or must match the failover primary when both are set).
- Adds unit/integration tests and README documentation for the new failover options.
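The parse/validate step can be illustrated with a minimal sketch. It assumes the rule stated in the PR description, that secondary indexes start at `0` and are continuous; the object and helper names are hypothetical and are not taken from `PulsarFailoverConfig`.

```scala
object FailoverOptionsSketch {
  // Key shape taken from the PR description; everything else here is illustrative.
  private val SecondaryUrlKey = """pulsar\.failover\.secondary\.(\d{1,3})\.serviceUrl""".r

  /** Returns secondary service URLs ordered by index, or an error message.
    * Assumption: an empty set of secondaries is accepted by this sketch. */
  def secondaryServiceUrls(options: Map[String, String]): Either[String, Seq[String]] = {
    // Keep only secondary serviceUrl keys and sort them by their numeric index.
    val indexed = options.toSeq.collect {
      case (SecondaryUrlKey(n), url) => n.toInt -> url.trim
    }.sortBy(_._1)
    val indexes = indexed.map(_._1).toList
    // Continuous means the sorted indexes are exactly 0, 1, ..., n-1.
    if (indexes == indexes.indices.toList) Right(indexed.map(_._2))
    else Left(s"secondary indexes must start at 0 without gaps, got: ${indexes.mkString(", ")}")
  }
}
```

With options for `secondary.0` and `secondary.1` the helper returns both URLs in index order; with only `secondary.1` it returns a `Left` describing the gap.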
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/main/scala/org/apache/spark/sql/pulsar/PulsarFailoverConfig.scala | New parsing/validation + builder for AutoClusterFailover provider. |
| src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala | Adds pulsar.failover.* option constants. |
| src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala | Updates option validation and propagates failover options into client config. |
| src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala | Constructs Pulsar clients using serviceUrlProvider when failover is configured. |
| src/test/scala/org/apache/spark/sql/pulsar/PulsarFailoverConfigSuite.scala | Unit tests for failover option parsing/validation and provider building. |
| src/test/scala/org/apache/spark/sql/pulsar/PulsarFailoverIntegrationSuite.scala | Integration test verifying reads work without service.url when failover primary is set. |
| README.md | Documents how to configure AutoClusterFailover via connector options. |
Comments suppressed due to low confidence (2)
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:580
- Same as above for the producer path: pulsar.failover.* options are merged into clientParams and will be logged in clear text by PulsarConfigUpdater at INFO, potentially exposing secondary auth/TLS secrets. Avoid logging these values, or add redaction for prefixed auth/password keys.
```scala
val serviceUrl = getServiceUrl(parameters)
var clientParams = getClientParams(parameters) ++ getFailoverParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
val producerParams = getProducerParams(parameters)
```
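One way to act on the redaction suggestion is sketched below. It is an assumption-laden illustration: the marker list, the `redact` helper, and the example key `pulsar.failover.secondary.0.authParams` are hypothetical and may not match the connector's real option names or how `PulsarConfigUpdater` logs.

```scala
object RedactionSketch {
  // Hypothetical substrings that mark a key as sensitive; adjust to the real option names.
  private val SensitiveMarkers = Seq("auth", "password", "token", "secret")

  /** Returns a copy of params that is safer to log: values of sensitive keys are masked. */
  def redact(params: Map[String, String]): Map[String, String] =
    params.map { case (key, value) =>
      val lower = key.toLowerCase(java.util.Locale.ROOT)
      // Mask the value when the key contains any sensitive marker, at any prefix depth.
      if (SensitiveMarkers.exists(marker => lower.contains(marker))) key -> "***"
      else key -> value
    }
}
```

Logging `RedactionSketch.redact(clientParams)` instead of `clientParams` would keep service URLs visible while masking prefixed auth values.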
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:532
- The same whitespace-normalization issue exists in sink option validation: service.url is compared without trimming against a trimmed primaryServiceUrl, so harmless whitespace differences can trigger a failure. Consider trimming both values before the equality check.
```scala
val failoverConfig = PulsarFailoverConfig.fromParams(caseInsensitiveParams)
failoverConfig match {
  case Some(config) =>
    caseInsensitiveParams.get(ServiceUrlOptionKey).foreach { serviceUrl =>
      require(
        serviceUrl == config.primaryServiceUrl,
        s"$ServiceUrlOptionKey must match $PulsarFailoverPrimaryServiceUrlDisplayKey " +
          "when Pulsar failover is enabled")
    }
```
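A minimal sketch of the suggested normalization follows, assuming trimming surrounding whitespace is the only normalization wanted. The object and method names are hypothetical, and the message uses literal option names instead of the connector's constants.

```scala
object ServiceUrlCheckSketch {
  /** Fails only when the two URLs differ after trimming surrounding whitespace.
    * A missing service.url is accepted, mirroring the optional-URL behavior in this PR. */
  def requireMatchingServiceUrl(serviceUrl: Option[String], primaryServiceUrl: String): Unit =
    serviceUrl.foreach { url =>
      require(
        url.trim == primaryServiceUrl.trim,
        "service.url must match pulsar.failover.primary.serviceUrl " +
          "when Pulsar failover is enabled")
    }
}
```

Here `Some(" pulsar://p:6650 ")` passes against `"pulsar://p:6650"`, while a genuinely different URL still throws `IllegalArgumentException` from `require`.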
Motivation
The Pulsar Java client has supported `AutoClusterFailover` since Pulsar 2.10, allowing a client to switch from a primary cluster to one or more secondary clusters when the primary service URL becomes unreachable, then switch back after recovery.

The Spark Pulsar connector currently accepts only a single `service.url`. Users who need cross-cluster high availability must rely on external DNS or load balancers, and cannot configure independent authentication or TLS settings for each secondary Pulsar cluster.

This PR adds first-class `AutoClusterFailover` support through connector options while keeping existing single-cluster behavior unchanged.

Modifications

- Adds `PulsarFailoverConfig` to parse and validate `pulsar.failover.*` options.
  - `pulsar.failover.primary.serviceUrl` enables failover.
  - `pulsar.failover.secondary.<N>.serviceUrl` configures secondaries.
  - Secondary indexes must start at `0` and be continuous.
- Updates `CachedPulsarClient` to build clients with `serviceUrlProvider(...)` when failover is enabled, and to keep using `serviceUrl(...)` for the existing single-cluster path.
- Removes `pulsar.failover.*` options before passing client configuration into Pulsar `loadConf`, while keeping those options in the cache key.
- Updates option validation so that `service.url` is optional when `pulsar.failover.primary.serviceUrl` is set.
- Adds unit tests for failover option parsing, validation, and `AutoClusterFailover` provider construction.
- Adds an integration test verifying that reads work without `service.url`.

Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
This change is already covered by existing tests, such as:
This change added tests and can be verified as follows:
The new `PulsarFailoverIntegrationSuite` was also added. Local execution requires a working Docker/Testcontainers environment; on my machine it could not start because the local Docker client API version is too old:

Documentation
Check the box below.
Need to update docs?
- `doc-required`
- `no-need-doc`
- `doc`