Skip to content

Commit 164b3b5

Browse files
Authored by: sivaram-mongodb, ParthasarathyV, outcomes-winter-rakhulsprakash
feat: add Stream Processor resource (#1532)
Co-authored-by: sivaram-mongodb <sivaram@mongodb.com>
Co-authored-by: ParthasarathyV <parthasarathy.varadhan@mongodb.com>
Co-authored-by: Rakhul S Prakash <rakhul.s.prakash@peerislands.io>
Co-authored-by: ParthasarathyV <114770988+ParthasarathyV@users.noreply.github.com>
1 parent 25ac09f commit 164b3b5

42 files changed

Lines changed: 3630 additions & 1 deletion

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/contract-testing.yaml

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030
search-deployment: ${{ steps.filter.outputs.search-deployment }}
3131
stream-connection: ${{ steps.filter.outputs.stream-connection }}
3232
stream-instance: ${{ steps.filter.outputs.stream-instance }}
33+
stream-processor: ${{ steps.filter.outputs.stream-processor }}
3334
stream-workspace: ${{ steps.filter.outputs.stream-workspace }}
3435
steps:
3536
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8
@@ -76,6 +77,8 @@ jobs:
7677
- 'cfn-resources/stream-connection/**'
7778
stream-instance:
7879
- 'cfn-resources/stream-instance/**'
80+
stream-processor:
81+
- 'cfn-resources/stream-processor/**'
7982
stream-workspace:
8083
- 'cfn-resources/stream-workspace/**'
8184
access-list-api-key:
@@ -855,6 +858,46 @@ jobs:
855858
pushd cfn-resources/stream-instance
856859
make create-test-resources
857860
cat inputs/inputs_1_create.json
861+
make run-contract-testing
862+
make delete-test-resources
863+
stream-processor:
864+
needs: change-detection
865+
if: ${{ needs.change-detection.outputs.stream-processor == 'true' }}
866+
runs-on: ubuntu-latest
867+
steps:
868+
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8
869+
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5
870+
with:
871+
go-version-file: 'cfn-resources/go.mod'
872+
- name: setup Atlas CLI
873+
uses: mongodb/atlas-github-action@e3c9e0204659bafbb3b65e1eb1ee745cca0e9f3b
874+
- uses: aws-actions/setup-sam@c2a20b1822cc4a6bc594ff7f1dbb658758e383c3
875+
with:
876+
use-installer: true
877+
- uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
878+
with:
879+
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_TEST_ENV }}
880+
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_TEST_ENV }}
881+
aws-region: eu-west-1
882+
- uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548
883+
with:
884+
python-version: '3.9'
885+
cache: 'pip' # caching pip dependencies
886+
- run: pip install cloudformation-cli cloudformation-cli-go-plugin
887+
- name: Run the Contract test
888+
shell: bash
889+
env:
890+
MONGODB_ATLAS_PUBLIC_API_KEY: ${{ secrets.CLOUD_DEV_PUBLIC_KEY }}
891+
MONGODB_ATLAS_PRIVATE_API_KEY: ${{ secrets.CLOUD_DEV_PRIVATE_KEY }}
892+
MONGODB_ATLAS_ORG_ID: ${{ secrets.CLOUD_DEV_ORG_ID }}
893+
MONGODB_ATLAS_OPS_MANAGER_URL: ${{ vars.MONGODB_ATLAS_BASE_URL }}
894+
MONGODB_ATLAS_PROFILE: cfn-cloud-dev-github-action
895+
run: |
896+
cd cfn-resources/stream-processor
897+
make create-test-resources
898+
899+
cat inputs/*
900+
858901
make run-contract-testing
859902
make delete-test-resources
860903
stream-workspace:
@@ -897,4 +940,4 @@ jobs:
897940
cat inputs/inputs_1_update.json
898941
899942
make run-contract-testing
900-
make delete-test-resources
943+
make delete-test-resources
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"artifact_type": "RESOURCE",
3+
"typeName": "MongoDB::Atlas::StreamProcessor",
4+
"language": "go",
5+
"runtime": "provided.al2",
6+
"entrypoint": "bootstrap",
7+
"testEntrypoint": "bootstrap",
8+
"settings": {
9+
"version": false,
10+
"subparser_name": null,
11+
"verbose": 0,
12+
"force": false,
13+
"type_name": "MongoDB::Atlas::StreamProcessor",
14+
"artifact_type": "r",
15+
"endpoint_url": null,
16+
"region": null,
17+
"target_schemas": [],
18+
"profile": null,
19+
"import_path": "github.com/mongodb/mongodbatlas-cloudformation-resources/stream-processor",
20+
"protocolVersion": "2.0.0"
21+
},
22+
"canarySettings": {
23+
"contract_test_file_names": [
24+
"inputs_1.json"
25+
]
26+
}
27+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
.PHONY: build debug test clean
2+
tags=logging callback metrics scheduler
3+
cgo=0
4+
goos=linux
5+
goarch=amd64
6+
CFNREP_GIT_SHA?=$(shell git rev-parse HEAD)
7+
ldXflags=-s -w -X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=info -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA}
8+
ldXflagsD=-X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=debug -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA}
9+
10+
build:
11+
cfn generate
12+
env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go
13+
14+
debug:
15+
cfn generate
16+
env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflagsD)" -tags="$(tags)" -o bin/debug cmd/main.go
17+
18+
test:
19+
cfn generate
20+
env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go
21+
22+
clean:
23+
rm -rf bin
24+
25+
submit: clean build
26+
@echo "==> Submitting to private registry for testing"
27+
cfn submit --set-default --region us-east-1
28+
29+
create-test-resources:
30+
@echo "==> Creating test files and resources for contract testing"
31+
./test/contract-testing/cfn-test-create.sh
32+
33+
delete-test-resources:
34+
@echo "==> Delete test resources used for contract testing"
35+
./test/contract-testing/cfn-test-delete.sh
36+
37+
run-contract-testing:
38+
@echo "==> Run contract testing"
39+
make build
40+
sam local start-lambda &
41+
cfn test --function-name TestEntrypoint --verbose
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# MongoDB::Atlas::StreamProcessor
2+
3+
## Description
4+
5+
Resource for creating and managing [Stream Processors for an Atlas Stream Instance](https://www.mongodb.com/docs/api/doc/atlas-admin-api-v2/operation/operation-createstreamprocessor).
6+
7+
## Requirements
8+
9+
Set up an AWS profile to securely give CloudFormation access to your Atlas credentials.
10+
For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-api-keys-credential-management).
11+
12+
## Attributes and Parameters
13+
14+
See the [resource docs](docs/README.md). Also refer [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) to manage credentials.
15+
16+
## CloudFormation Examples
17+
18+
See the example [CFN Templates](/examples/atlas-streams/stream-processor/) for example resources:
19+
20+
- [Basic Stream Processor](/examples/atlas-streams/stream-processor/stream-processor.json)
21+
- [Stream Processor with DLQ](/examples/atlas-streams/stream-processor/stream-processor-with-dlq.json)
22+
23+
## Prerequisites
24+
25+
Before creating a stream processor, you must have:
26+
27+
- An existing Atlas Project
28+
- An existing Stream Instance/Workspace (created via `MongoDB::Atlas::StreamInstance` resource)
29+
- At least one Stream Connection configured (created via `MongoDB::Atlas::StreamConnection` resource)
30+
- A source connection (e.g., sample data source, cluster connection, or Kafka connection)
31+
- A sink connection (must be a cluster connection for merge operations)
32+
33+
## Deployment
34+
35+
### Deploy Basic Stream Processor
36+
37+
```bash
38+
aws cloudformation deploy \
39+
--template-file examples/atlas-streams/stream-processor/stream-processor.json \
40+
--stack-name stream-processor-stack \
41+
--parameter-overrides \
42+
ProjectId=<YOUR_PROJECT_ID> \
43+
WorkspaceName=<YOUR_WORKSPACE_NAME> \
44+
ProcessorName=my-processor \
45+
SourceConnectionName=sample_stream_solar \
46+
SinkConnectionName=<YOUR_CLUSTER_CONNECTION_NAME> \
47+
SinkDatabase=test \
48+
SinkCollection=output \
49+
State=CREATED \
50+
--capabilities CAPABILITY_IAM \
51+
--region us-east-1
52+
```
53+
54+
### Deploy Stream Processor with DLQ
55+
56+
```bash
57+
aws cloudformation deploy \
58+
--template-file examples/atlas-streams/stream-processor/stream-processor-with-dlq.json \
59+
--stack-name stream-processor-dlq-stack \
60+
--parameter-overrides \
61+
ProjectId=<YOUR_PROJECT_ID> \
62+
WorkspaceName=<YOUR_WORKSPACE_NAME> \
63+
ProcessorName=my-processor-dlq \
64+
SourceConnectionName=sample_stream_solar \
65+
SinkConnectionName=<YOUR_CLUSTER_CONNECTION_NAME> \
66+
SinkDatabase=test \
67+
SinkCollection=output \
68+
DlqConnectionName=<YOUR_DLQ_CLUSTER_CONNECTION_NAME> \
69+
DlqDatabase=dlq \
70+
DlqCollection=dlq-messages \
71+
State=CREATED \
72+
--capabilities CAPABILITY_IAM \
73+
--region us-east-1
74+
```
75+
76+
## Verification
77+
78+
After deployment, verify the stream processor was created successfully using both Atlas CLI and Atlas UI.
79+
80+
### Atlas CLI Verification
81+
82+
```bash
83+
# List all stream processors for a workspace
84+
atlas streams processors list <WORKSPACE_NAME> --projectId <PROJECT_ID>
85+
86+
# Describe a specific stream processor
87+
atlas streams processors describe <PROCESSOR_NAME> \
88+
--instance <WORKSPACE_NAME> \
89+
--projectId <PROJECT_ID>
90+
```
91+
92+
### Expected CLI Output
93+
94+
The `atlas streams processors describe` command should return:
95+
96+
- `id`: Unique identifier of the processor (matches the `Id` attribute in CloudFormation)
97+
- `name`: Processor name (matches `ProcessorName` parameter)
98+
- `state`: Current state (CREATED, STARTED, STOPPED, or FAILED)
99+
- `pipeline`: Array of pipeline stages matching your Pipeline configuration
100+
- `options`: DLQ configuration if provided (should match your Options.Dlq settings)
101+
- `stats`: Processing statistics (available when processor is STARTED)
102+
103+
### Verify Pipeline Configuration
104+
105+
The pipeline should match your CloudFormation template:
106+
107+
- Source connection name should match `SourceConnectionName` parameter
108+
- Merge target connection should match `SinkConnectionName` parameter
109+
- Database and collection should match `SinkDatabase` and `SinkCollection` parameters
110+
111+
### Verify DLQ Configuration (if applicable)
112+
113+
For processors with DLQ:
114+
115+
- `options.dlq.connectionName` should match `DlqConnectionName` parameter
116+
- `options.dlq.db` should match `DlqDatabase` parameter
117+
- `options.dlq.coll` should match `DlqCollection` parameter
118+
119+
### Atlas UI Verification
120+
121+
1. Navigate to your Atlas project in the [Atlas UI](https://cloud.mongodb.com)
122+
2. Go to **Stream Processing** section
123+
3. Select your stream workspace/instance
124+
4. Verify the processor appears in the **Processors** tab with:
125+
- **Name**: Matches the `ProcessorName` from your CloudFormation template
126+
- **State**: Matches the `State` parameter (CREATED, STARTED, or STOPPED)
127+
- **Pipeline**: Click on the processor to view pipeline stages and verify:
128+
- Source connection matches your `SourceConnectionName` parameter
129+
- Merge target connection matches your `SinkConnectionName` parameter
130+
- Target database and collection match your `SinkDatabase` and `SinkCollection` parameters
131+
5. For processors with DLQ:
132+
- Verify DLQ configuration is displayed in the processor details
133+
- Check that DLQ connection, database, and collection match your parameters
134+
6. If processor is in STARTED state:
135+
- Verify processing statistics are available
136+
- Check that messages are being processed (stats show input/output message counts)
137+
138+
## Notes
139+
140+
- **AWS Only**: This CloudFormation resource is designed for AWS deployments. The provider is effectively AWS.
141+
- **WorkspaceName**: This field is the same as 'InstanceName' used in other stream resources.
142+
- **State Management**: When creating a processor, specify `State: STARTED` to automatically start processing, or `State: CREATED` to create it in a stopped state.
143+
- **Long-Running Operations**: Creating and starting stream processors can take several minutes. The resource uses callback-based state management to handle these operations asynchronously.
144+
- **Timeout Configuration**: Use `Timeouts.Create` to configure how long to wait for processor creation/startup (default: 20 minutes).

cfn-resources/stream-processor/cmd/main.go

Lines changed: 85 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)