feat: add Stream Processor resource#1532
Conversation
1681f8c to
9c46a1d
Compare
| @@ -0,0 +1,380 @@ | |||
| // Copyright 2026 MongoDB Inc | |||
There was a problem hiding this comment.
can we rename this file? not sure why this is called "share.go"? I see the main handlers here
There was a problem hiding this comment.
Yes, we've renamed it to a more appropriate name, 'handlers.go'. It was initially named 'share.go' following the pattern of flex-cluster. We assumed it was some sort of legacy name for handlers.
| }, | ||
| "InstanceName": { | ||
| "type": "string", | ||
| "description": "Label that identifies the stream processing workspace. This field is deprecated in favor of WorkspaceName. Exactly one of InstanceName or WorkspaceName must be provided." |
There was a problem hiding this comment.
Can we update the description to callout the deprecation clearly?
Similar to TF messaging & referencing other CFN deprecated attributes:
"**WARNING:** This field is deprecated and will be removed in the next major release. Please use WorkspaceName instead" ...... "
There was a problem hiding this comment.
Actually since this is a new resource, we should just remove "InstanceName" and we can include in Workspace Name description that this is same as 'InstanceName' in other stream resources
cc @marcosuma
There was a problem hiding this comment.
Valid point. We've removed InstanceName entirely since this is a new resource. We've also updated the WorkspaceName description.
| ProcessorName: processorName, | ||
| }).Execute() | ||
| if err != nil { | ||
| if apiResp != nil && apiResp.StatusCode == http.StatusNotFound { |
There was a problem hiding this comment.
common helper methods for such checks were added in a previous PR, can we merge that and then use those here before we merge this?
There was a problem hiding this comment.
Using common helper methods wherever applicable.
| if err != nil { | ||
| _, _ = logger.Warnf("Cleanup delete failed: %v", err) | ||
| } | ||
| return nil |
There was a problem hiding this comment.
this would always return nil even on error
There was a problem hiding this comment.
Good catch. Fixed. The function now returns the error when cleanup fails, matching Terraform's behavior.
…n stream processor
e4bdab8 to
3420198
Compare
oarbusi
left a comment
There was a problem hiding this comment.
LGTM, thanks for addressing the comments
Proposed changes
Added new resource Stream Processor:
Resource Configuration:
The Stream Processor resource enables you to create and manage stream processing pipelines that transform and route data from source connections (Kafka, clusters, sample data) to sink destinations (clusters). Processors apply MongoDB aggregation pipeline stages to streaming data in real-time.
Required Properties:
ProjectId: Atlas project identifier (24-hexadecimal characters)ProcessorName: Human-readable label for the stream processorPipeline: JSON-encoded array of stream aggregation pipeline stagesOptional Properties:
WorkspaceNameorInstanceName: Stream workspace identifier (WorkspaceName is preferred)DesiredState: Target state for the processor (CREATED, STARTED, STOPPED)Options.Dlq: Dead Letter Queue configuration for failed messagesConnectionName: Atlas cluster connection for DLQ storageDb: Database name for DLQ messagesColl: Collection name for DLQ messagesTimeouts.Create: Timeout duration for creation operations (default: 20 minutes)DeleteOnCreateTimeout: Whether to delete the resource on timeout (default: true)Profile: AWS Secrets Manager profile (default: "default")Read-Only Properties:
Id: Unique 24-hexadecimal identifierState: Current processor state from Atlas API (CREATED, STARTED, STOPPED, FAILED)Stats: Processing statistics (available when processor is STARTED)Create-Only Properties:
ProjectId,ProcessorName,WorkspaceName,InstanceName,Profile: Cannot be changed after creationcfn testing:
AWS Stacks:
Example 1 - Stream -> Cluster
Atlas CLI:
Before:
Create:
Update:
delete:
Example 2 - Stream -> Cluster (with DLQ)
Atlas CLI:
Before:
Create:
Update:
delete:
AWS Stacks:
Example 3 - Kafka -> Cluster
Atlas CLI:
Before:
Create:
Update:
delete:
AWS Stacks:
Example 4 - Stream -> Cluster
Atlas CLI:
Before:
Create:
Update:
delete:
Jira ticket: CLOUDP-368428
Please include a summary of the fix/feature/change, including any relevant motivation and context.
Link to any related issue(s):
Type of change:
expected)
Manual QA performed:
Required Checklist:
make fmtand formatted my codeworks in Atlas
Further comments