Skip to content

feat: Add Pulsar auto cluster failover support#196

Open
freeznet wants to merge 7 commits into
masterfrom
freeznet/pulsar-auto-cluster-failover
Open

feat: Add Pulsar auto cluster failover support#196
freeznet wants to merge 7 commits into
masterfrom
freeznet/pulsar-auto-cluster-failover

Conversation

@freeznet
Copy link
Copy Markdown
Member

@freeznet freeznet commented May 14, 2026

Motivation

Pulsar Java client has supported AutoClusterFailover since Pulsar 2.10, allowing a client to switch from a primary cluster to one or more secondary clusters when the primary service URL becomes unreachable, then switch back after recovery.

The Spark Pulsar connector currently only accepts a single service.url. Users who need cross-cluster high availability must rely on external DNS or load balancers, and cannot configure independent authentication or TLS settings for each secondary Pulsar cluster.

This PR adds first-class AutoClusterFailover support through connector options while keeping existing single-cluster behavior unchanged.

Modifications

  • Added PulsarFailoverConfig to parse and validate pulsar.failover.* options.
    • pulsar.failover.primary.serviceUrl enables failover.
    • pulsar.failover.secondary.<N>.serviceUrl configures secondaries.
    • Secondary indexes must start at 0 and be continuous.
    • Delay options are parsed as positive millisecond durations.
    • Secondary auth and TLS options can be configured independently per secondary cluster.
  • Updated CachedPulsarClient to build clients with serviceUrlProvider(...) when failover is enabled, and to keep using serviceUrl(...) for the existing single-cluster path.
  • Filtered pulsar.failover.* options before passing client configuration into Pulsar loadConf, while keeping those options in the cache key.
  • Updated provider validation so service.url is optional when pulsar.failover.primary.serviceUrl is set.
    • If both are set, they must match.
    • Failover options are parsed during DataFrame option validation for fail-fast behavior.
  • Added unit tests covering failover config parsing, validation, and AutoClusterFailover provider construction.
  • Added an integration test covering streaming read with failover options and omitted service.url.
  • Updated README with the new option schema, examples, per-secondary auth/TLS configuration, and documented limitations.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

  • This change is a trivial rework / code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:

  • This change added tests and can be verified as follows:

    ./mvnw -pl . scalafmt:format -Dscalafmt.skip=false
    ./mvnw -pl . compile
    ./mvnw -pl . scalatest:test -Dsuites=org.apache.spark.sql.pulsar.PulsarFailoverConfigSuite

    The new PulsarFailoverIntegrationSuite was also added. Local execution requires a working Docker/Testcontainers environment; on my machine it could not start because the local Docker client API version is too old:

    client version 1.32 is too old. Minimum supported API version is 1.40
    

Documentation

Check the box below.

Need to update docs?

  • doc-required
  • no-need-doc
  • doc

@freeznet freeznet self-assigned this May 14, 2026
Copilot AI review requested due to automatic review settings May 14, 2026 11:25
@freeznet freeznet requested a review from a team as a code owner May 14, 2026 11:25
@github-actions github-actions Bot added the no-need-doc This pr does not need any document label May 14, 2026
@github-actions github-actions Bot added doc This pr contains a document and removed no-need-doc This pr does not need any document labels May 14, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds client-side Pulsar AutoClusterFailover support to the Spark Pulsar connector, allowing a streaming/batch reader or sink to omit service.url when pulsar.failover.primary.serviceUrl is configured, and enabling per-secondary auth/TLS configuration.

Changes:

  • Introduces PulsarFailoverConfig to parse/validate pulsar.failover.* options and build an AutoClusterFailover ServiceUrlProvider.
  • Wires failover options into client creation and updates option validation so service.url can be omitted (or must match the failover primary when both are set).
  • Adds unit/integration tests and README documentation for the new failover options.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/main/scala/org/apache/spark/sql/pulsar/PulsarFailoverConfig.scala New parsing/validation + builder for AutoClusterFailover provider.
src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala Adds pulsar.failover.* option constants.
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala Updates option validation and propagates failover options into client config.
src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala Constructs Pulsar clients using serviceUrlProvider when failover is configured.
src/test/scala/org/apache/spark/sql/pulsar/PulsarFailoverConfigSuite.scala Unit tests for failover option parsing/validation and provider building.
src/test/scala/org/apache/spark/sql/pulsar/PulsarFailoverIntegrationSuite.scala Integration test verifying reads work without service.url when failover primary is set.
README.md Documents how to configure AutoClusterFailover via connector options.
Comments suppressed due to low confidence (2)

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:580

  • Same as above for producer path: pulsar.failover.* options are merged into clientParams and will be logged in clear text by PulsarConfigUpdater at INFO, potentially exposing secondary auth/tls secrets. Avoid logging these values or add redaction for prefixed auth/password keys.
    val serviceUrl = getServiceUrl(parameters)

    var clientParams = getClientParams(parameters) ++ getFailoverParams(parameters)
    clientParams += (ServiceUrlOptionKey -> serviceUrl)
    val producerParams = getProducerParams(parameters)

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala:532

  • Same whitespace-normalization issue exists in sink option validation: service.url is compared without trimming to a trimmed primaryServiceUrl, so harmless whitespace differences can trigger a failure. Consider normalizing/trim both values before the equality check.
    val failoverConfig = PulsarFailoverConfig.fromParams(caseInsensitiveParams)
    failoverConfig match {
      case Some(config) =>
        caseInsensitiveParams.get(ServiceUrlOptionKey).foreach { serviceUrl =>
          require(
            serviceUrl == config.primaryServiceUrl,
            s"$ServiceUrlOptionKey must match $PulsarFailoverPrimaryServiceUrlDisplayKey " +
              "when Pulsar failover is enabled")
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Comment thread src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala Outdated
Comment thread README.md Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc This pr contains a document

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants