
Commit 8a31311

sivaram-mongodb committed
feat: Add StreamProcessor CloudFormation resource
1 parent 6fd44b2 commit 8a31311

34 files changed · 6045 additions & 6 deletions
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
{
    "artifact_type": "RESOURCE",
    "typeName": "MongoDB::Atlas::StreamProcessor",
    "language": "go",
    "runtime": "provided.al2",
    "entrypoint": "bootstrap",
    "testEntrypoint": "bootstrap",
    "settings": {
        "version": false,
        "subparser_name": null,
        "verbose": 0,
        "force": false,
        "type_name": "MongoDB::Atlas::StreamProcessor",
        "artifact_type": "r",
        "endpoint_url": null,
        "region": null,
        "target_schemas": [],
        "profile": null,
        "import_path": "github.com/mongodb/mongodbatlas-cloudformation-resources/stream-processor",
        "protocolVersion": "2.0.0"
    },
    "canarySettings": {
        "contract_test_file_names": [
            "inputs_1.json"
        ]
    }
}
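
The file above is the CloudFormation CLI (`cfn`) project config for the new resource type. A minimal sketch of the local workflow such a config typically drives (the `cloudformation-cli` tooling and its Go plugin are assumed to be installed; whether this repo registers the type via `cfn submit` or through CI is not shown in this commit):

```bash
# From the resource directory, regenerate handler scaffolding from the schema,
# then (optionally) register a private version of the type in your AWS account.
cd cfn-resources/stream-processor
cfn generate
cfn submit --region us-east-1 --set-default
```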
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
.PHONY: build debug test clean
tags=logging callback metrics scheduler
cgo=0
goos=linux
goarch=amd64
CFNREP_GIT_SHA?=$(shell git rev-parse HEAD)
ldXflags=-s -w -X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=info -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA}
ldXflagsD=-X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=debug -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA}

build:
	cfn generate
	env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go

debug:
	cfn generate
	env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflagsD)" -tags="$(tags)" -o bin/bootstrap cmd/main.go

test:
	cfn generate
	env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go

clean:
	rm -rf bin

create-test-resources:
	@echo "==> Creating test files and resources for contract testing"
	./test/contract-testing/cfn-test-create.sh

delete-test-resources:
	@echo "==> Delete test resources used for contract testing"
	./test/contract-testing/cfn-test-delete.sh

run-contract-testing:
	@echo "==> Run contract testing"
	make build
	sam local start-lambda &
	cfn test --function-name TestEntrypoint --verbose
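
For context, a sketch of how these targets chain together for a local contract-test run (assumes Docker, the AWS SAM CLI, and the `cfn` CLI are installed):

```bash
cd cfn-resources/stream-processor

# Create the Atlas resources and input files the contract tests expect.
make create-test-resources

# Build the handler, start `sam local start-lambda` in the background,
# and run the CloudFormation CLI contract tests against it.
make run-contract-testing

# Tear the test resources back down when finished.
make delete-test-resources
```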
Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
# MongoDB::Atlas::StreamProcessor

## Description

Resource for creating and managing [Stream Processors for an Atlas Stream Instance](https://www.mongodb.com/docs/api/doc/atlas-admin-api-v2/operation/operation-createstreamprocessor).

## Requirements

Set up an AWS profile to securely give CloudFormation access to your Atlas credentials.
For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-api-keys-credential-management).

## Attributes and Parameters

See the [resource docs](docs/README.md). Also refer to the [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) for managing credentials.

## CloudFormation Examples

See the [CFN templates](/examples/atlas-streams/stream-processor/) for example resources:
- [Basic Stream Processor](/examples/atlas-streams/stream-processor/stream-processor.json)
- [Stream Processor with DLQ](/examples/atlas-streams/stream-processor/stream-processor-with-dlq.json)

## Prerequisites

Before creating a stream processor, you must have:
- An existing Atlas project
- An existing stream instance/workspace (created via the `MongoDB::Atlas::StreamInstance` resource)
- At least one stream connection configured (created via the `MongoDB::Atlas::StreamConnection` resource)
- A source connection (e.g., a sample data source, cluster connection, or Kafka connection)
- A sink connection (must be a cluster connection for merge operations)

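A quick way to confirm these prerequisites exist is the Atlas CLI (a minimal sketch; assumes a recent CLI with the `atlas streams` command group, with placeholders for you to fill in):

```bash
# List stream instances/workspaces in the project.
atlas streams instances list --projectId <YOUR_PROJECT_ID>

# List the connections configured on a workspace; the source and sink
# connections referenced by the processor must appear here.
atlas streams connections list --instance <YOUR_WORKSPACE_NAME> --projectId <YOUR_PROJECT_ID>
```
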
## Deployment

### Deploy Basic Stream Processor

```bash
aws cloudformation deploy \
  --template-file examples/atlas-streams/stream-processor/stream-processor.json \
  --stack-name stream-processor-stack \
  --parameter-overrides \
    ProjectId=<YOUR_PROJECT_ID> \
    WorkspaceName=<YOUR_WORKSPACE_NAME> \
    ProcessorName=my-processor \
    SourceConnectionName=sample_stream_solar \
    SinkConnectionName=<YOUR_CLUSTER_CONNECTION_NAME> \
    SinkDatabase=test \
    SinkCollection=output \
    State=CREATED \
  --capabilities CAPABILITY_IAM \
  --region us-east-1
```

### Deploy Stream Processor with DLQ

```bash
aws cloudformation deploy \
  --template-file examples/atlas-streams/stream-processor/stream-processor-with-dlq.json \
  --stack-name stream-processor-dlq-stack \
  --parameter-overrides \
    ProjectId=<YOUR_PROJECT_ID> \
    WorkspaceName=<YOUR_WORKSPACE_NAME> \
    ProcessorName=my-processor-dlq \
    SourceConnectionName=sample_stream_solar \
    SinkConnectionName=<YOUR_CLUSTER_CONNECTION_NAME> \
    SinkDatabase=test \
    SinkCollection=output \
    DlqConnectionName=<YOUR_DLQ_CLUSTER_CONNECTION_NAME> \
    DlqDatabase=dlq \
    DlqCollection=dlq-messages \
    State=CREATED \
  --capabilities CAPABILITY_IAM \
  --region us-east-1
```
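
Once the deploy returns, a standard describe call confirms that the stack converged (swap in `stream-processor-dlq-stack` for the DLQ variant):

```bash
# Check the stack status and read back any outputs the template defines.
aws cloudformation describe-stacks \
  --stack-name stream-processor-stack \
  --region us-east-1 \
  --query "Stacks[0].{Status:StackStatus,Outputs:Outputs}"
```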

## Verification

After deployment, verify the stream processor was created successfully using both the Atlas CLI and the Atlas UI.

### Atlas CLI Verification

```bash
# List all stream processors for a workspace
atlas streams processors list <WORKSPACE_NAME> --projectId <PROJECT_ID>

# Describe a specific stream processor
atlas streams processors describe <PROCESSOR_NAME> \
  --instance <WORKSPACE_NAME> \
  --projectId <PROJECT_ID>
```

### Expected CLI Output

The `atlas streams processors describe` command should return:
- `id`: Unique identifier of the processor (matches the `Id` attribute in CloudFormation)
- `name`: Processor name (matches the `ProcessorName` parameter)
- `state`: Current state (CREATED, STARTED, STOPPED, or FAILED)
- `pipeline`: Array of pipeline stages matching your `Pipeline` configuration
- `options`: DLQ configuration, if provided (should match your `Options.Dlq` settings)
- `stats`: Processing statistics (available when the processor is STARTED)

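For scripting, the state can be extracted directly (a sketch; assumes your Atlas CLI version supports `--output json` and that `jq` is installed):

```bash
# Print only the processor state, e.g. CREATED, STARTED, or STOPPED.
atlas streams processors describe <PROCESSOR_NAME> \
  --instance <WORKSPACE_NAME> \
  --projectId <PROJECT_ID> \
  --output json | jq -r '.state'
```
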
### Verify Pipeline Configuration

The pipeline should match your CloudFormation template:
- The source connection name should match the `SourceConnectionName` parameter
- The merge target connection should match the `SinkConnectionName` parameter
- The database and collection should match the `SinkDatabase` and `SinkCollection` parameters

### Verify DLQ Configuration (if applicable)

For processors with a DLQ:
- `options.dlq.connectionName` should match the `DlqConnectionName` parameter
- `options.dlq.db` should match the `DlqDatabase` parameter
- `options.dlq.coll` should match the `DlqCollection` parameter

### Atlas UI Verification

1. Navigate to your Atlas project in the [Atlas UI](https://cloud.mongodb.com)
2. Go to the **Stream Processing** section
3. Select your stream workspace/instance
4. Verify the processor appears in the **Processors** tab with:
   - **Name**: Matches the `ProcessorName` from your CloudFormation template
   - **State**: Matches the `State` parameter (CREATED, STARTED, or STOPPED)
   - **Pipeline**: Click on the processor to view pipeline stages and verify:
     - The source connection matches your `SourceConnectionName` parameter
     - The merge target connection matches your `SinkConnectionName` parameter
     - The target database and collection match your `SinkDatabase` and `SinkCollection` parameters
5. For processors with a DLQ:
   - Verify the DLQ configuration is displayed in the processor details
   - Check that the DLQ connection, database, and collection match your parameters
6. If the processor is in the STARTED state:
   - Verify that processing statistics are available
   - Check that messages are being processed (stats show input/output message counts)

## Notes

- **AWS Only**: This CloudFormation resource is designed for AWS deployments; AWS is the only supported provider.
- **WorkspaceName vs InstanceName**: Use `WorkspaceName` (preferred). `InstanceName` is supported for backward compatibility but is deprecated.
- **State Management**: When creating a processor, specify `State: STARTED` to start processing automatically, or `State: CREATED` to create it in a stopped state; the state can later be changed with a stack update (see the sketch after this list).
- **Long-Running Operations**: Creating and starting stream processors can take several minutes. The resource uses callback-based state management to handle these operations asynchronously.
- **Timeout Configuration**: Use `Timeouts.Create` to configure how long to wait for processor creation/startup (default: 20 minutes).
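
A minimal sketch of the state transition mentioned above, assuming `aws cloudformation deploy` reuses the previous values for any parameters not overridden:

```bash
# Flip an existing processor from CREATED to STARTED via an in-place stack update.
aws cloudformation deploy \
  --template-file examples/atlas-streams/stream-processor/stream-processor.json \
  --stack-name stream-processor-stack \
  --parameter-overrides State=STARTED \
  --capabilities CAPABILITY_IAM \
  --region us-east-1
```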

cfn-resources/stream-processor/cmd/main.go

Lines changed: 85 additions & 0 deletions
Some generated files are not rendered by default.

cfn-resources/stream-processor/cmd/resource/config.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default.

0 commit comments
