Skip to content

feat: add Stream Processor resource#1532

Merged
ParthasarathyV merged 11 commits intomasterfrom
CLOUDP-368428-Stream-Processor
Jan 21, 2026
Merged

feat: add Stream Processor resource#1532
ParthasarathyV merged 11 commits intomasterfrom
CLOUDP-368428-Stream-Processor

Conversation

@sivaram-mongodb
Copy link
Copy Markdown
Contributor

@sivaram-mongodb sivaram-mongodb commented Jan 12, 2026

Proposed changes

Added new resource Stream Processor:

  • Manages stream processing pipelines within Atlas Stream Processing workspaces
  • Supports configurable stream aggregation pipelines with JSON-encoded stages
  • Provides state management with automatic transitions (CREATED, STARTED, STOPPED)
  • Includes Dead Letter Queue (DLQ) configuration for error handling
  • Implements callback-based asynchronous operations for long-running processor creation
  • Features configurable timeouts with optional cleanup on timeout
  • Supports backward compatibility with both WorkspaceName and InstanceName fields
  • Enables real-time data transformation and routing from sources to sinks

Resource Configuration:

The Stream Processor resource enables you to create and manage stream processing pipelines that transform and route data from source connections (Kafka, clusters, sample data) to sink destinations (clusters). Processors apply MongoDB aggregation pipeline stages to streaming data in real-time.

Required Properties:

  • ProjectId: Atlas project identifier (24-hexadecimal characters)
  • ProcessorName: Human-readable label for the stream processor
  • Pipeline: JSON-encoded array of stream aggregation pipeline stages

Optional Properties:

  • WorkspaceName or InstanceName: Stream workspace identifier (WorkspaceName is preferred)
  • DesiredState: Target state for the processor (CREATED, STARTED, STOPPED)
  • Options.Dlq: Dead Letter Queue configuration for failed messages
    • ConnectionName: Atlas cluster connection for DLQ storage
    • Db: Database name for DLQ messages
    • Coll: Collection name for DLQ messages
  • Timeouts.Create: Timeout duration for creation operations (default: 20 minutes)
  • DeleteOnCreateTimeout: Whether to delete the resource on timeout (default: true)
  • Profile: AWS Secrets Manager profile (default: "default")

Read-Only Properties:

  • Id: Unique 24-hexadecimal identifier
  • State: Current processor state from Atlas API (CREATED, STARTED, STOPPED, FAILED)
  • Stats: Processing statistics (available when processor is STARTED)

Create-Only Properties:

  • ProjectId, ProcessorName, WorkspaceName, InstanceName, Profile: Cannot be changed after creation

cfn testing:

image

AWS Stacks:

Example 1 - Stream -> Cluster

image

Atlas CLI:

Before:

image

Create:

image

Update:

image

delete:

image

Example 2 - Stream -> Cluster (with DLQ)

image

Atlas CLI:

Before:

image

Create:

image

Update:

image

delete:

image

AWS Stacks:

Example 3 - Kafka -> Cluster

image

Atlas CLI:

Before:

image

Create:

image

Update:

image

delete:

image

AWS Stacks:

Example 4 - Stream -> Cluster

image

Atlas CLI:

Before:

image

Create:

image

Update:

image

delete:

image

Jira ticket: CLOUDP-368428

Please include a summary of the fix/feature/change, including any relevant motivation and context.

Link to any related issue(s):

Type of change:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as
    expected)
  • This change requires a documentation update
  • If changes include removal or addition of 3rd party GitHub actions, I updated our internal document. Reach out to the APIx Integration slack channel to get access to the internal document.

Manual QA performed:

  • cfn invoke for each of CRUDL/cfn test
  • Updated resource in example
  • Published to AWS private registry
  • Used the template in example to create and update a stack in AWS
  • Deleted stack to ensure resources are deleted
  • Created multiple resources in same stack
  • Validated in Atlas UI
  • Included screenshots

Required Checklist:

  • I have signed the MongoDB CLA
  • I have added tests that prove my fix is effective or that my feature works
  • I have checked that this change does not generate any credentials and that they are NOT accidentally logged anywhere.
  • I have added any necessary documentation (if appropriate)
  • I have run make fmt and formatted my code
  • For CFN Resources: I have released by changes in the private registry and proved by change
    works in Atlas

Further comments

@sivaram-mongodb sivaram-mongodb force-pushed the CLOUDP-368428-Stream-Processor branch from 1681f8c to 9c46a1d Compare January 13, 2026 11:48
@ParthasarathyV ParthasarathyV marked this pull request as ready for review January 15, 2026 20:38
@ParthasarathyV ParthasarathyV requested a review from a team as a code owner January 15, 2026 20:38
@@ -0,0 +1,380 @@
// Copyright 2026 MongoDB Inc
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename this file? not sure why this is called "share.go"? I see the main handlers here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we've renamed it to a more appropriate name, 'handlers.go'. It was initially named 'share.go' following the pattern of flex-cluster. We assumed it was some sort of legacy name for handlers.

},
"InstanceName": {
"type": "string",
"description": "Label that identifies the stream processing workspace. This field is deprecated in favor of WorkspaceName. Exactly one of InstanceName or WorkspaceName must be provided."
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the description to callout the deprecation clearly?
Similar to TF messaging & referencing other CFN deprecated attributes:

"**WARNING:** This field is deprecated and will be removed in the next major release. Please use WorkspaceName instead" ...... "

Copy link
Copy Markdown
Collaborator

@maastha maastha Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually since this is a new resource, we should just remove "InstanceName" and we can include in Workspace Name description that this is same as 'InstanceName' in other stream resources
cc @marcosuma

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. We've removed InstanceName entirely since this is a new resource. We've also updated the WorkspaceName description.

ProcessorName: processorName,
}).Execute()
if err != nil {
if apiResp != nil && apiResp.StatusCode == http.StatusNotFound {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common helper methods for such checks were added in a previous PR, can we merge that and then use those here before we merge this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using common helper methods wherever applicable.

if err != nil {
_, _ = logger.Warnf("Cleanup delete failed: %v", err)
}
return nil
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would always return nil even on error

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed. The function now returns the error when cleanup fails, matching Terraform's behavior.

@rakhul-mongo rakhul-mongo force-pushed the CLOUDP-368428-Stream-Processor branch from e4bdab8 to 3420198 Compare January 20, 2026 04:20
Comment thread .github/workflows/contract-testing.yaml Outdated
Comment thread cfn-resources/stream-processor/Makefile Outdated
Comment thread cfn-resources/stream-processor/cmd/resource/callbacks.go Outdated
Comment thread cfn-resources/stream-processor/cmd/resource/callbacks.go
Comment thread cfn-resources/stream-processor/cmd/resource/callbacks.go
Comment thread cfn-resources/stream-processor/cmd/resource/handlers.go
Comment thread cfn-resources/stream-processor/cmd/resource/helpers.go
Copy link
Copy Markdown
Collaborator

@oarbusi oarbusi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for addressing the comments

@ParthasarathyV ParthasarathyV added this pull request to the merge queue Jan 21, 2026
Merged via the queue into master with commit 164b3b5 Jan 21, 2026
41 checks passed
@ParthasarathyV ParthasarathyV deleted the CLOUDP-368428-Stream-Processor branch January 21, 2026 13:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants