diff --git a/.github/workflows/contract-testing.yaml b/.github/workflows/contract-testing.yaml index 4820758d1..81eedd449 100644 --- a/.github/workflows/contract-testing.yaml +++ b/.github/workflows/contract-testing.yaml @@ -30,6 +30,7 @@ jobs: search-deployment: ${{ steps.filter.outputs.search-deployment }} stream-connection: ${{ steps.filter.outputs.stream-connection }} stream-instance: ${{ steps.filter.outputs.stream-instance }} + stream-processor: ${{ steps.filter.outputs.stream-processor }} stream-workspace: ${{ steps.filter.outputs.stream-workspace }} steps: - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 @@ -76,6 +77,8 @@ jobs: - 'cfn-resources/stream-connection/**' stream-instance: - 'cfn-resources/stream-instance/**' + stream-processor: + - 'cfn-resources/stream-processor/**' stream-workspace: - 'cfn-resources/stream-workspace/**' access-list-api-key: @@ -855,6 +858,46 @@ jobs: pushd cfn-resources/stream-instance make create-test-resources cat inputs/inputs_1_create.json + make run-contract-testing + make delete-test-resources + stream-processor: + needs: change-detection + if: ${{ needs.change-detection.outputs.stream-processor == 'true' }} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 + - uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 + with: + go-version-file: 'cfn-resources/go.mod' + - name: setup Atlas CLI + uses: mongodb/atlas-github-action@e3c9e0204659bafbb3b65e1eb1ee745cca0e9f3b + - uses: aws-actions/setup-sam@c2a20b1822cc4a6bc594ff7f1dbb658758e383c3 + with: + use-installer: true + - uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_TEST_ENV }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_TEST_ENV }} + aws-region: eu-west-1 + - uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 + with: + python-version: '3.9' + cache: 'pip' # caching pip dependencies + - run: pip install cloudformation-cli cloudformation-cli-go-plugin + - name: Run the Contract test + shell: bash + env: + MONGODB_ATLAS_PUBLIC_API_KEY: ${{ secrets.CLOUD_DEV_PUBLIC_KEY }} + MONGODB_ATLAS_PRIVATE_API_KEY: ${{ secrets.CLOUD_DEV_PRIVATE_KEY }} + MONGODB_ATLAS_ORG_ID: ${{ secrets.CLOUD_DEV_ORG_ID }} + MONGODB_ATLAS_OPS_MANAGER_URL: ${{ vars.MONGODB_ATLAS_BASE_URL }} + MONGODB_ATLAS_PROFILE: cfn-cloud-dev-github-action + run: | + cd cfn-resources/stream-processor + make create-test-resources + + cat inputs/* + make run-contract-testing make delete-test-resources stream-workspace: @@ -897,4 +940,4 @@ jobs: cat inputs/inputs_1_update.json make run-contract-testing - make delete-test-resources \ No newline at end of file + make delete-test-resources diff --git a/cfn-resources/stream-processor/.rpdk-config b/cfn-resources/stream-processor/.rpdk-config new file mode 100644 index 000000000..40acec48b --- /dev/null +++ b/cfn-resources/stream-processor/.rpdk-config @@ -0,0 +1,27 @@ +{ + "artifact_type": "RESOURCE", + "typeName": "MongoDB::Atlas::StreamProcessor", + "language": "go", + "runtime": "provided.al2", + "entrypoint": "bootstrap", + "testEntrypoint": "bootstrap", + "settings": { + "version": false, + "subparser_name": null, + "verbose": 0, + "force": false, + "type_name": "MongoDB::Atlas::StreamProcessor", + "artifact_type": "r", + "endpoint_url": null, + "region": null, + "target_schemas": [], + "profile": null, + "import_path": 
"github.com/mongodb/mongodbatlas-cloudformation-resources/stream-processor", + "protocolVersion": "2.0.0" + }, + "canarySettings": { + "contract_test_file_names": [ + "inputs_1.json" + ] + } +} diff --git a/cfn-resources/stream-processor/Makefile b/cfn-resources/stream-processor/Makefile new file mode 100644 index 000000000..ed0cb0d06 --- /dev/null +++ b/cfn-resources/stream-processor/Makefile @@ -0,0 +1,41 @@ +.PHONY: build debug test clean +tags=logging callback metrics scheduler +cgo=0 +goos=linux +goarch=amd64 +CFNREP_GIT_SHA?=$(shell git rev-parse HEAD) +ldXflags=-s -w -X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=info -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA} +ldXflagsD=-X github.com/mongodb/mongodbatlas-cloudformation-resources/util.defaultLogLevel=debug -X github.com/mongodb/mongodbatlas-cloudformation-resources/version.Version=${CFNREP_GIT_SHA} + +build: + cfn generate + env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go + +debug: + cfn generate + env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflagsD)" -tags="$(tags)" -o bin/debug cmd/main.go + +test: + cfn generate + env GOOS=$(goos) CGO_ENABLED=$(cgo) GOARCH=$(goarch) go build -ldflags="$(ldXflags)" -tags="$(tags)" -o bin/bootstrap cmd/main.go + +clean: + rm -rf bin + +submit: clean build + @echo "==> Submitting to private registry for testing" + cfn submit --set-default --region us-east-1 + +create-test-resources: + @echo "==> Creating test files and resources for contract testing" + ./test/contract-testing/cfn-test-create.sh + +delete-test-resources: + @echo "==> Delete test resources used for contract testing" + ./test/contract-testing/cfn-test-delete.sh + +run-contract-testing: + @echo "==> Run contract testing" + make build + sam local start-lambda & + cfn test --function-name TestEntrypoint --verbose diff --git a/cfn-resources/stream-processor/README.md b/cfn-resources/stream-processor/README.md new file mode 100644 index 000000000..443a57974 --- /dev/null +++ b/cfn-resources/stream-processor/README.md @@ -0,0 +1,144 @@ +# MongoDB::Atlas::StreamProcessor + +## Description + +Resource for creating and managing [Stream Processors for an Atlas Stream Instance](https://www.mongodb.com/docs/api/doc/atlas-admin-api-v2/operation/operation-createstreamprocessor). + +## Requirements + +Set up an AWS profile to securely give CloudFormation access to your Atlas credentials. +For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-api-keys-credential-management). + +## Attributes and Parameters + +See the [resource docs](docs/README.md). Also refer [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) to manage credentials. 
+
+## CloudFormation Examples
+
+See the example [CFN Templates](/examples/atlas-streams/stream-processor/) for this resource:
+
+- [Basic Stream Processor](/examples/atlas-streams/stream-processor/stream-processor.json)
+- [Stream Processor with DLQ](/examples/atlas-streams/stream-processor/stream-processor-with-dlq.json)
+
+## Prerequisites
+
+Before creating a stream processor, you must have:
+
+- An existing Atlas Project
+- An existing Stream Instance/Workspace (created via the `MongoDB::Atlas::StreamInstance` resource)
+- At least one Stream Connection configured (created via the `MongoDB::Atlas::StreamConnection` resource)
+  - A source connection (e.g., sample data source, cluster connection, or Kafka connection)
+  - A sink connection (must be a cluster connection for merge operations)
+
+## Deployment
+
+### Deploy Basic Stream Processor
+
+```bash
+aws cloudformation deploy \
+  --template-file examples/atlas-streams/stream-processor/stream-processor.json \
+  --stack-name stream-processor-stack \
+  --parameter-overrides \
+    ProjectId=<project-id> \
+    WorkspaceName=<workspace-name> \
+    ProcessorName=my-processor \
+    SourceConnectionName=sample_stream_solar \
+    SinkConnectionName=<sink-connection-name> \
+    SinkDatabase=test \
+    SinkCollection=output \
+    State=CREATED \
+  --capabilities CAPABILITY_IAM \
+  --region us-east-1
+```
+
+### Deploy Stream Processor with DLQ
+
+```bash
+aws cloudformation deploy \
+  --template-file examples/atlas-streams/stream-processor/stream-processor-with-dlq.json \
+  --stack-name stream-processor-dlq-stack \
+  --parameter-overrides \
+    ProjectId=<project-id> \
+    WorkspaceName=<workspace-name> \
+    ProcessorName=my-processor-dlq \
+    SourceConnectionName=sample_stream_solar \
+    SinkConnectionName=<sink-connection-name> \
+    SinkDatabase=test \
+    SinkCollection=output \
+    DlqConnectionName=<dlq-connection-name> \
+    DlqDatabase=dlq \
+    DlqCollection=dlq-messages \
+    State=CREATED \
+  --capabilities CAPABILITY_IAM \
+  --region us-east-1
+```
+
+## Verification
+
+After deployment, verify that the stream processor was created successfully using the Atlas CLI and the Atlas UI.
+
+### Atlas CLI Verification
+
+```bash
+# List all stream processors for a workspace
+atlas streams processors list \
+  --instance <workspace-name> \
+  --projectId <project-id>
+
+# Describe a specific stream processor
+atlas streams processors describe <processor-name> \
+  --instance <workspace-name> \
+  --projectId <project-id>
+```
+
+### Expected CLI Output
+
+The `atlas streams processors describe` command should return:
+
+- `id`: Unique identifier of the processor (matches the `Id` attribute in CloudFormation)
+- `name`: Processor name (matches the `ProcessorName` parameter)
+- `state`: Current state (CREATED, STARTED, STOPPED, or FAILED)
+- `pipeline`: Array of pipeline stages matching your Pipeline configuration
+- `options`: DLQ configuration if provided (should match your `Options.Dlq` settings)
+- `stats`: Processing statistics (available when the processor is STARTED)
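+
+For illustration, with `-o json` the describe output is shaped roughly as follows; every value below is a placeholder, and the exact field set may vary by CLI version:
+
+```json
+{
+  "id": "6650a1b2c3d4e5f6a7b8c9d0",
+  "name": "my-processor",
+  "state": "STARTED",
+  "pipeline": [
+    { "$source": { "connectionName": "sample_stream_solar" } },
+    { "$merge": { "into": { "connectionName": "<sink-connection-name>", "db": "test", "coll": "output" } } }
+  ],
+  "options": { "dlq": { "connectionName": "<dlq-connection-name>", "db": "dlq", "coll": "dlq-messages" } },
+  "stats": { "inputMessageCount": 0, "outputMessageCount": 0 }
+}
```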
+### Verify Pipeline Configuration
+
+The pipeline should match your CloudFormation template:
+
+- Source connection name should match the `SourceConnectionName` parameter
+- Merge target connection should match the `SinkConnectionName` parameter
+- Database and collection should match the `SinkDatabase` and `SinkCollection` parameters
+
+### Verify DLQ Configuration (if applicable)
+
+For processors with a DLQ:
+
+- `options.dlq.connectionName` should match the `DlqConnectionName` parameter
+- `options.dlq.db` should match the `DlqDatabase` parameter
+- `options.dlq.coll` should match the `DlqCollection` parameter
+
+### Atlas UI Verification
+
+1. Navigate to your Atlas project in the [Atlas UI](https://cloud.mongodb.com)
+2. Go to the **Stream Processing** section
+3. Select your stream workspace/instance
+4. Verify the processor appears in the **Processors** tab with:
+   - **Name**: Matches the `ProcessorName` from your CloudFormation template
+   - **State**: Matches the `State` parameter (CREATED, STARTED, or STOPPED)
+   - **Pipeline**: Click on the processor to view pipeline stages and verify:
+     - Source connection matches your `SourceConnectionName` parameter
+     - Merge target connection matches your `SinkConnectionName` parameter
+     - Target database and collection match your `SinkDatabase` and `SinkCollection` parameters
+5. For processors with a DLQ:
+   - Verify the DLQ configuration is displayed in the processor details
+   - Check that the DLQ connection, database, and collection match your parameters
+6. If the processor is in the STARTED state:
+   - Verify that processing statistics are available
+   - Check that messages are being processed (stats show input/output message counts)
+
+## Notes
+
+- **AWS Only**: This CloudFormation resource is designed for AWS deployments; the effective provider is always AWS.
+- **WorkspaceName**: This field is the same as the `InstanceName` used in other stream resources.
+- **State Management**: When creating a processor, specify `State: STARTED` to automatically start processing, or `State: CREATED` to create it in a stopped state.
+- **Long-Running Operations**: Creating and starting stream processors can take several minutes. The resource uses callback-based state management to handle these operations asynchronously.
+- **Timeout Configuration**: Use `Timeouts.Create` to configure how long to wait for processor creation/startup (default: 20 minutes).
diff --git a/cfn-resources/stream-processor/cmd/main.go b/cfn-resources/stream-processor/cmd/main.go
new file mode 100644
index 000000000..e34cceb2e
--- /dev/null
+++ b/cfn-resources/stream-processor/cmd/main.go
@@ -0,0 +1,85 @@
+// Code generated by 'cfn generate', changes will be undone by the next invocation. DO NOT EDIT.
+package main
+
+import (
+	"errors"
+	"fmt"
+	"log"
+
+	"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn"
+	"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler"
+	"github.com/mongodb/mongodbatlas-cloudformation-resources/stream-processor/cmd/resource"
+)
+
+// Handler is a container for the CRUDL actions exported by resources
+type Handler struct{}
+
+// Create wraps the related Create function exposed by the resource code
+func (r *Handler) Create(req handler.Request) handler.ProgressEvent {
+	return wrap(req, resource.Create)
+}
+
+// Read wraps the related Read function exposed by the resource code
+func (r *Handler) Read(req handler.Request) handler.ProgressEvent {
+	return wrap(req, resource.Read)
+}
+
+// Update wraps the related Update function exposed by the resource code
+func (r *Handler) Update(req handler.Request) handler.ProgressEvent {
+	return wrap(req, resource.Update)
+}
+
+// Delete wraps the related Delete function exposed by the resource code
+func (r *Handler) Delete(req handler.Request) handler.ProgressEvent {
+	return wrap(req, resource.Delete)
+}
+
+// List wraps the related List function exposed by the resource code
+func (r *Handler) List(req handler.Request) handler.ProgressEvent {
+	return wrap(req, resource.List)
+}
+
+// main is the entry point of the application.
+func main() { + cfn.Start(&Handler{}) +} + +type handlerFunc func(handler.Request, *resource.Model, *resource.Model) (handler.ProgressEvent, error) + +func wrap(req handler.Request, f handlerFunc) (response handler.ProgressEvent) { + defer func() { + // Catch any panics and return a failed ProgressEvent + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = errors.New(fmt.Sprint(r)) + } + + log.Printf("Trapped error in handler: %v", err) + + response = handler.NewFailedEvent(err) + } + }() + + // Populate the previous model + prevModel := &resource.Model{} + if err := req.UnmarshalPrevious(prevModel); err != nil { + log.Printf("Error unmarshaling prev model: %v", err) + return handler.NewFailedEvent(err) + } + + // Populate the current model + currentModel := &resource.Model{} + if err := req.Unmarshal(currentModel); err != nil { + log.Printf("Error unmarshaling model: %v", err) + return handler.NewFailedEvent(err) + } + + response, err := f(req, prevModel, currentModel) + if err != nil { + log.Printf("Error returned from handler function: %v", err) + return handler.NewFailedEvent(err) + } + + return response +} diff --git a/cfn-resources/stream-processor/cmd/resource/callbacks.go b/cfn-resources/stream-processor/cmd/resource/callbacks.go new file mode 100644 index 000000000..07a2e52e4 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/callbacks.go @@ -0,0 +1,260 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resource + +import ( + "context" + "fmt" + "maps" + + "go.mongodb.org/atlas-sdk/v20250312012/admin" + + "github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler" + + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/constants" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/logger" +) + +type CallbackData struct { + ProjectID string + WorkspaceName string + ProcessorName string + DesiredState string + StartTime string + TimeoutDuration string + NeedsStarting bool + DeleteOnCreateTimeout bool +} + +func getCallbackData(req handler.Request) *CallbackData { + ctx := &CallbackData{} + + if val, ok := req.CallbackContext["projectID"].(string); ok { + ctx.ProjectID = val + } + if val, ok := req.CallbackContext["workspaceName"].(string); ok { + ctx.WorkspaceName = val + } + if val, ok := req.CallbackContext["processorName"].(string); ok { + ctx.ProcessorName = val + } + if val, ok := req.CallbackContext["needsStarting"].(bool); ok { + ctx.NeedsStarting = val + } + if val, ok := req.CallbackContext["desiredState"].(string); ok { + ctx.DesiredState = val + } + if val, ok := req.CallbackContext["startTime"].(string); ok { + ctx.StartTime = val + } + if val, ok := req.CallbackContext["timeoutDuration"].(string); ok { + ctx.TimeoutDuration = val + } + if val, ok := req.CallbackContext["deleteOnCreateTimeout"].(bool); ok { + ctx.DeleteOnCreateTimeout = val + } + + return ctx +} + +func validateCallbackData(ctx *CallbackData) *handler.ProgressEvent { + if ctx.ProjectID == "" || ctx.WorkspaceName == "" || ctx.ProcessorName == "" { + return &handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Missing required values in callback context", + } + } + return nil +} + +func buildCallbackContext(projectID, workspaceName, processorName string, additionalFields map[string]any) map[string]any { + ctx := map[string]any{ + "callbackStreamProcessor": true, + "projectID": projectID, + "workspaceName": workspaceName, + "processorName": processorName, + } + + maps.Copy(ctx, additionalFields) + + return ctx +} + +func cleanupOnCreateTimeout(ctx context.Context, client *util.MongoDBClient, callbackCtx *CallbackData) error { + if !callbackCtx.DeleteOnCreateTimeout { + return nil + } + + _, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName).Execute() + if err != nil { + _, _ = logger.Warnf("Cleanup delete failed: %v", err) + return err + } + return nil +} + +func handleCreateCallback(ctx context.Context, client *util.MongoDBClient, currentModel *Model, callbackCtx *CallbackData) handler.ProgressEvent { + needsStarting := callbackCtx.NeedsStarting + + if isTimeoutExceeded(callbackCtx.StartTime, callbackCtx.TimeoutDuration) { + if err := cleanupOnCreateTimeout(context.Background(), client, callbackCtx); err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Timeout reached and cleanup failed: %s", err.Error()), + } + } + cleanupMsg := "Timeout reached when waiting for stream processor creation" + if callbackCtx.DeleteOnCreateTimeout { + cleanupMsg += ". Deletion of resource has been triggered because delete_on_create_timeout is true. If you suspect a transient error, wait before retrying to allow resource deletion to finish." + } else { + cleanupMsg += ". Cleanup was not performed because delete_on_create_timeout is false." 
+ } + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: cleanupMsg, + } + } + + streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName) + if peErr != nil { + return *peErr + } + + currentState := streamProcessor.GetState() + + callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName, map[string]any{ + "needsStarting": callbackCtx.NeedsStarting, + "startTime": callbackCtx.StartTime, + "timeoutDuration": callbackCtx.TimeoutDuration, + "deleteOnCreateTimeout": callbackCtx.DeleteOnCreateTimeout, + }) + + switch currentState { + case CreatedState: + if needsStarting { + if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName); peErr != nil { + return *peErr + } + return createInProgressEvent(constants.Pending, currentModel, callbackContext) + } + return finalizeModel(streamProcessor, currentModel, constants.Complete) + + case StartedState: + return finalizeModel(streamProcessor, currentModel, constants.Complete) + + case InitiatingState, CreatingState: + return createInProgressEvent(constants.Pending, currentModel, callbackContext) + + case FailedState: + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Stream processor entered FAILED state", + } + + default: + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Unexpected state during creation: %s", currentState), + } + } +} + +func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, currentModel *Model, callbackCtx *CallbackData) handler.ProgressEvent { + streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName) + if peErr != nil { + return *peErr + } + + desiredState := callbackCtx.DesiredState + if desiredState == "" { + desiredState = streamProcessor.GetState() + } + if desiredState == "" && currentModel != nil && currentModel.DesiredState != nil && *currentModel.DesiredState != "" { + desiredState = *currentModel.DesiredState + } + if desiredState == "" { + desiredState = CreatedState + } + + currentState := streamProcessor.GetState() + + callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName, map[string]any{ + "desiredState": desiredState, + }) + + switch currentState { + case StoppedState, CreatedState: + modifyAPIRequestParams, err := NewStreamProcessorUpdateReq(currentModel) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error creating update request: %s", err.Error()), + } + } + + streamProcessorResp, apiResp, err := client.AtlasSDK.StreamsApi.UpdateStreamProcessorWithParams(ctx, modifyAPIRequestParams).Execute() + if err != nil { + return handleError(apiResp, constants.UPDATE, err) + } + + if desiredState == StartedState { + if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName); peErr != nil { + return *peErr + } + return createInProgressEvent(constants.Pending, currentModel, callbackContext) + } + + return finalizeModel(streamProcessorResp, currentModel, constants.Complete) + + case StartedState: + if desiredState == StartedState { + return finalizeModel(streamProcessor, currentModel, constants.Complete) + } + + // Only 
StoppedState is a valid transition from StartedState + // (CreatedState transitions are not allowed per validateUpdateStateTransition) + if desiredState != StoppedState { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Unexpected desired state %s when current state is %s. Only %s is allowed.", desiredState, StartedState, StoppedState), + } + } + + _, err := client.AtlasSDK.StreamsApi.StopStreamProcessorWithParams(ctx, + &admin.StopStreamProcessorApiParams{ + GroupId: callbackCtx.ProjectID, + TenantName: callbackCtx.WorkspaceName, + ProcessorName: callbackCtx.ProcessorName, + }, + ).Execute() + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error stopping stream processor: %s", err.Error()), + } + } + return createInProgressEvent(constants.Pending, currentModel, callbackContext) + + case FailedState: + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Stream processor entered FAILED state", + } + + default: + return createInProgressEvent(constants.Pending, currentModel, callbackContext) + } +} diff --git a/cfn-resources/stream-processor/cmd/resource/config.go b/cfn-resources/stream-processor/cmd/resource/config.go new file mode 100644 index 000000000..4d9eb7831 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/config.go @@ -0,0 +1,19 @@ +// Code generated by 'cfn generate', changes will be undone by the next invocation. DO NOT EDIT. +// Updates to this type are made my editing the schema file and executing the 'generate' command. +package resource + +import "github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler" + +// TypeConfiguration is autogenerated from the json schema +type TypeConfiguration struct { +} + +// Configuration returns a resource's configuration. +func Configuration(req handler.Request) (*TypeConfiguration, error) { + // Populate the type configuration + typeConfig := &TypeConfiguration{} + if err := req.UnmarshalTypeConfig(typeConfig); err != nil { + return typeConfig, err + } + return typeConfig, nil +} diff --git a/cfn-resources/stream-processor/cmd/resource/handlers.go b/cfn-resources/stream-processor/cmd/resource/handlers.go new file mode 100644 index 000000000..9f6f23170 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/handlers.go @@ -0,0 +1,338 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resource + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/atlas-sdk/v20250312012/admin" + + "github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler" + + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/constants" +) + +func IsCallback(req *handler.Request) bool { + _, found := req.CallbackContext["callbackStreamProcessor"] + return found +} + +func HandleCreate(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent { + if IsCallback(req) { + callbackCtx := getCallbackData(*req) + if peErr := validateCallbackData(callbackCtx); peErr != nil { + return *peErr + } + return handleCreateCallback( + context.Background(), + client, + model, + callbackCtx, + ) + } + + workspaceName := util.SafeString(model.WorkspaceName) + + ctx := context.Background() + projectID := util.SafeString(model.ProjectId) + processorName := util.SafeString(model.ProcessorName) + + var needsStarting bool + if model.DesiredState != nil { + state := *model.DesiredState + switch state { + case StartedState: + needsStarting = true + case CreatedState: + needsStarting = false + default: + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "When creating a stream processor, the only valid states are CREATED and STARTED", + } + } + } + + streamProcessorReq, err := NewStreamProcessorReq(model) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error creating stream processor request: %s", err.Error()), + } + } + + _, apiResp, err := client.AtlasSDK.StreamsApi.CreateStreamProcessor(ctx, projectID, workspaceName, streamProcessorReq).Execute() + if err != nil { + return handleError(apiResp, constants.CREATE, err) + } + + timeoutStr := "" + if model.Timeouts != nil && model.Timeouts.Create != nil { + timeoutStr = *model.Timeouts.Create + } + + deleteOnCreateTimeout := true + if model.DeleteOnCreateTimeout != nil { + deleteOnCreateTimeout = *model.DeleteOnCreateTimeout + } + + inProgressModel := &Model{} + if model != nil { + *inProgressModel = *model + inProgressModel.DeleteOnCreateTimeout = nil + } + copyIdentifyingFields(inProgressModel, model) + + return handler.ProgressEvent{ + OperationStatus: handler.InProgress, + Message: constants.Pending, + ResourceModel: inProgressModel, + CallbackDelaySeconds: defaultCallbackDelaySeconds, + CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{ + "needsStarting": needsStarting, + "startTime": time.Now().Format(time.RFC3339), + "timeoutDuration": timeoutStr, + "deleteOnCreateTimeout": deleteOnCreateTimeout, + }), + } +} + +func HandleRead(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent { + workspaceName := util.SafeString(model.WorkspaceName) + projectID := util.SafeString(model.ProjectId) + processorName := util.SafeString(model.ProcessorName) + + streamProcessor, apiResp, err := client.AtlasSDK.StreamsApi.GetStreamProcessorWithParams(context.Background(), + &admin.GetStreamProcessorApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ProcessorName: processorName, + }).Execute() + if err != nil { + if util.StatusNotFound(apiResp) { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Resource not found", + HandlerErrorCode: "NotFound", + } + } + return handleError(apiResp, constants.READ, err) + } + + resourceModel, err := 
GetStreamProcessorModel(streamProcessor, model) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error converting stream processor model: %s", err.Error()), + } + } + + copyIdentifyingFields(resourceModel, model) + + return handler.ProgressEvent{ + OperationStatus: handler.Success, + Message: constants.ReadComplete, + ResourceModel: resourceModel, + } +} + +func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *Model, model *Model) handler.ProgressEvent { + if IsCallback(req) { + callbackCtx := getCallbackData(*req) + if peErr := validateCallbackData(callbackCtx); peErr != nil { + return *peErr + } + return handleUpdateCallback( + context.Background(), + client, + model, + callbackCtx, + ) + } + + workspaceName := util.SafeString(model.WorkspaceName) + + ctx := context.Background() + projectID := util.SafeString(model.ProjectId) + processorName := util.SafeString(model.ProcessorName) + + requestParams := &admin.GetStreamProcessorApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ProcessorName: processorName, + } + + currentStreamProcessor, apiResp, err := client.AtlasSDK.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute() + if err != nil { + if util.StatusNotFound(apiResp) { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Resource not found", + HandlerErrorCode: "NotFound", + } + } + return handleError(apiResp, constants.READ, err) + } + + currentState := currentStreamProcessor.GetState() + + desiredState := currentState + if model.DesiredState != nil && *model.DesiredState != "" { + desiredState = *model.DesiredState + } else if prevModel != nil && prevModel.DesiredState != nil && *prevModel.DesiredState != "" { + desiredState = *prevModel.DesiredState + } + + if errMsg, isValid := validateUpdateStateTransition(currentState, desiredState); !isValid { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: errMsg, + } + } + + if currentState == StartedState { + _, err := client.AtlasSDK.StreamsApi.StopStreamProcessorWithParams(ctx, + &admin.StopStreamProcessorApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ProcessorName: processorName, + }, + ).Execute() + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error stopping stream processor: %s", err.Error()), + } + } + + inProgressModel := &Model{} + if model != nil { + *inProgressModel = *model + inProgressModel.DeleteOnCreateTimeout = nil + } + copyIdentifyingFields(inProgressModel, model) + + return handler.ProgressEvent{ + OperationStatus: handler.InProgress, + Message: constants.Pending, + ResourceModel: inProgressModel, + CallbackDelaySeconds: defaultCallbackDelaySeconds, + CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{ + "desiredState": desiredState, + }), + } + } + + modifyAPIRequestParams, err := NewStreamProcessorUpdateReq(model) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error creating update request: %s", err.Error()), + } + } + + streamProcessorResp, apiResp, err := client.AtlasSDK.StreamsApi.UpdateStreamProcessorWithParams(ctx, modifyAPIRequestParams).Execute() + if err != nil { + return handleError(apiResp, constants.UPDATE, err) + } + + if desiredState == StartedState { + if peErr := startStreamProcessor(ctx, client.AtlasSDK, projectID, workspaceName, processorName); peErr != 
nil { + return *peErr + } + + inProgressModel := &Model{} + if model != nil { + *inProgressModel = *model + inProgressModel.DeleteOnCreateTimeout = nil + } + copyIdentifyingFields(inProgressModel, model) + + return handler.ProgressEvent{ + OperationStatus: handler.InProgress, + Message: constants.Pending, + ResourceModel: inProgressModel, + CallbackDelaySeconds: defaultCallbackDelaySeconds, + CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{ + "desiredState": desiredState, + }), + } + } + + return finalizeModel(streamProcessorResp, model, constants.Complete) +} + +func HandleDelete(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent { + workspaceName := util.SafeString(model.WorkspaceName) + + ctx := context.Background() + projectID := util.SafeString(model.ProjectId) + processorName := util.SafeString(model.ProcessorName) + + apiResp, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, projectID, workspaceName, processorName).Execute() + if err != nil { + if util.StatusNotFound(apiResp) { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Resource not found", + HandlerErrorCode: "NotFound", + } + } + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error deleting stream processor: %s", err.Error()), + } + } + + return handler.ProgressEvent{ + OperationStatus: handler.Success, + Message: constants.Complete, + } +} + +func HandleList(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent { + workspaceName := util.SafeString(model.WorkspaceName) + + ctx := context.Background() + projectID := util.SafeString(model.ProjectId) + + accumulatedProcessors, apiResp, err := getAllStreamProcessors(ctx, client.AtlasSDK, projectID, workspaceName) + if err != nil { + return handleError(apiResp, constants.LIST, err) + } + + response := make([]interface{}, 0, len(accumulatedProcessors)) + for i := range accumulatedProcessors { + modelItem, err := GetStreamProcessorModel(&accumulatedProcessors[i], model) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error converting stream processor model: %s", err.Error()), + } + } + + copyIdentifyingFields(modelItem, model) + response = append(response, modelItem) + } + + return handler.ProgressEvent{ + OperationStatus: handler.Success, + Message: constants.Complete, + ResourceModels: response, + } +} diff --git a/cfn-resources/stream-processor/cmd/resource/helpers.go b/cfn-resources/stream-processor/cmd/resource/helpers.go new file mode 100644 index 000000000..473fc55ca --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/helpers.go @@ -0,0 +1,202 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resource + +import ( + "context" + "fmt" + "net/http" + "time" + + "go.mongodb.org/atlas-sdk/v20250312012/admin" + + "github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler" + + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/constants" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/logger" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/progressevent" +) + +func copyIdentifyingFields(resourceModel, currentModel *Model) { + resourceModel.Profile = currentModel.Profile + resourceModel.ProjectId = currentModel.ProjectId + resourceModel.ProcessorName = currentModel.ProcessorName + resourceModel.WorkspaceName = currentModel.WorkspaceName +} + +func parseTimeout(timeoutStr string) time.Duration { + if timeoutStr == "" { + return DefaultCreateTimeout + } + duration, err := time.ParseDuration(timeoutStr) + if err != nil { + _, _ = logger.Warnf("Invalid timeout format '%s', using default: %v", timeoutStr, err) + return DefaultCreateTimeout + } + return duration +} + +// isTimeoutExceeded checks if the elapsed time since startTimeStr exceeds the timeoutDurationStr. +// If this function needs to be used by other resources in the future, it should be moved to the util package. +func isTimeoutExceeded(startTimeStr, timeoutDurationStr string) bool { + if startTimeStr == "" || timeoutDurationStr == "" { + return false + } + + startTime, err := time.Parse(time.RFC3339, startTimeStr) + if err != nil { + _, _ = logger.Warnf("Invalid start time format '%s': %v", startTimeStr, err) + return false + } + + timeoutDuration := parseTimeout(timeoutDurationStr) + elapsed := time.Since(startTime) + + return elapsed >= timeoutDuration +} + +func finalizeModel(streamProcessor *admin.StreamsProcessorWithStats, currentModel *Model, message string) handler.ProgressEvent { + resourceModel, err := GetStreamProcessorModel(streamProcessor, currentModel) + if err != nil { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error converting stream processor model: %s", err.Error()), + } + } + + copyIdentifyingFields(resourceModel, currentModel) + + return handler.ProgressEvent{ + OperationStatus: handler.Success, + Message: message, + ResourceModel: resourceModel, + } +} + +func getAllStreamProcessors(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName string) ([]admin.StreamsProcessorWithStats, *http.Response, error) { + pageNum := 1 + accumulatedProcessors := make([]admin.StreamsProcessorWithStats, 0) + + for allRecordsRetrieved := false; !allRecordsRetrieved; { + processorsResp, apiResp, err := atlasClient.StreamsApi.GetStreamProcessorsWithParams(ctx, &admin.GetStreamProcessorsApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ItemsPerPage: util.Pointer(constants.DefaultListItemsPerPage), + PageNum: util.Pointer(pageNum), + }).Execute() + + if err != nil { + return nil, apiResp, err + } + + results := processorsResp.GetResults() + accumulatedProcessors = append(accumulatedProcessors, results...) 
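+		// Pagination ends once the accumulated results reach the server-reported total,
+		// or when a short page (fewer than DefaultListItemsPerPage results) marks the final page.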
+ + totalCount := processorsResp.GetTotalCount() + allRecordsRetrieved = totalCount <= len(accumulatedProcessors) || len(results) < constants.DefaultListItemsPerPage + pageNum++ + } + + return accumulatedProcessors, nil, nil +} + +func getStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName, processorName string) (*admin.StreamsProcessorWithStats, *handler.ProgressEvent) { + requestParams := &admin.GetStreamProcessorApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ProcessorName: processorName, + } + + streamProcessor, resp, err := atlasClient.StreamsApi.GetStreamProcessorWithParams(ctx, requestParams).Execute() + if err != nil { + if util.StatusNotFound(resp) { + return nil, &handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: "Stream processor not found", + HandlerErrorCode: "NotFound", + } + } + return nil, &handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error getting stream processor: %s", err.Error()), + } + } + return streamProcessor, nil +} + +func startStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName, processorName string) *handler.ProgressEvent { + _, err := atlasClient.StreamsApi.StartStreamProcessorWithParams(ctx, + &admin.StartStreamProcessorApiParams{ + GroupId: projectID, + TenantName: workspaceName, + ProcessorName: processorName, + }, + ).Execute() + if err != nil { + return &handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: fmt.Sprintf("Error starting stream processor: %s", err.Error()), + } + } + return nil +} + +func createInProgressEvent(message string, currentModel *Model, callbackContext map[string]any) handler.ProgressEvent { + inProgressModel := &Model{} + if currentModel != nil { + *inProgressModel = *currentModel + inProgressModel.DeleteOnCreateTimeout = nil + } + copyIdentifyingFields(inProgressModel, currentModel) + + return handler.ProgressEvent{ + OperationStatus: handler.InProgress, + Message: message, + ResourceModel: inProgressModel, + CallbackDelaySeconds: defaultCallbackDelaySeconds, + CallbackContext: callbackContext, + } +} + +func validateUpdateStateTransition(currentState, desiredState string) (errMsg string, isValidTransition bool) { + if currentState == desiredState { + return "", true + } + + if desiredState == StoppedState && currentState != StartedState { + return fmt.Sprintf("Stream Processor must be in %s state to transition to %s state", StartedState, StoppedState), false + } + + if desiredState == CreatedState { + return fmt.Sprintf("Stream Processor cannot transition from %s to CREATED", currentState), false + } + + return "", true +} + +func handleError(response *http.Response, method constants.CfnFunctions, err error) handler.ProgressEvent { + errMsg := fmt.Sprintf("%s error:%s", method, err.Error()) + + if util.StatusConflict(response) { + return handler.ProgressEvent{ + OperationStatus: handler.Failed, + Message: errMsg, + HandlerErrorCode: "AlreadyExists", + } + } + + return progressevent.GetFailedEventByResponse(errMsg, response) +} diff --git a/cfn-resources/stream-processor/cmd/resource/mappings.go b/cfn-resources/stream-processor/cmd/resource/mappings.go new file mode 100644 index 000000000..76f5c3ac2 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/mappings.go @@ -0,0 +1,172 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resource + +import ( + "encoding/json" + "fmt" + + "go.mongodb.org/atlas-sdk/v20250312012/admin" + + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" +) + +func ConvertPipelineToSdk(pipeline string) ([]any, error) { + var pipelineSliceOfMaps []any + err := json.Unmarshal([]byte(pipeline), &pipelineSliceOfMaps) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal pipeline: %w", err) + } + return pipelineSliceOfMaps, nil +} + +func ConvertPipelineToString(pipeline []any) (string, error) { + pipelineJSON, err := json.Marshal(pipeline) + if err != nil { + return "", fmt.Errorf("failed to marshal pipeline: %w", err) + } + return string(pipelineJSON), nil +} + +func ConvertStatsToString(stats any) (string, error) { + if stats == nil { + return "", nil + } + statsJSON, err := json.Marshal(stats) + if err != nil { + return "", fmt.Errorf("failed to marshal stats: %w", err) + } + return string(statsJSON), nil +} + +func NewStreamProcessorReq(model *Model) (*admin.StreamsProcessor, error) { + pipeline, err := ConvertPipelineToSdk(util.SafeString(model.Pipeline)) + if err != nil { + return nil, err + } + + streamProcessor := &admin.StreamsProcessor{ + Name: model.ProcessorName, + Pipeline: &pipeline, + } + + if model.Options != nil && model.Options.Dlq != nil { + dlq := model.Options.Dlq + if dlq.Coll != nil && *dlq.Coll != "" && + dlq.ConnectionName != nil && *dlq.ConnectionName != "" && + dlq.Db != nil && *dlq.Db != "" { + streamProcessor.Options = &admin.StreamsOptions{ + Dlq: &admin.StreamsDLQ{ + Coll: dlq.Coll, + ConnectionName: dlq.ConnectionName, + Db: dlq.Db, + }, + } + } + } + + return streamProcessor, nil +} + +func NewStreamProcessorUpdateReq(model *Model) (*admin.UpdateStreamProcessorApiParams, error) { + pipeline, err := ConvertPipelineToSdk(util.SafeString(model.Pipeline)) + if err != nil { + return nil, err + } + + workspaceName := util.SafeString(model.WorkspaceName) + + streamProcessorAPIParams := &admin.UpdateStreamProcessorApiParams{ + GroupId: util.SafeString(model.ProjectId), + TenantName: workspaceName, + ProcessorName: util.SafeString(model.ProcessorName), + StreamsModifyStreamProcessor: &admin.StreamsModifyStreamProcessor{ + Name: model.ProcessorName, + Pipeline: &pipeline, + }, + } + + if model.Options != nil && model.Options.Dlq != nil { + dlq := model.Options.Dlq + if dlq.Coll != nil && *dlq.Coll != "" && + dlq.ConnectionName != nil && *dlq.ConnectionName != "" && + dlq.Db != nil && *dlq.Db != "" { + streamProcessorAPIParams.StreamsModifyStreamProcessor.Options = &admin.StreamsModifyStreamProcessorOptions{ + Dlq: &admin.StreamsDLQ{ + Coll: dlq.Coll, + ConnectionName: dlq.ConnectionName, + Db: dlq.Db, + }, + } + } + } + + return streamProcessorAPIParams, nil +} + +func GetStreamProcessorModel(streamProcessor *admin.StreamsProcessorWithStats, currentModel *Model) (*Model, error) { + model := new(Model) + + if currentModel != nil { + *model = *currentModel + model.DeleteOnCreateTimeout = nil + } + + model.ProcessorName = util.Pointer(streamProcessor.Name) + model.Id = util.Pointer(streamProcessor.Id) + model.State = 
util.Pointer(streamProcessor.State) + + if currentModel != nil && currentModel.Pipeline != nil { + model.Pipeline = currentModel.Pipeline + } else if streamProcessor.Pipeline != nil { + pipelineStr, err := ConvertPipelineToString(streamProcessor.GetPipeline()) + if err != nil { + return nil, err + } + model.Pipeline = &pipelineStr + } + + if streamProcessor.Stats != nil { + statsStr, err := ConvertStatsToString(streamProcessor.GetStats()) + if err != nil { + return nil, err + } + model.Stats = &statsStr + } + + if streamProcessor.Options != nil && streamProcessor.Options.Dlq != nil { + apiDlq := streamProcessor.Options.Dlq + if apiDlq.Coll != nil && *apiDlq.Coll != "" && + apiDlq.ConnectionName != nil && *apiDlq.ConnectionName != "" && + apiDlq.Db != nil && *apiDlq.Db != "" { + model.Options = &StreamsOptions{ + Dlq: &StreamsDLQ{ + Coll: apiDlq.Coll, + ConnectionName: apiDlq.ConnectionName, + Db: apiDlq.Db, + }, + } + } + } else if currentModel != nil && currentModel.Options != nil && currentModel.Options.Dlq != nil { + currentDlq := currentModel.Options.Dlq + if currentDlq.Coll != nil && *currentDlq.Coll != "" && + currentDlq.ConnectionName != nil && *currentDlq.ConnectionName != "" && + currentDlq.Db != nil && *currentDlq.Db != "" { + model.Options = currentModel.Options + } + } + + return model, nil +} diff --git a/cfn-resources/stream-processor/cmd/resource/mappings_test.go b/cfn-resources/stream-processor/cmd/resource/mappings_test.go new file mode 100644 index 000000000..c5a156cb6 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/mappings_test.go @@ -0,0 +1,341 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resource_test + +import ( + "encoding/json" + "testing" + + "github.com/mongodb/mongodbatlas-cloudformation-resources/stream-processor/cmd/resource" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas-sdk/v20250312012/admin" +) + +func assertJSONEqual(t *testing.T, expected, actual string) { + t.Helper() + var expectedJSON, actualJSON any + require.NoError(t, json.Unmarshal([]byte(expected), &expectedJSON)) + require.NoError(t, json.Unmarshal([]byte(actual), &actualJSON)) + assert.Equal(t, expectedJSON, actualJSON) +} + +func TestConvertPipelineToSdk(t *testing.T) { + testCases := map[string]struct { + pipeline string + expectedError bool + }{ + "validPipeline": { + pipeline: `[{"$match": {"status": "active"}}]`, + }, + "invalidJSON": { + pipeline: `[{"$match": {"status": "active"}`, + expectedError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result, err := resource.ConvertPipelineToSdk(tc.pipeline) + if tc.expectedError { + require.Error(t, err) + assert.Nil(t, result) + } else { + require.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +} + +func TestConvertPipelineToString(t *testing.T) { + testCases := map[string]struct { + expectedJSON string + pipeline []any + }{ + "validPipeline": { + pipeline: []any{map[string]any{"$match": map[string]any{"status": "active"}}}, + expectedJSON: `[{"$match":{"status":"active"}}]`, + }, + "nilPipeline": { + pipeline: nil, + expectedJSON: `null`, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result, err := resource.ConvertPipelineToString(tc.pipeline) + require.NoError(t, err) + assertJSONEqual(t, tc.expectedJSON, result) + }) + } +} + +func TestConvertStatsToString(t *testing.T) { + result, err := resource.ConvertStatsToString(map[string]any{"bytesProcessed": 1000}) + require.NoError(t, err) + assertJSONEqual(t, `{"bytesProcessed":1000}`, result) + + result, err = resource.ConvertStatsToString(nil) + require.NoError(t, err) + assert.Empty(t, result) +} + +func TestNewStreamProcessorReq(t *testing.T) { + validPipeline := `[{"$match": {"status": "active"}}]` + validDLQ := &resource.StreamsOptions{ + Dlq: &resource.StreamsDLQ{ + Coll: util.StringPtr("dlq-collection"), + ConnectionName: util.StringPtr("dlq-connection"), + Db: util.StringPtr("dlq-db"), + }, + } + + testCases := map[string]struct { + model *resource.Model + expectedError bool + checkOptions bool + }{ + "minimalRequest": { + model: &resource.Model{ + ProcessorName: util.StringPtr("test-processor"), + Pipeline: util.StringPtr(validPipeline), + }, + }, + "withOptions": { + model: &resource.Model{ + ProcessorName: util.StringPtr("test-processor"), + Pipeline: util.StringPtr(validPipeline), + Options: validDLQ, + }, + checkOptions: true, + }, + "invalidPipeline": { + model: &resource.Model{ + ProcessorName: util.StringPtr("test-processor"), + Pipeline: util.StringPtr(`invalid json`), + }, + expectedError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result, err := resource.NewStreamProcessorReq(tc.model) + if tc.expectedError { + require.Error(t, err) + assert.Nil(t, result) + } else { + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, "test-processor", result.GetName()) + if tc.checkOptions { + require.NotNil(t, result.Options.Dlq) + assert.Equal(t, "dlq-collection", util.SafeString(result.Options.Dlq.Coll)) + } 
+ } + }) + } +} + +func TestNewStreamProcessorUpdateReq(t *testing.T) { + validPipeline := `[{"$match": {"status": "active"}}]` + validDLQ := &resource.StreamsOptions{ + Dlq: &resource.StreamsDLQ{ + Coll: util.StringPtr("dlq-collection"), + ConnectionName: util.StringPtr("dlq-connection"), + Db: util.StringPtr("dlq-db"), + }, + } + + testCases := map[string]struct { + model *resource.Model + checkTenant string + expectedError bool + checkOptions bool + }{ + "minimalRequest": { + model: &resource.Model{ + ProjectId: util.StringPtr("507f1f77bcf86cd799439011"), + ProcessorName: util.StringPtr("test-processor"), + WorkspaceName: util.StringPtr("workspace-1"), + Pipeline: util.StringPtr(validPipeline), + }, + checkTenant: "workspace-1", + }, + "withOptions": { + model: &resource.Model{ + ProjectId: util.StringPtr("507f1f77bcf86cd799439011"), + ProcessorName: util.StringPtr("test-processor"), + WorkspaceName: util.StringPtr("workspace-1"), + Pipeline: util.StringPtr(validPipeline), + Options: validDLQ, + }, + checkTenant: "workspace-1", + checkOptions: true, + }, + "invalidPipeline": { + model: &resource.Model{ + ProjectId: util.StringPtr("507f1f77bcf86cd799439011"), + ProcessorName: util.StringPtr("test-processor"), + WorkspaceName: util.StringPtr("workspace-1"), + Pipeline: util.StringPtr(`invalid json`), + }, + expectedError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result, err := resource.NewStreamProcessorUpdateReq(tc.model) + if tc.expectedError { + require.Error(t, err) + assert.Nil(t, result) + } else { + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, "507f1f77bcf86cd799439011", result.GroupId) + assert.Equal(t, "test-processor", result.ProcessorName) + if tc.checkTenant != "" { + assert.Equal(t, tc.checkTenant, result.TenantName) + } + if tc.checkOptions { + require.NotNil(t, result.StreamsModifyStreamProcessor.Options.Dlq) + assert.Equal(t, "dlq-collection", result.StreamsModifyStreamProcessor.Options.Dlq.GetColl()) + } + } + }) + } +} + +func TestGetStreamProcessorModel(t *testing.T) { + validDLQ := &admin.StreamsDLQ{ + Coll: admin.PtrString("dlq-collection"), + ConnectionName: admin.PtrString("dlq-connection"), + Db: admin.PtrString("dlq-db"), + } + currentModelWithDLQ := &resource.Model{ + Options: &resource.StreamsOptions{ + Dlq: &resource.StreamsDLQ{ + Coll: util.StringPtr("existing-dlq-collection"), + ConnectionName: util.StringPtr("existing-dlq-connection"), + Db: util.StringPtr("existing-dlq-db"), + }, + }, + } + + testCases := map[string]struct { + streamProcessor *admin.StreamsProcessorWithStats + currentModel *resource.Model + checkFields []string + }{ + "minimalConversion": { + streamProcessor: &admin.StreamsProcessorWithStats{ + Name: "test-processor", + Id: "507f1f77bcf86cd799439011", + State: "CREATED", + }, + checkFields: []string{"name", "id", "state"}, + }, + "withAllFields": { + streamProcessor: &admin.StreamsProcessorWithStats{ + Name: "test-processor", + Id: "507f1f77bcf86cd799439011", + State: "STARTED", + Pipeline: []any{map[string]any{"$match": map[string]any{"status": "active"}}}, + Stats: map[string]any{"bytesProcessed": 5000, "recordsProcessed": 500}, + Options: &admin.StreamsOptions{Dlq: validDLQ}, + }, + currentModel: &resource.Model{ + ProjectId: util.StringPtr("507f1f77bcf86cd799439011"), + ProcessorName: util.StringPtr("test-processor"), + }, + checkFields: []string{"all"}, + }, + "preserveCurrentModelOptions": { + streamProcessor: &admin.StreamsProcessorWithStats{ + Name: 
"test-processor", + Id: "507f1f77bcf86cd799439011", + State: "CREATED", + }, + currentModel: currentModelWithDLQ, + checkFields: []string{"preservedOptions"}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result, err := resource.GetStreamProcessorModel(tc.streamProcessor, tc.currentModel) + require.NoError(t, err) + require.NotNil(t, result) + + for _, field := range tc.checkFields { + switch field { + case "name": + assert.Equal(t, "test-processor", util.SafeString(result.ProcessorName)) + case "id": + assert.Equal(t, "507f1f77bcf86cd799439011", util.SafeString(result.Id)) + case "state": + assert.Equal(t, tc.streamProcessor.State, util.SafeString(result.State)) + case "all": + assert.Equal(t, "test-processor", util.SafeString(result.ProcessorName)) + assert.NotNil(t, result.Pipeline) + assert.NotNil(t, result.Stats) + require.NotNil(t, result.Options.Dlq) + assert.Equal(t, "dlq-collection", util.SafeString(result.Options.Dlq.Coll)) + case "preservedOptions": + require.NotNil(t, result.Options.Dlq) + assert.Equal(t, "existing-dlq-collection", util.SafeString(result.Options.Dlq.Coll)) + } + } + }) + } +} + +func TestRoundTripConversions(t *testing.T) { + t.Run("pipelineRoundTrip", func(t *testing.T) { + originalPipeline := `[{"$match": {"status": "active"}}, {"$group": {"_id": "$category", "count": {"$sum": 1}}}]` + sdkPipeline, err := resource.ConvertPipelineToSdk(originalPipeline) + require.NoError(t, err) + convertedBack, err := resource.ConvertPipelineToString(sdkPipeline) + require.NoError(t, err) + + var original, converted any + require.NoError(t, json.Unmarshal([]byte(originalPipeline), &original)) + require.NoError(t, json.Unmarshal([]byte(convertedBack), &converted)) + assert.Equal(t, original, converted) + }) + + t.Run("statsRoundTrip", func(t *testing.T) { + originalStats := map[string]any{ + "bytesProcessed": 1000, + "recordsProcessed": 100, + "nested": map[string]any{"value": 42}, + } + statsString, err := resource.ConvertStatsToString(originalStats) + require.NoError(t, err) + + var parsedStats any + require.NoError(t, json.Unmarshal([]byte(statsString), &parsedStats)) + parsedMap := parsedStats.(map[string]any) + + assert.InDelta(t, float64(1000), parsedMap["bytesProcessed"], 0.01) + assert.InDelta(t, float64(100), parsedMap["recordsProcessed"], 0.01) + nested := parsedMap["nested"].(map[string]any) + assert.InDelta(t, float64(42), nested["value"], 0.01) + }) +} diff --git a/cfn-resources/stream-processor/cmd/resource/model.go b/cfn-resources/stream-processor/cmd/resource/model.go new file mode 100644 index 000000000..8d05e02fd --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/model.go @@ -0,0 +1,36 @@ +// Code generated by 'cfn generate', changes will be undone by the next invocation. DO NOT EDIT. +// Updates to this type are made my editing the schema file and executing the 'generate' command. 
+package resource + +// Model is autogenerated from the json schema +type Model struct { + Profile *string `json:",omitempty"` + ProjectId *string `json:",omitempty"` + WorkspaceName *string `json:",omitempty"` + ProcessorName *string `json:",omitempty"` + Pipeline *string `json:",omitempty"` + DesiredState *string `json:",omitempty"` + State *string `json:",omitempty"` + Options *StreamsOptions `json:",omitempty"` + Id *string `json:",omitempty"` + Stats *string `json:",omitempty"` + Timeouts *Timeouts `json:",omitempty"` + DeleteOnCreateTimeout *bool `json:",omitempty"` +} + +// StreamsOptions is autogenerated from the json schema +type StreamsOptions struct { + Dlq *StreamsDLQ `json:",omitempty"` +} + +// StreamsDLQ is autogenerated from the json schema +type StreamsDLQ struct { + Coll *string `json:",omitempty"` + ConnectionName *string `json:",omitempty"` + Db *string `json:",omitempty"` +} + +// Timeouts is autogenerated from the json schema +type Timeouts struct { + Create *string `json:",omitempty"` +} diff --git a/cfn-resources/stream-processor/cmd/resource/resource.go b/cfn-resources/stream-processor/cmd/resource/resource.go new file mode 100644 index 000000000..f02018fc8 --- /dev/null +++ b/cfn-resources/stream-processor/cmd/resource/resource.go @@ -0,0 +1,98 @@ +// Copyright 2026 MongoDB Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resource + +import ( + "time" + + "github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/constants" + "github.com/mongodb/mongodbatlas-cloudformation-resources/util/validator" +) + +const ( + InitiatingState = "INIT" + CreatingState = "CREATING" + CreatedState = "CREATED" + StartedState = "STARTED" + StoppedState = "STOPPED" + DroppedState = "DROPPED" + FailedState = "FAILED" +) + +const ( + defaultCallbackDelaySeconds = 3 + DefaultCreateTimeout = 20 * time.Minute +) + +var ( + createRequiredFields = []string{constants.ProjectID, constants.WorkspaceName, constants.ProcessorName, constants.Pipeline} + readUpdateDeleteRequiredFields = []string{constants.ProjectID, constants.WorkspaceName, constants.ProcessorName} + listRequiredFields = []string{constants.ProjectID, constants.WorkspaceName} +) + +func Create(req handler.Request, prevModel *Model, model *Model) (handler.ProgressEvent, error) { + client, setupErr := setupRequest(req, model, createRequiredFields) + if setupErr != nil { + return *setupErr, nil + } + return HandleCreate(&req, client, model), nil +} + +func Read(req handler.Request, prevModel *Model, model *Model) (handler.ProgressEvent, error) { + client, setupErr := setupRequest(req, model, readUpdateDeleteRequiredFields) + if setupErr != nil { + return *setupErr, nil + } + return HandleRead(&req, client, model), nil +} + +func Update(req handler.Request, prevModel *Model, model *Model) (handler.ProgressEvent, error) { + client, setupErr := setupRequest(req, model, readUpdateDeleteRequiredFields) + if setupErr != nil { + return *setupErr, nil + } + return HandleUpdate(&req, client, prevModel, model), nil +} + +func Delete(req handler.Request, prevModel *Model, model *Model) (handler.ProgressEvent, error) { + client, setupErr := setupRequest(req, model, readUpdateDeleteRequiredFields) + if setupErr != nil { + return *setupErr, nil + } + return HandleDelete(&req, client, model), nil +} + +func List(req handler.Request, prevModel *Model, model *Model) (handler.ProgressEvent, error) { + client, setupErr := setupRequest(req, model, listRequiredFields) + if setupErr != nil { + return *setupErr, nil + } + return HandleList(&req, client, model), nil +} + +func setupRequest(req handler.Request, model *Model, requiredFields []string) (*util.MongoDBClient, *handler.ProgressEvent) { + util.SetupLogger("mongodb-atlas-stream-processor") + if modelValidation := validator.ValidateModel(requiredFields, model); modelValidation != nil { + return nil, modelValidation + } + util.SetDefaultProfileIfNotDefined(&model.Profile) + client, peErr := util.NewAtlasClient(&req, model.Profile) + if peErr != nil { + return nil, peErr + } + return client, nil +} diff --git a/cfn-resources/stream-processor/docs/README.md b/cfn-resources/stream-processor/docs/README.md new file mode 100644 index 000000000..8eab21b46 --- /dev/null +++ b/cfn-resources/stream-processor/docs/README.md @@ -0,0 +1,167 @@ +# MongoDB::Atlas::StreamProcessor + +Returns, adds, edits, and removes Atlas Stream Processors. + +## Syntax + +To declare this entity in your AWS CloudFormation template, use the following syntax: + +### JSON + +
+{
+    "Type" : "MongoDB::Atlas::StreamProcessor",
+    "Properties" : {
+        "Profile" : String,
+        "ProjectId" : String,
+        "WorkspaceName" : String,
+        "ProcessorName" : String,
+        "Pipeline" : String,
+        "DesiredState" : String,
+        "Options" : StreamsOptions,
+        "Timeouts" : Timeouts,
+        "DeleteOnCreateTimeout" : Boolean
+    }
+}
+
+ +### YAML + +
+Type: MongoDB::Atlas::StreamProcessor
+Properties:
+    Profile: String
+    ProjectId: String
+    WorkspaceName: String
+    ProcessorName: String
+    Pipeline: String
+    DesiredState: String
+    Options: StreamsOptions
+    Timeouts: Timeouts
+    DeleteOnCreateTimeout: Boolean
+
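+As a hypothetical minimal example (all IDs and names below are placeholders), note that `Pipeline` is a single JSON-encoded string rather than a YAML list:
+
+Type: MongoDB::Atlas::StreamProcessor
+Properties:
+    ProjectId: "507f1f77bcf86cd799439011"
+    WorkspaceName: "my-workspace"
+    ProcessorName: "my-processor"
+    Pipeline: '[{"$source": {"connectionName": "sample_stream_solar"}}, {"$merge": {"into": {"connectionName": "my-cluster-connection", "db": "test", "coll": "output"}}}]'
+    DesiredState: "CREATED"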
+ +## Properties + +#### Profile + +Profile used to provide credentials information, (a secret with the cfn/atlas/profile/{Profile}, is required), if not provided default is used + +_Required_: No + +_Type_: String + +_Update requires_: [Replacement](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-replacement) + +#### ProjectId + +Unique 24-hexadecimal digit string that identifies your project. + +**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups. + +_Required_: Yes + +_Type_: String + +_Minimum Length_: 24 + +_Maximum Length_: 24 + +_Pattern_: ^([a-f0-9]{24})$ + +_Update requires_: [Replacement](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-replacement) + +#### WorkspaceName + +Label that identifies the stream processing workspace. + +_Required_: Yes + +_Type_: String + +_Update requires_: [Replacement](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-replacement) + +#### ProcessorName + +Label that identifies the stream processor. + +_Required_: Yes + +_Type_: String + +_Update requires_: [Replacement](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-replacement) + +#### Pipeline + +Stream aggregation pipeline you want to apply to your streaming data. This should be a JSON-encoded array of pipeline stages. Refer to MongoDB Atlas Docs for more information on stream aggregation pipelines. + +_Required_: Yes + +_Type_: String + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### DesiredState + +The desired state of the stream processor. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the desired state, it will default to CREATED state. When a Stream Processor is updated without specifying the desired state, it will default to the Previous state. + +**NOTE** When a Stream Processor is updated without specifying the desired state, it is stopped and then restored to previous state upon update completion. + +_Required_: No + +_Type_: String + +_Allowed Values_: CREATED | STARTED | STOPPED + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### Options + +Optional configuration for the stream processor. + +_Required_: No + +_Type_: StreamsOptions + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### Timeouts + +Configurable timeouts for stream processor operations. + +_Required_: No + +_Type_: Timeouts + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### DeleteOnCreateTimeout + +Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. 
When set to `true` and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to `false`, the timeout will not trigger resource deletion. If you suspect a transient error when the value is `true`, wait before retrying to allow resource deletion to finish. Default is `true`. + +_Required_: No + +_Type_: Boolean + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +## Return Values + +### Fn::GetAtt + +The `Fn::GetAtt` intrinsic function returns a value for a specified attribute of this type. The following are the available attributes and sample return values. + +For more information about using the `Fn::GetAtt` intrinsic function, see [Fn::GetAtt](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-getatt.html). + +#### Id + +Unique 24-hexadecimal character string that identifies the stream processor. + +#### Stats + +The stats associated with the stream processor as a JSON string. Refer to the MongoDB Atlas Docs for more information. + +#### State + +The actual current state of the stream processor as returned by the Atlas API. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. This is a read-only property that reflects the real-time state of the processor. + diff --git a/cfn-resources/stream-processor/docs/streamsdlq.md b/cfn-resources/stream-processor/docs/streamsdlq.md new file mode 100644 index 000000000..e99782f57 --- /dev/null +++ b/cfn-resources/stream-processor/docs/streamsdlq.md @@ -0,0 +1,58 @@ +# MongoDB::Atlas::StreamProcessor StreamsDLQ + +Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information. + +## Syntax + +To declare this entity in your AWS CloudFormation template, use the following syntax: + +### JSON + +
+{
+    "Coll" : String,
+    "ConnectionName" : String,
+    "Db" : String
+}
+
+ +### YAML + +
+Coll: String
+ConnectionName: String
+Db: String
+
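+For example (all names are placeholders), a DLQ that routes failed messages to the `dlq-db.dlq-collection` namespace through an existing Atlas cluster connection could look like:
+
+Coll: "dlq-collection"
+ConnectionName: "my-cluster-connection"
+Db: "dlq-db"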
+ +## Properties + +#### Coll + +Name of the collection to use for the DLQ. + +_Required_: Yes + +_Type_: String + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### ConnectionName + +Name of the connection to write DLQ messages to. Must be an Atlas connection. + +_Required_: Yes + +_Type_: String + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + +#### Db + +Name of the database to use for the DLQ. + +_Required_: Yes + +_Type_: String + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + diff --git a/cfn-resources/stream-processor/docs/streamsoptions.md b/cfn-resources/stream-processor/docs/streamsoptions.md new file mode 100644 index 000000000..015dd98f3 --- /dev/null +++ b/cfn-resources/stream-processor/docs/streamsoptions.md @@ -0,0 +1,34 @@ +# MongoDB::Atlas::StreamProcessor StreamsOptions + +Optional configuration for the stream processor. + +## Syntax + +To declare this entity in your AWS CloudFormation template, use the following syntax: + +### JSON + +
+{
+    "Dlq" : StreamsDLQ
+}
+
+ +### YAML + +
+Dlq: StreamsDLQ
+
+ +## Properties + +#### Dlq + +Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information. + +_Required_: Yes + +_Type_: StreamsDLQ + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + diff --git a/cfn-resources/stream-processor/docs/timeouts.md b/cfn-resources/stream-processor/docs/timeouts.md new file mode 100644 index 000000000..08c397cd8 --- /dev/null +++ b/cfn-resources/stream-processor/docs/timeouts.md @@ -0,0 +1,34 @@ +# MongoDB::Atlas::StreamProcessor Timeouts + +Configurable timeouts for stream processor operations. + +## Syntax + +To declare this entity in your AWS CloudFormation template, use the following syntax: + +### JSON + +
+{
+    "Create" : String
+}
+
+ +### YAML + +
+Create: String
+
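+For example, to allow up to 30 minutes (an arbitrary illustrative value) for the create operation before timing out:
+
+Create: "30m"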
+ +## Properties + +#### Create + +Timeout for create operation in Go duration format (e.g., '5m', '10s'). Default is 20 minutes. + +_Required_: No + +_Type_: String + +_Update requires_: [No interruption](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-no-interrupt) + diff --git a/cfn-resources/stream-processor/mongodb-atlas-streamprocessor.json b/cfn-resources/stream-processor/mongodb-atlas-streamprocessor.json new file mode 100644 index 000000000..9999078ab --- /dev/null +++ b/cfn-resources/stream-processor/mongodb-atlas-streamprocessor.json @@ -0,0 +1,144 @@ +{ + "typeName": "MongoDB::Atlas::StreamProcessor", + "description": "Returns, adds, edits, and removes Atlas Stream Processors.", + "sourceUrl": "https://github.com/mongodb/mongodbatlas-cloudformation-resources.git", + "definitions": { + "StreamsDLQ": { + "type": "object", + "description": "Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.", + "properties": { + "Coll": { + "type": "string", + "description": "Name of the collection to use for the DLQ." + }, + "ConnectionName": { + "type": "string", + "description": "Name of the connection to write DLQ messages to. Must be an Atlas connection." + }, + "Db": { + "type": "string", + "description": "Name of the database to use for the DLQ." + } + }, + "required": ["Coll", "ConnectionName", "Db"], + "additionalProperties": false + }, + "StreamsOptions": { + "type": "object", + "description": "Optional configuration for the stream processor.", + "properties": { + "Dlq": { + "$ref": "#/definitions/StreamsDLQ" + } + }, + "required": ["Dlq"], + "additionalProperties": false + }, + "Timeouts": { + "type": "object", + "description": "Configurable timeouts for stream processor operations.", + "properties": { + "Create": { + "type": "string", + "description": "Timeout for create operation in Go duration format (e.g., '5m', '10s'). Default is 20 minutes." + } + }, + "additionalProperties": false + } + }, + "properties": { + "Profile": { + "type": "string", + "description": "Profile used to provide credentials information, (a secret with the cfn/atlas/profile/{Profile}, is required), if not provided default is used", + "default": "default" + }, + "ProjectId": { + "type": "string", + "description": "Unique 24-hexadecimal digit string that identifies your project. \n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.", + "maxLength": 24, + "minLength": 24, + "pattern": "^([a-f0-9]{24})$" + }, + "WorkspaceName": { + "type": "string", + "description": "Label that identifies the stream processing workspace." + }, + "ProcessorName": { + "type": "string", + "description": "Label that identifies the stream processor." + }, + "Pipeline": { + "type": "string", + "description": "Stream aggregation pipeline you want to apply to your streaming data. This should be a JSON-encoded array of pipeline stages. Refer to MongoDB Atlas Docs for more information on stream aggregation pipelines." + }, + "DesiredState": { + "type": "string", + "description": "The desired state of the stream processor. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the desired state, it will default to CREATED state. 
When a Stream Processor is updated without specifying the desired state, it will default to the Previous state.\n\n**NOTE** When a Stream Processor is updated without specifying the desired state, it is stopped and then restored to previous state upon update completion.", + "enum": ["CREATED", "STARTED", "STOPPED"] + }, + "State": { + "type": "string", + "description": "The actual current state of the stream processor as returned by the Atlas API. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. This is a read-only property that reflects the real-time state of the processor." + }, + "Options": { + "$ref": "#/definitions/StreamsOptions" + }, + "Id": { + "type": "string", + "description": "Unique 24-hexadecimal character string that identifies the stream processor." + }, + "Stats": { + "type": "string", + "description": "The stats associated with the stream processor as a JSON string. Refer to the MongoDB Atlas Docs for more information." + }, + "Timeouts": { + "$ref": "#/definitions/Timeouts", + "description": "Configurable timeouts for stream processor operations." + }, + "DeleteOnCreateTimeout": { + "type": "boolean", + "description": "Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to `true` and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to `false`, the timeout will not trigger resource deletion. If you suspect a transient error when the value is `true`, wait before retrying to allow resource deletion to finish. Default is `true`." + } + }, + "additionalProperties": false, + "required": ["ProjectId", "WorkspaceName", "ProcessorName", "Pipeline"], + "readOnlyProperties": [ + "/properties/Id", + "/properties/Stats", + "/properties/State" + ], + "writeOnlyProperties": ["/properties/DeleteOnCreateTimeout"], + "primaryIdentifier": [ + "/properties/ProjectId", + "/properties/WorkspaceName", + "/properties/ProcessorName", + "/properties/Profile" + ], + "createOnlyProperties": [ + "/properties/ProjectId", + "/properties/WorkspaceName", + "/properties/ProcessorName", + "/properties/Profile" + ], + "handlers": { + "create": { + "permissions": ["secretsmanager:GetSecretValue"] + }, + "read": { + "permissions": ["secretsmanager:GetSecretValue"] + }, + "update": { + "permissions": ["secretsmanager:GetSecretValue"] + }, + "delete": { + "permissions": ["secretsmanager:GetSecretValue"] + }, + "list": { + "permissions": ["secretsmanager:GetSecretValue"] + } + }, + "documentationUrl": "https://github.com/mongodb/mongodbatlas-cloudformation-resources/blob/master/cfn-resources/stream-processor/README.md", + "tagging": { + "taggable": false + } +} diff --git a/cfn-resources/stream-processor/resource-role.yaml b/cfn-resources/stream-processor/resource-role.yaml new file mode 100644 index 000000000..bc6022d7d --- /dev/null +++ b/cfn-resources/stream-processor/resource-role.yaml @@ -0,0 +1,38 @@ +AWSTemplateFormatVersion: "2010-09-09" +Description: > + This CloudFormation template creates a role assumed by CloudFormation + during CRUDL operations to mutate resources on behalf of the customer. 
+ +Resources: + ExecutionRole: + Type: AWS::IAM::Role + Properties: + MaxSessionDuration: 8400 + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: resources.cloudformation.amazonaws.com + Action: sts:AssumeRole + Condition: + StringEquals: + aws:SourceAccount: + Ref: AWS::AccountId + StringLike: + aws:SourceArn: + Fn::Sub: arn:${AWS::Partition}:cloudformation:${AWS::Region}:${AWS::AccountId}:type/resource/MongoDB-Atlas-StreamProcessor/* + Path: "/" + Policies: + - PolicyName: ResourceTypePolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - "secretsmanager:GetSecretValue" + Resource: "*" +Outputs: + ExecutionRoleArn: + Value: + Fn::GetAtt: ExecutionRole.Arn diff --git a/cfn-resources/stream-processor/template.yml b/cfn-resources/stream-processor/template.yml new file mode 100644 index 000000000..ad114f643 --- /dev/null +++ b/cfn-resources/stream-processor/template.yml @@ -0,0 +1,27 @@ +AWSTemplateFormatVersion: "2010-09-09" +Transform: AWS::Serverless-2016-10-31 +Description: AWS SAM template for the MongoDB::Atlas::StreamProcessor resource type + +Globals: + Function: + Timeout: 180 # docker start-up times can be long for SAM CLI + MemorySize: 256 + +Resources: + TypeFunction: + Type: AWS::Serverless::Function + Properties: + Handler: bootstrap + Runtime: provided.al2 + CodeUri: bin/ + + TestEntrypoint: + Type: AWS::Serverless::Function + Properties: + Handler: bootstrap + Runtime: provided.al2 + CodeUri: bin/ + Environment: + Variables: + MODE: Test + diff --git a/cfn-resources/stream-processor/test/README.md b/cfn-resources/stream-processor/test/README.md new file mode 100644 index 000000000..54bc6e0e0 --- /dev/null +++ b/cfn-resources/stream-processor/test/README.md @@ -0,0 +1,168 @@ +# MongoDB::Atlas::StreamProcessor + +## Impact + +The following components use this resource and are potentially impacted by any changes. They should also be validated to ensure the changes do not cause a regression. + +- Stream Processor L1 CDK constructor + +## Prerequisites + +### Resources needed to run the manual QA + +All resources are created as part of `cfn-testing-helper.sh`: + +- Atlas Project +- Atlas Stream Instance/Workspace (LONG-RUNNING operation, can take 10-30+ minutes) +- Cluster (for DLQ connection testing - inputs_3) +- Stream Connection (for DLQ connection testing - inputs_3) + +**IMPORTANT**: Stream Instance/Workspace creation is a LONG-RUNNING operation that can take 10-30+ minutes. The `cfn-test-create-inputs.sh` script will create the workspace and wait for it to be ready before proceeding. + +## Manual QA + +Please follow the steps in [TESTING.md](../../../TESTING.md). + +### Success criteria when testing the resource + +#### 1. 
Resource Creation Verification + +A Stream Processor should be created in the specified test project for the specified Atlas Stream workspace/instance: + +**Atlas UI Verification:** + +- Navigate to Atlas UI → Your Project → Stream Processing +- Select the stream workspace/instance used in the test +- Go to the **Processors** tab +- Verify the processor appears with: + - **Name**: Matches the `ProcessorName` from the test input + - **State**: Matches the `State` in the template (CREATED, STARTED, or STOPPED) + - **Pipeline**: Click on the processor to view details and verify: + - Pipeline stages match the `Pipeline` configuration in the template + - Source connection name is correct + - Merge target connection, database, and collection are correct + +**Atlas CLI Verification:** + +```bash +atlas streams processors describe \ + --instance \ + --projectId +``` + +- Verify `id` field is present (matches CloudFormation `Id` attribute) +- Verify `name` matches `ProcessorName` +- Verify `state` matches `State` parameter +- Verify `pipeline` array matches the `Pipeline` JSON string + +#### 2. DLQ Configuration Verification (inputs_3) + +For processors with DLQ configuration: + +- In Atlas UI: Verify DLQ settings are displayed in processor details +- Via Atlas CLI: Verify `options.dlq` object contains: + - `connectionName`: Matches `Options.Dlq.ConnectionName` + - `db`: Matches `Options.Dlq.Db` + - `coll`: Matches `Options.Dlq.Coll` + +#### 3. Backward Compatibility Testing + +Test both field names work correctly: + +- **Test with `WorkspaceName`** (preferred field): + - Create processor using `WorkspaceName` parameter + - Verify processor is created successfully + - Verify `WorkspaceName` is set in returned model (for primary identifier) + +#### 4. State Transition Testing + +Test all valid state transitions: + +- **Create with `State: CREATED`**: + - Verify processor is created in CREATED state + - Verify processor does not start processing automatically +- **Create with `State: STARTED`**: + - Verify processor is created and transitions to STARTED state + - Verify this is a long-running operation (may take several minutes) + - Verify callback-based state management handles the transition +- **Update state from CREATED to STARTED**: + - Verify processor stops (if needed) before update + - Verify processor starts after update completes + - Verify state transition is successful +- **Update state from STARTED to STOPPED**: + - Verify processor stops before update + - Verify processor remains stopped after update + - Verify state transition is successful + +#### 5. Timeout and Cleanup Behavior + +- **Verify `Timeouts.Create` is respected**: + - Set a short timeout (e.g., 1 minute) for a processor that takes longer to start + - Verify timeout is triggered after the specified duration +- **Verify `DeleteOnCreateTimeout` behavior**: + - When `DeleteOnCreateTimeout: true` and timeout occurs: + - Verify processor deletion is triggered + - Verify resource is cleaned up from Atlas + - When `DeleteOnCreateTimeout: false` and timeout occurs: + - Verify processor is not deleted + - Verify resource remains in Atlas (may be in partial state) + +#### 6. Primary Identifier Verification + +Verify all primary identifier fields are present in returned models: + +- `ProjectId`: Always present +- `WorkspaceName`: Always present +- `ProcessorName`: Always present +- `Profile`: Always present + +This is critical for CloudFormation to properly track the resource. + +#### 7. 
General CFN Resource Success Criteria + +Ensure general [CFN resource success criteria](../../../TESTING.md#success-criteria-when-testing-the-resource) for this resource is met: + +- All CRUD operations work correctly +- Read-after-Create returns correct values +- Update operations preserve primary identifier +- Delete operations clean up resources +- Error handling is appropriate + +## Important Links + +- [API Documentation](https://www.mongodb.com/docs/api/doc/atlas-admin-api-v2/group/endpoint-streams) +- [Resource Usage Documentation](https://www.mongodb.com/docs/atlas/atlas-sp/overview/) + +## Unit Testing Locally + +The local tests are integrated with the AWS `sam local` and `cfn invoke` tooling features: + +``` +sam local start-lambda --skip-pull-image +``` + +then in another shell: + +```bash +repo_root=$(git rev-parse --show-toplevel) +cd ${repo_root}/cfn-resources/stream-processor +cfn invoke resource CREATE stream-processor.sample-cfn-request.json +cfn invoke resource DELETE stream-processor.sample-cfn-request.json +cd - +``` + +Both CREATE & DELETE tests must pass. + +## Test Input Files + +The test directory contains the following input files: + +- `inputs_1_create.template.json` / `inputs_1_update.template.json`: Basic stream processor with WorkspaceName, CREATED state +- `inputs_2_create.template.json` / `inputs_2_update.template.json`: Stream processor with STARTED state, timeout configuration, and DeleteOnCreateTimeout +- `inputs_3_create.template.json` / `inputs_3_update.template.json`: Stream processor with DLQ options + +All input files respect: + +- AWS-only behavior (no Azure/GCP-only parameters) +- Required fields: ProjectId, WorkspaceName, ProcessorName, Pipeline +- Schema validation: All fields match the final CFN schema diff --git a/cfn-resources/stream-processor/test/cfn-test-create-inputs.sh b/cfn-resources/stream-processor/test/cfn-test-create-inputs.sh new file mode 100755 index 000000000..286f77378 --- /dev/null +++ b/cfn-resources/stream-processor/test/cfn-test-create-inputs.sh @@ -0,0 +1,300 @@ +#!/usr/bin/env bash +# cfn-test-create-inputs.sh +# +# This tool generates json files in the inputs/ for `cfn test`. 
+# + +set -o errexit +set -o nounset +set -o pipefail + +rm -rf inputs +mkdir inputs + +#set profile +profile="default" +if [ ${MONGODB_ATLAS_PROFILE+x} ]; then + echo "profile set to ${MONGODB_ATLAS_PROFILE}" + profile=${MONGODB_ATLAS_PROFILE} +fi + +projectName="${1:-$PROJECT_NAME}" +echo "$projectName" +projectId=$(atlas projects list --output json | jq --arg NAME "${projectName}" -r '.results[] | select(.name==$NAME) | .id') +if [ -z "$projectId" ]; then + projectId=$(atlas projects create "${projectName}" --output=json | jq -r '.id') + + echo -e "Created project \"${projectName}\" with id: ${projectId}\n" +else + echo -e "FOUND project \"${projectName}\" with id: ${projectId}\n" +fi +echo -e "=====\nrun this command to clean up\n=====\nmongocli iam projects delete ${projectId} --force\n=====" + +# Create Stream Instance/Workspace (this is a LONG-RUNNING operation, can take 10-30+ minutes) +workspaceName="stream-workspace-$(date +%s)-$RANDOM" +cloudProvider="AWS" + +echo -e "Creating Stream Instance/Workspace \"${workspaceName}\" (this may take 10-30+ minutes)...\n" +atlas streams instances create "${workspaceName}" --projectId "${projectId}" --region VIRGINIA_USA --provider ${cloudProvider} +echo -e "Waiting for Stream Instance/Workspace \"${workspaceName}\" to be ready...\n" +# Poll until the stream instance is ready (watch command doesn't exist for stream instances) +while true; do + hostnames=$(atlas streams instances describe "${workspaceName}" --projectId "${projectId}" --output json 2>/dev/null | jq -r '.hostnames[]? // empty' 2>/dev/null | head -1) + if [ -n "$hostnames" ]; then + echo -e "Stream Instance/Workspace \"${workspaceName}\" is ready\n" + break + fi + sleep 10 +done + +# For inputs_3 (DLQ testing), we need a cluster and stream connection +# Create cluster for DLQ connection (if needed) +clusterName="cluster-$(date +%s)-$RANDOM" +connectionName="stream-connection-$(date +%s)-$RANDOM" + +echo -e "Creating Cluster \"${clusterName}\" for DLQ connection...\n" +atlas clusters create "${clusterName}" --projectId "${projectId}" --backup --provider AWS --region US_EAST_1 --members 3 --tier M10 --diskSizeGB 10 --output=json +atlas clusters watch "${clusterName}" --projectId "${projectId}" +echo -e "Created Cluster \"${clusterName}\"\n" + +echo -e "Creating Stream Connection \"${connectionName}\" for DLQ...\n" +# Create temporary JSON file for connection configuration using jq (consistent with rest of script) +connectionConfig=$(mktemp).json +jq -n \ + --arg type "Cluster" \ + --arg clusterName "${clusterName}" \ + '{ + "type": $type, + "clusterName": $clusterName, + "dbRoleToExecute": { + "role": "atlasAdmin", + "type": "BUILT_IN" + } + }' > "${connectionConfig}" +atlas streams connections create "${connectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --file "${connectionConfig}" \ + --output=json +rm -f "${connectionConfig}" +echo -e "Created Stream Connection \"${connectionName}\"\n" + +# Create Sample connection for inputs_1 and inputs_2 (sample_stream_solar) +sampleConnectionName="sample_stream_solar" +echo -e "Creating Sample Stream Connection \"${sampleConnectionName}\" for inputs_1 and inputs_2...\n" +sampleConnectionConfig=$(mktemp).json +jq -n \ + --arg type "Sample" \ + '{ + "type": $type + }' > "${sampleConnectionConfig}" +# Check if connection already exists +if atlas streams connections describe "${sampleConnectionName}" --projectId "${projectId}" --instance "${workspaceName}" --output json >/dev/null 2>&1; then + echo "Sample 
connection \"${sampleConnectionName}\" already exists, skipping creation" +else + atlas streams connections create "${sampleConnectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --file "${sampleConnectionConfig}" \ + --output=json + echo -e "Created Sample Stream Connection \"${sampleConnectionName}\"\n" +fi +rm -f "${sampleConnectionConfig}" + +# Reuse the Cluster connection from inputs_3 for inputs_1 and inputs_2 sink (saves time/resources) +# No need to create a separate cluster - we'll use the same connectionName + +# Create Kafka connections for inputs_4 and inputs_5 (Kafka to Cluster and Cluster to Kafka) +# Using placeholder values matching Terraform tests (as per MongoDB team guidance) +kafkaSourceConnectionName="KafkaConnectionSrc-$(date +%s)-$RANDOM" +kafkaSinkConnectionName="KafkaConnectionDest-$(date +%s)-$RANDOM" + +echo -e "Creating Kafka Source Connection \"${kafkaSourceConnectionName}\" for inputs_4...\n" +kafkaSourceConnectionConfig=$(mktemp).json +jq -n \ + --arg type "Kafka" \ + --arg bootstrapServers "localhost:9092,localhost:9092" \ + --arg mechanism "PLAIN" \ + --arg username "user" \ + --arg password "rawpassword" \ + --arg protocol "SASL_PLAINTEXT" \ + '{ + "type": $type, + "bootstrapServers": $bootstrapServers, + "authentication": { + "mechanism": $mechanism, + "username": $username, + "password": $password + }, + "security": { + "protocol": $protocol + }, + "config": { + "auto.offset.reset": "earliest" + }, + "networking": { + "access": { + "type": "PUBLIC" + } + } + }' > "${kafkaSourceConnectionConfig}" +atlas streams connections create "${kafkaSourceConnectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --file "${kafkaSourceConnectionConfig}" \ + --output=json +rm -f "${kafkaSourceConnectionConfig}" +echo -e "Created Kafka Source Connection \"${kafkaSourceConnectionName}\"\n" + +echo -e "Creating Kafka Sink Connection \"${kafkaSinkConnectionName}\" for inputs_5...\n" +kafkaSinkConnectionConfig=$(mktemp).json +jq -n \ + --arg type "Kafka" \ + --arg bootstrapServers "localhost:9092,localhost:9092" \ + --arg mechanism "PLAIN" \ + --arg username "user" \ + --arg password "rawpassword" \ + --arg protocol "SASL_PLAINTEXT" \ + '{ + "type": $type, + "bootstrapServers": $bootstrapServers, + "authentication": { + "mechanism": $mechanism, + "username": $username, + "password": $password + }, + "security": { + "protocol": $protocol + }, + "config": { + "auto.offset.reset": "earliest" + }, + "networking": { + "access": { + "type": "PUBLIC" + } + } + }' > "${kafkaSinkConnectionConfig}" +atlas streams connections create "${kafkaSinkConnectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --file "${kafkaSinkConnectionConfig}" \ + --output=json +rm -f "${kafkaSinkConnectionConfig}" +echo -e "Created Kafka Sink Connection \"${kafkaSinkConnectionName}\"\n" + +# Generate input files +# Reuse connectionName from inputs_3 for inputs_1 and inputs_2 sink (saves creating another cluster) +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg sink_connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("SINK_CONNECTION_PLACEHOLDER"; $sink_connection_name)' \ + "$(dirname "$0")/inputs_1_create.template.json" >"inputs/inputs_1_create.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + 
--arg sink_connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("SINK_CONNECTION_PLACEHOLDER"; $sink_connection_name)' \ + "$(dirname "$0")/inputs_1_update.template.json" >"inputs/inputs_1_update.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg sink_connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("SINK_CONNECTION_PLACEHOLDER"; $sink_connection_name)' \ + "$(dirname "$0")/inputs_2_create.template.json" >"inputs/inputs_2_create.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg sink_connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("SINK_CONNECTION_PLACEHOLDER"; $sink_connection_name)' \ + "$(dirname "$0")/inputs_2_update.template.json" >"inputs/inputs_2_update.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Options.Dlq.ConnectionName?|=$connection_name + | .Pipeline?|=gsub("CONNECTION_NAME_PLACEHOLDER"; $connection_name)' \ + "$(dirname "$0")/inputs_3_create.template.json" >"inputs/inputs_3_create.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg connection_name "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Options.Dlq.ConnectionName?|=$connection_name + | .Pipeline?|=gsub("CONNECTION_NAME_PLACEHOLDER"; $connection_name)' \ + "$(dirname "$0")/inputs_3_update.template.json" >"inputs/inputs_3_update.json" + +# Generate inputs_4 (Kafka to Cluster) - using Kafka source and Cluster sink +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg kafka_source "$kafkaSourceConnectionName" \ + --arg cluster_sink "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("KAFKA_SOURCE_CONNECTION_PLACEHOLDER"; $kafka_source) + | .Pipeline?|=gsub("CLUSTER_SINK_CONNECTION_PLACEHOLDER"; $cluster_sink)' \ + "$(dirname "$0")/inputs_4_create.template.json" >"inputs/inputs_4_create.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg kafka_source "$kafkaSourceConnectionName" \ + --arg cluster_sink "$connectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("KAFKA_SOURCE_CONNECTION_PLACEHOLDER"; $kafka_source) + | .Pipeline?|=gsub("CLUSTER_SINK_CONNECTION_PLACEHOLDER"; $cluster_sink)' \ + "$(dirname "$0")/inputs_4_update.template.json" >"inputs/inputs_4_update.json" + +# Generate inputs_5 (Cluster to Kafka) - using Cluster source and Kafka sink +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg cluster_source "$connectionName" \ + --arg kafka_sink "$kafkaSinkConnectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | 
.Pipeline?|=gsub("CLUSTER_SOURCE_CONNECTION_PLACEHOLDER"; $cluster_source) + | .Pipeline?|=gsub("KAFKA_SINK_CONNECTION_PLACEHOLDER"; $kafka_sink)' \ + "$(dirname "$0")/inputs_5_create.template.json" >"inputs/inputs_5_create.json" + +jq --arg workspace_name "$workspaceName" \ + --arg project_id "$projectId" \ + --arg profile "$profile" \ + --arg cluster_source "$connectionName" \ + --arg kafka_sink "$kafkaSinkConnectionName" \ + '.Profile?|=$profile + | .ProjectId?|=$project_id + | .WorkspaceName?|=$workspace_name + | .Pipeline?|=gsub("CLUSTER_SOURCE_CONNECTION_PLACEHOLDER"; $cluster_source) + | .Pipeline?|=gsub("KAFKA_SINK_CONNECTION_PLACEHOLDER"; $kafka_sink)' \ + "$(dirname "$0")/inputs_5_update.template.json" >"inputs/inputs_5_update.json" + +echo -e "Test input files generated successfully in inputs/ directory\n" diff --git a/cfn-resources/stream-processor/test/cfn-test-delete-inputs.sh b/cfn-resources/stream-processor/test/cfn-test-delete-inputs.sh new file mode 100755 index 000000000..4e6b15de7 --- /dev/null +++ b/cfn-resources/stream-processor/test/cfn-test-delete-inputs.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# cfn-test-delete-inputs.sh +# +# This tool deletes the mongodb resources used for `cfn test` as inputs. +# + +set -euo pipefail + +function usage { + echo "usage:$0 " +} + +projectId=$(jq -r '.ProjectId' ./inputs/inputs_1_create.json) +workspaceName=$(jq -r '.WorkspaceName' ./inputs/inputs_1_create.json) +processorName1=$(jq -r '.ProcessorName' ./inputs/inputs_1_create.json) +processorName2=$(jq -r '.ProcessorName' ./inputs/inputs_2_create.json) +processorName3=$(jq -r '.ProcessorName' ./inputs/inputs_3_create.json) + +# Delete stream processors (if they exist) +for processorName in "$processorName1" "$processorName2" "$processorName3"; do + if atlas streams processors delete "${processorName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --force 2>/dev/null; then + echo "deleted stream processor with name ${processorName}" + else + echo "failed to delete or stream processor '${processorName}' does not exist" + fi +done + +# Delete Sample connection (sample_stream_solar) if it exists +sampleConnectionName="sample_stream_solar" +if atlas streams connections delete "${sampleConnectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --force 2>/dev/null; then + echo "deleted sample stream connection with name ${sampleConnectionName}" +else + echo "failed to delete or sample stream connection '${sampleConnectionName}' does not exist" +fi + +# Get connection name from inputs_3 if it exists +if [ -f "./inputs/inputs_3_create.json" ]; then + connectionName=$(jq -r '.Options.Dlq.ConnectionName // empty' ./inputs/inputs_3_create.json) + if [ -n "$connectionName" ]; then + if atlas streams connections delete "${connectionName}" \ + --projectId "${projectId}" \ + --instance "${workspaceName}" \ + --force 2>/dev/null; then + echo "deleted stream connection with name ${connectionName}" + else + echo "failed to delete or stream connection '${connectionName}' does not exist" + fi + fi +fi + +# Delete all clusters in the project (created for DLQ testing) +# The cluster name is not stored in input JSON, so we list and delete all clusters +# Clusters must be deleted before stream instance and project to avoid dependency conflicts +echo "Checking for clusters to delete in project ${projectId}..." 
+clusterList=$(atlas clusters list --projectId "${projectId}" --output json 2>/dev/null | jq -r '.results[]?.name // empty' 2>/dev/null || echo "") +if [ -n "$clusterList" ]; then + while IFS= read -r clusterName; do + if [ -n "$clusterName" ] && [ "$clusterName" != "null" ] && [ "$clusterName" != "" ]; then + if atlas cluster delete "${clusterName}" --projectId "${projectId}" --force 2>/dev/null; then + echo "deleting cluster with name ${clusterName}" + # Wait for cluster deletion to complete + atlas cluster watch "${clusterName}" --projectId "${projectId}" 2>/dev/null || true + else + echo "failed to delete or cluster '${clusterName}' does not exist" + fi + fi + done <<< "$clusterList" +else + echo "No clusters found in project" +fi + +# Delete stream instance/workspace (after clusters are deleted) +if atlas streams instances delete "${workspaceName}" --projectId "${projectId}" --force 2>/dev/null; then + echo "deleting stream instance/workspace with name ${workspaceName}" + # Wait for deletion to complete + atlas streams instances watch "${workspaceName}" --projectId "${projectId}" 2>/dev/null || true +else + echo "failed to delete or stream instance/workspace '${workspaceName}' does not exist" +fi + +#delete project +if atlas projects delete "$projectId" --force 2>/dev/null; then + echo "$projectId project deletion OK" +else + (echo "Failed cleaning project:$projectId" && exit 1) +fi diff --git a/cfn-resources/stream-processor/test/contract-testing/cfn-test-create.sh b/cfn-resources/stream-processor/test/contract-testing/cfn-test-create.sh new file mode 100755 index 000000000..4b795316e --- /dev/null +++ b/cfn-resources/stream-processor/test/contract-testing/cfn-test-create.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash + +# This tool generates the resources and json files in the inputs/ for `cfn test`. +set -o errexit +set -o nounset +set -o pipefail + +projectName="cfn-test-bot-$(date +%s)-$RANDOM" + +# create project +projectId=$(atlas projects create "${projectName}" --output=json | jq -r '.id') + +echo "projectId: $projectId" +echo "projectName: $projectName" + +./test/cfn-test-create-inputs.sh "$projectName" diff --git a/cfn-resources/stream-processor/test/contract-testing/cfn-test-delete.sh b/cfn-resources/stream-processor/test/contract-testing/cfn-test-delete.sh new file mode 100755 index 000000000..a1e063f38 --- /dev/null +++ b/cfn-resources/stream-processor/test/contract-testing/cfn-test-delete.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +# This tool deletes the mongodb resources used for `cfn test` as inputs. 
+set -o errexit +set -o nounset +set -o pipefail + +./test/cfn-test-delete-inputs.sh diff --git a/cfn-resources/stream-processor/test/inputs_1_create.template.json b/cfn-resources/stream-processor/test/inputs_1_create.template.json new file mode 100644 index 000000000..7e0a720ec --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_1_create.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-1", + "Pipeline": "[{\"$source\": {\"connectionName\": \"sample_stream_solar\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"SINK_CONNECTION_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "CREATED" +} diff --git a/cfn-resources/stream-processor/test/inputs_1_update.template.json b/cfn-resources/stream-processor/test/inputs_1_update.template.json new file mode 100644 index 000000000..7e0a720ec --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_1_update.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-1", + "Pipeline": "[{\"$source\": {\"connectionName\": \"sample_stream_solar\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"SINK_CONNECTION_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "CREATED" +} diff --git a/cfn-resources/stream-processor/test/inputs_2_create.template.json b/cfn-resources/stream-processor/test/inputs_2_create.template.json new file mode 100644 index 000000000..3d2ac71a5 --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_2_create.template.json @@ -0,0 +1,12 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-2", + "Pipeline": "[{\"$source\": {\"connectionName\": \"sample_stream_solar\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"SINK_CONNECTION_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "STARTED", + "Timeouts": { + "Create": "25m" + }, + "DeleteOnCreateTimeout": true +} diff --git a/cfn-resources/stream-processor/test/inputs_2_update.template.json b/cfn-resources/stream-processor/test/inputs_2_update.template.json new file mode 100644 index 000000000..2653d8846 --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_2_update.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-2", + "Pipeline": "[{\"$source\": {\"connectionName\": \"sample_stream_solar\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"SINK_CONNECTION_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "STOPPED" +} diff --git a/cfn-resources/stream-processor/test/inputs_3_create.template.json b/cfn-resources/stream-processor/test/inputs_3_create.template.json new file mode 100644 index 000000000..4e61dd3a5 --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_3_create.template.json @@ -0,0 +1,15 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-3", + "Pipeline": "[{\"$source\": {\"connectionName\": \"CONNECTION_NAME_PLACEHOLDER\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"CONNECTION_NAME_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "CREATED", + "Options": { + "Dlq": { + "Coll": "dlq-collection", + "ConnectionName": "", + "Db": "dlq-database" + } + } +} diff --git a/cfn-resources/stream-processor/test/inputs_3_update.template.json 
b/cfn-resources/stream-processor/test/inputs_3_update.template.json new file mode 100644 index 000000000..3d928c19c --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_3_update.template.json @@ -0,0 +1,15 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-3", + "Pipeline": "[{\"$source\": {\"connectionName\": \"CONNECTION_NAME_PLACEHOLDER\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"CONNECTION_NAME_PLACEHOLDER\", \"db\": \"test\", \"coll\": \"output\"}}}]", + "DesiredState": "CREATED", + "Options": { + "Dlq": { + "Coll": "dlq-collection-updated", + "ConnectionName": "", + "Db": "dlq-database-updated" + } + } +} diff --git a/cfn-resources/stream-processor/test/inputs_4_create.template.json b/cfn-resources/stream-processor/test/inputs_4_create.template.json new file mode 100644 index 000000000..dc62f70aa --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_4_create.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-4-kafka-to-cluster", + "Pipeline": "[{\"$source\": {\"connectionName\": \"KAFKA_SOURCE_CONNECTION_PLACEHOLDER\", \"topic\": \"random_topic\"}}, {\"$emit\": {\"connectionName\": \"CLUSTER_SINK_CONNECTION_PLACEHOLDER\", \"db\": \"kafka\", \"coll\": \"kafka_messages\", \"timeseries\": {\"timeField\": \"ts\"}}}]", + "DesiredState": "CREATED" +} diff --git a/cfn-resources/stream-processor/test/inputs_4_update.template.json b/cfn-resources/stream-processor/test/inputs_4_update.template.json new file mode 100644 index 000000000..9214a44db --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_4_update.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-4-kafka-to-cluster", + "Pipeline": "[{\"$source\": {\"connectionName\": \"KAFKA_SOURCE_CONNECTION_PLACEHOLDER\", \"topic\": \"random_topic\"}}, {\"$emit\": {\"connectionName\": \"CLUSTER_SINK_CONNECTION_PLACEHOLDER\", \"db\": \"kafka\", \"coll\": \"kafka_messages_updated\", \"timeseries\": {\"timeField\": \"ts\"}}}]", + "DesiredState": "CREATED" +} diff --git a/cfn-resources/stream-processor/test/inputs_5_create.template.json b/cfn-resources/stream-processor/test/inputs_5_create.template.json new file mode 100644 index 000000000..e58c256ac --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_5_create.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-5-cluster-to-kafka", + "Pipeline": "[{\"$source\": {\"connectionName\": \"CLUSTER_SOURCE_CONNECTION_PLACEHOLDER\"}}, {\"$emit\": {\"connectionName\": \"KAFKA_SINK_CONNECTION_PLACEHOLDER\", \"topic\": \"random_topic\"}}]", + "DesiredState": "CREATED" +} diff --git a/cfn-resources/stream-processor/test/inputs_5_update.template.json b/cfn-resources/stream-processor/test/inputs_5_update.template.json new file mode 100644 index 000000000..8abc048e2 --- /dev/null +++ b/cfn-resources/stream-processor/test/inputs_5_update.template.json @@ -0,0 +1,8 @@ +{ + "Profile": "default", + "ProjectId": "", + "WorkspaceName": "", + "ProcessorName": "test-processor-5-cluster-to-kafka", + "Pipeline": "[{\"$source\": {\"connectionName\": \"CLUSTER_SOURCE_CONNECTION_PLACEHOLDER\"}}, {\"$emit\": {\"connectionName\": \"KAFKA_SINK_CONNECTION_PLACEHOLDER\", \"topic\": \"random_topic_updated\"}}]", + "DesiredState": "CREATED" +} diff --git 
a/cfn-resources/stream-processor/test/stream-processor.sample-cfn-request.json b/cfn-resources/stream-processor/test/stream-processor.sample-cfn-request.json
new file mode 100644
index 000000000..434caeccc
--- /dev/null
+++ b/cfn-resources/stream-processor/test/stream-processor.sample-cfn-request.json
@@ -0,0 +1,11 @@
+{
+  "desiredResourceState": {
+    "Profile": "default",
+    "ProjectId": "",
+    "WorkspaceName": "",
+    "ProcessorName": "sample-processor",
+    "Pipeline": "[{\"$match\": {\"status\": \"active\"}}]",
+    "DesiredState": "CREATED"
+  },
+  "previousResourceState": {}
+}
diff --git a/cfn-resources/util/constants/constants.go b/cfn-resources/util/constants/constants.go
index 4cd57df70..5e1dc590b 100644
--- a/cfn-resources/util/constants/constants.go
+++ b/cfn-resources/util/constants/constants.go
@@ -157,4 +157,8 @@ const (
 	ConnectionName = "ConnectionName"
 	Type           = "Type"
 	StreamConfig   = "StreamConfig"
+
+	ProcessorName = "ProcessorName"
+	Pipeline      = "Pipeline"
+	WorkspaceName = "WorkspaceName"
 )
diff --git a/examples/atlas-streams/stream-processor/README.md b/examples/atlas-streams/stream-processor/README.md
new file mode 100644
index 000000000..57c14cd76
--- /dev/null
+++ b/examples/atlas-streams/stream-processor/README.md
@@ -0,0 +1,191 @@
+# How to create a MongoDB::Atlas::StreamProcessor
+
+## Step 1: Activate the stream processor resource in CloudFormation
+
+Step a: Create a role using [execution-role.yaml](../../execution-role.yaml) in the examples folder.
+
+Step b: Search for the MongoDB::Atlas::StreamProcessor resource.
+
+    (CloudFormation > Public extensions > choose 'Third party' > Search with " Execution name prefix = MongoDB " )
+
+Step c: Select and activate the resource, entering the RoleArn created in Step a.
+
+Your StreamProcessor resource is ready to use.
+
+## Step 2: Choose a template based on your use case
+
+### Example 1: Basic Stream Processor ([stream-processor.json](stream-processor.json))
+
+Creates a stream processor that reads from a source connection and merges data into a cluster connection. This example uses `$merge` to write data to a regular MongoDB collection.
+
+**Use cases:**
+
+- Sample data to cluster (e.g., using `sample_stream_solar`)
+- Cluster to cluster data streaming
+- Simple data replication
+
+**Parameters:**
+
+1. **ProjectId** - Atlas Project Id (24 hexadecimal characters)
+2. **WorkspaceName** - Name of your stream instance/workspace
+3. **ProcessorName** - Unique name for the stream processor
+4. **SourceConnectionName** - Name of the source connection:
+   - For sample data: `sample_stream_solar`
+   - For cluster source: Your cluster connection name
+5. **SinkConnectionName** - Name of the sink cluster connection (must be a cluster connection)
+6. **SinkDatabase** - Target database name (optional, default: `test`)
+7. **SinkCollection** - Target collection name (optional, default: `output`)
+8. **DesiredState** - Desired state of the processor: `CREATED`, `STOPPED`, or `STARTED` (optional, default: `CREATED`)
+9. **Profile** - Secret Manager Profile for Atlas credentials (optional, default: `default`)
+
+**Pipeline stages:**
+
+- `$source` - Reads from the source connection
+- `$merge` - Merges data into the target cluster connection (for regular collections)
+
+### Example 2: Stream Processor with Dead Letter Queue ([stream-processor-dlq.json](stream-processor-dlq.json))
+
+Creates a stream processor with Dead Letter Queue (DLQ) configuration. Failed messages are automatically sent to a DLQ collection for error handling and debugging.
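+The DLQ is wired in through the resource's `Options` property. As a rough sketch (the connection name is a placeholder; the database and collection names mirror the defaults listed below), the relevant fragment of the template's `Properties` looks like:
+
+```json
+"Options": {
+  "Dlq": {
+    "ConnectionName": "my-cluster-connection",
+    "Db": "dlq",
+    "Coll": "dlq-messages"
+  }
+}
+```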
+ +**Additional Parameters (beyond Example 1):** + +10. **DlqConnectionName** - Name of the DLQ connection (must be a cluster connection) +11. **DlqDatabase** - DLQ database name (optional, default: `dlq`) +12. **DlqCollection** - DLQ collection name (optional, default: `dlq-messages`) + +**Pipeline stages:** + +- `$source` - Reads from the source connection +- `$merge` - Merges data into the target cluster connection (for regular collections) +- **Options.Dlq** - Configured to capture failed messages + +### Example 3: Kafka to Cluster Stream Processor ([stream-processor-kafka-to-cluster.json](stream-processor-kafka-to-cluster.json)) + +Creates a stream processor that reads from a Kafka topic and writes to a cluster connection as a time-series collection. + +**Use cases:** + +- Ingesting data from Kafka into MongoDB Atlas +- Real-time data pipeline from Kafka to MongoDB +- Event streaming from Kafka to time-series collections + +**Parameters:** + +1. **ProjectId** - Atlas Project Id (24 hexadecimal characters) +2. **WorkspaceName** - Name of your stream instance/workspace +3. **ProcessorName** - Unique name for the stream processor +4. **KafkaSourceConnectionName** - Name of the Kafka source connection +5. **KafkaTopic** - Name of the Kafka topic to read from +6. **SinkConnectionName** - Name of the sink cluster connection (must be a cluster connection) +7. **SinkDatabase** - Target database name (optional, default: `kafka`) +8. **SinkCollection** - Target collection name (optional, default: `kafka_messages`) +9. **DesiredState** - Must be `CREATED` or `STOPPED` (cannot be `STARTED` without a working Kafka cluster) +10. **Profile** - Secret Manager Profile for Atlas credentials (optional, default: `default`) + +**Pipeline stages:** + +- `$source` - Reads from Kafka topic (requires `connectionName` and `topic`) +- `$emit` - Writes to cluster connection as time-series collection + +**Important Notes:** + +- ⚠️ **This processor must be created in `CREATED` state** - it cannot be started (`STARTED`) without a working Kafka cluster that is accessible from MongoDB Atlas Stream Processing infrastructure +- The processor will fail if you attempt to start it without a valid Kafka connection +- To use this processor with a real Kafka cluster, first ensure your Kafka connection is properly configured and accessible, then update the processor's `DesiredState` to `STARTED` + +### Example 4: Cluster to Kafka Stream Processor ([stream-processor-cluster-to-kafka.json](stream-processor-cluster-to-kafka.json)) + +Creates a stream processor that reads from a cluster connection and writes to a Kafka topic. + +**Use cases:** + +- Streaming MongoDB data to Kafka +- Real-time data export from Atlas to Kafka +- Event streaming from MongoDB to Kafka topics + +**Parameters:** + +1. **ProjectId** - Atlas Project Id (24 hexadecimal characters) +2. **WorkspaceName** - Name of your stream instance/workspace +3. **ProcessorName** - Unique name for the stream processor +4. **SourceConnectionName** - Name of the source cluster connection +5. **KafkaSinkConnectionName** - Name of the Kafka sink connection +6. **KafkaTopic** - Name of the Kafka topic to write to +7. **DesiredState** - Must be `CREATED` or `STOPPED` (cannot be `STARTED` without a working Kafka cluster) +8. 
+
+### Example 2: Stream Processor with Dead Letter Queue ([stream-processor-dlq.json](stream-processor-dlq.json))
+
+Creates a stream processor with Dead Letter Queue (DLQ) configuration. Failed messages are automatically sent to a DLQ collection for error handling and debugging.
+
+**Additional Parameters (beyond Example 1):**
+
+10. **DlqConnectionName** - Name of the DLQ connection (must be a cluster connection)
+11. **DlqDatabase** - DLQ database name (optional, default: `dlq`)
+12. **DlqCollection** - DLQ collection name (optional, default: `dlq-messages`)
+
+**Pipeline stages:**
+
+- `$source` - Reads from the source connection
+- `$merge` - Merges data into the target cluster connection (for regular collections)
+- **Options.Dlq** - Configured to capture failed messages
+
+### Example 3: Kafka to Cluster Stream Processor ([stream-processor-kafka-to-cluster.json](stream-processor-kafka-to-cluster.json))
+
+Creates a stream processor that reads from a Kafka topic and writes to a cluster connection as a time-series collection.
+
+**Use cases:**
+
+- Ingesting data from Kafka into MongoDB Atlas
+- Real-time data pipelines from Kafka to MongoDB
+- Event streaming from Kafka to time-series collections
+
+**Parameters:**
+
+1. **ProjectId** - Atlas Project Id (24 hexadecimal characters)
+2. **WorkspaceName** - Name of your stream instance/workspace
+3. **ProcessorName** - Unique name for the stream processor
+4. **KafkaSourceConnectionName** - Name of the Kafka source connection
+5. **KafkaTopic** - Name of the Kafka topic to read from
+6. **SinkConnectionName** - Name of the sink cluster connection (must be a cluster connection)
+7. **SinkDatabase** - Target database name (optional, default: `kafka`)
+8. **SinkCollection** - Target collection name (optional, default: `kafka_messages`)
+9. **DesiredState** - Must be `CREATED` or `STOPPED` (cannot be `STARTED` without a working Kafka cluster)
+10. **Profile** - Secret Manager Profile for Atlas credentials (optional, default: `default`)
+
+**Pipeline stages:**
+
+- `$source` - Reads from the Kafka topic (requires `connectionName` and `topic`)
+- `$emit` - Writes to the cluster connection as a time-series collection
+
+**Important Notes:**
+
+- ⚠️ **This processor must be created in the `CREATED` state** - it cannot be started (`STARTED`) without a working Kafka cluster that is accessible from the MongoDB Atlas Stream Processing infrastructure
+- The processor will fail if you attempt to start it without a valid Kafka connection
+- To use this processor with a real Kafka cluster, first ensure your Kafka connection is properly configured and accessible, then update the processor's `DesiredState` to `STARTED`
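+
+For illustration, with hypothetical values `KafkaSourceConnectionName=my-kafka`, `KafkaTopic=sensor-events`, `SinkConnectionName=my-cluster`, and the default database and collection, the template's `Fn::Sub` resolves the `Pipeline` property to:
+
+```json
+[
+  { "$source": { "connectionName": "my-kafka", "topic": "sensor-events" } },
+  {
+    "$emit": {
+      "connectionName": "my-cluster",
+      "db": "kafka",
+      "coll": "kafka_messages",
+      "timeseries": { "timeField": "ts" }
+    }
+  }
+]
+```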
+
+### Example 4: Cluster to Kafka Stream Processor ([stream-processor-cluster-to-kafka.json](stream-processor-cluster-to-kafka.json))
+
+Creates a stream processor that reads from a cluster connection and writes to a Kafka topic.
+
+**Use cases:**
+
+- Streaming MongoDB data to Kafka
+- Real-time data export from Atlas to Kafka
+- Event streaming from MongoDB to Kafka topics
+
+**Parameters:**
+
+1. **ProjectId** - Atlas Project Id (24 hexadecimal characters)
+2. **WorkspaceName** - Name of your stream instance/workspace
+3. **ProcessorName** - Unique name for the stream processor
+4. **SourceConnectionName** - Name of the source cluster connection
+5. **KafkaSinkConnectionName** - Name of the Kafka sink connection
+6. **KafkaTopic** - Name of the Kafka topic to write to
+7. **DesiredState** - Must be `CREATED` or `STOPPED` (cannot be `STARTED` without a working Kafka cluster)
+8. **Profile** - Secret Manager Profile for Atlas credentials (optional, default: `default`)
+
+**Pipeline stages:**
+
+- `$source` - Reads from the cluster connection
+- `$emit` - Writes to the Kafka topic (requires `connectionName` and `topic`)
+
+**Important Notes:**
+
+- ⚠️ **This processor must be created in the `CREATED` state** - it cannot be started (`STARTED`) without a working Kafka cluster that is accessible from the MongoDB Atlas Stream Processing infrastructure
+- The processor will fail if you attempt to start it without a valid Kafka connection
+- To use this processor with a real Kafka cluster, first ensure your Kafka connection is properly configured and accessible, then update the processor's `DesiredState` to `STARTED`
+
+## Pipeline Stage Options
+
+### $source
+
+Reads data from a source connection. Supported sources:
+
+- **Sample connections**: `sample_stream_solar` (for testing)
+- **Cluster connections**: Read from MongoDB collections
+- **Kafka connections**: Read from Kafka topics (requires the `topic` parameter)
+
+### $emit
+
+Writes data to a target connection. Options:
+
+- **Cluster**: Write to MongoDB time-series collections
+  - `connectionName` - Target cluster connection name
+  - `db` - Target database
+  - `coll` - Target collection
+  - `timeseries` - Time-series options
+    - `timeField` - Field name containing the timestamp
+- **Kafka**: Write to Kafka topics
+  - `connectionName` - Target Kafka connection name
+  - `topic` - Target Kafka topic name
+
+### $merge
+
+Merges data into regular MongoDB collections. Use `$merge` for standard (non-time-series) collections.
+
+- **Cluster**: Merge into MongoDB collections via the `into` object, which contains:
+  - `connectionName` - Target cluster connection name
+  - `db` - Target database
+  - `coll` - Target collection
+
+**Note:** Use `$merge` for regular collections. Use `$emit` only for time-series collections (requires the `timeseries` option).
+
+## State Management
+
+The `DesiredState` parameter controls the processor lifecycle:
+
+- **CREATED** - Processor is created but not running (default)
+- **STARTED** - Processor is actively processing data
+- **STOPPED** - Processor is stopped (can be restarted)
+
+The `State` output (read-only) reflects the actual current state of the processor as returned by the Atlas API. Common states include `CREATED`, `STARTED`, `STOPPED`, and `FAILED`.
+
+**Note:** When updating a processor whose current state is `STARTED`, the processor is stopped, updated, and then restarted if the `DesiredState` is `STARTED`.
+
+## Kafka Integration Notes
+
+When working with Kafka-based stream processors:
+
+1. **Connection Validation**: The stream processor validates that the connection name exists in the workspace, but does not validate Kafka connectivity at creation time.
+
+2. **State Management**: Kafka processors should be created in the `CREATED` state. They can only be started (`STARTED`) when:
+   - A valid Kafka connection exists
+   - The Kafka cluster is accessible from the MongoDB Atlas Stream Processing infrastructure
+   - Authentication credentials are correct
+   - Network connectivity is established (public access or VPC peering)
+
+3. **Failure Handling**: If you attempt to start a Kafka processor without a working Kafka cluster, the processor will enter the `FAILED` state. You can check the processor state via the `ProcessorState` output.
+
+4. **Testing**: The examples provided (Examples 3 and 4) are designed to be created successfully even without a working Kafka cluster, allowing you to validate the CloudFormation template structure. To actually process data, you will need a properly configured Kafka cluster.
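+
+## Deploying an example
+
+Once the resource is activated, a template can be deployed with the AWS CLI. The following is a minimal sketch for Example 1; the stack name and all parameter values are placeholders to replace with your own:
+
+```bash
+# Deploy the basic stream processor template (all values below are illustrative)
+aws cloudformation deploy \
+  --template-file stream-processor.json \
+  --stack-name my-stream-processor \
+  --parameter-overrides \
+    ProjectId=<24-character-project-id> \
+    WorkspaceName=<stream-workspace-name> \
+    ProcessorName=sample-processor \
+    SourceConnectionName=sample_stream_solar \
+    SinkConnectionName=<cluster-connection-name>
+```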
diff --git a/examples/atlas-streams/stream-processor/stream-processor-cluster-to-kafka.json b/examples/atlas-streams/stream-processor/stream-processor-cluster-to-kafka.json
new file mode 100644
index 000000000..62e19dbaa
--- /dev/null
+++ b/examples/atlas-streams/stream-processor/stream-processor-cluster-to-kafka.json
@@ -0,0 +1,93 @@
+{
+  "AWSTemplateFormatVersion": "2010-09-09",
+  "Description": "This template creates a stream processor that reads from a cluster connection and writes to a Kafka connection. Note: This processor is created in CREATED state and cannot be started without a working Kafka cluster.",
+  "Parameters": {
+    "Profile": {
+      "Type": "String",
+      "Default": "default",
+      "Description": "Secret Manager Profile that contains the Atlas Programmatic keys"
+    },
+    "ProjectId": {
+      "Type": "String",
+      "Description": "Atlas Project Id (24 hexadecimal characters)"
+    },
+    "WorkspaceName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processing workspace"
+    },
+    "ProcessorName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processor"
+    },
+    "SourceConnectionName": {
+      "Type": "String",
+      "Description": "Name of the source stream connection (must be a cluster connection)"
+    },
+    "KafkaSinkConnectionName": {
+      "Type": "String",
+      "Description": "Name of the Kafka sink stream connection"
+    },
+    "KafkaTopic": {
+      "Type": "String",
+      "Description": "Name of the Kafka topic to write to"
+    },
+    "DesiredState": {
+      "Type": "String",
+      "Default": "CREATED",
+      "Description": "Desired state of the stream processor. Must be CREATED or STOPPED (cannot be STARTED without a working Kafka cluster)",
+      "AllowedValues": ["CREATED", "STOPPED"]
+    }
+  },
+  "Resources": {
+    "StreamProcessor": {
+      "Type": "MongoDB::Atlas::StreamProcessor",
+      "Properties": {
+        "Profile": { "Ref": "Profile" },
+        "ProjectId": { "Ref": "ProjectId" },
+        "WorkspaceName": { "Ref": "WorkspaceName" },
+        "ProcessorName": { "Ref": "ProcessorName" },
+        "Pipeline": {
+          "Fn::Sub": [
+            "[{\"$source\": {\"connectionName\": \"${SourceConnection}\"}}, {\"$emit\": {\"connectionName\": \"${KafkaSink}\", \"topic\": \"${KafkaTopic}\"}}]",
+            {
+              "SourceConnection": { "Ref": "SourceConnectionName" },
+              "KafkaSink": { "Ref": "KafkaSinkConnectionName" },
+              "KafkaTopic": { "Ref": "KafkaTopic" }
+            }
+          ]
+        },
+        "DesiredState": { "Ref": "DesiredState" }
+      }
+    }
+  },
+  "Outputs": {
+    "ProcessorId": {
+      "Description": "Unique identifier of the stream processor",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "Id"] }
+    },
+    "ProcessorState": {
+      "Description": "Current state of the stream processor from Atlas API",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "State"] }
+    }
+  }
+}
diff --git a/examples/atlas-streams/stream-processor/stream-processor-dlq.json b/examples/atlas-streams/stream-processor/stream-processor-dlq.json
new file mode 100644
index 000000000..eff1e9158
--- /dev/null
+++ b/examples/atlas-streams/stream-processor/stream-processor-dlq.json
@@ -0,0 +1,129 @@
+{
+  "AWSTemplateFormatVersion": "2010-09-09",
+  "Description": "This template creates a stream processor with Dead Letter Queue (DLQ) configuration. The processor uses a source connection and merges data into a cluster connection, with failed messages sent to a DLQ collection.",
+  "Parameters": {
+    "Profile": {
+      "Type": "String",
+      "Default": "default",
+      "Description": "Secret Manager Profile that contains the Atlas Programmatic keys"
+    },
+    "ProjectId": {
+      "Type": "String",
+      "Description": "Atlas Project Id (24 hexadecimal characters)"
+    },
+    "WorkspaceName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processing workspace"
+    },
+    "ProcessorName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processor"
+    },
+    "SourceConnectionName": {
+      "Type": "String",
+      "Description": "Name of the source stream connection"
+    },
+    "SinkConnectionName": {
+      "Type": "String",
+      "Description": "Name of the sink stream connection (must be a cluster connection)"
+    },
+    "SinkDatabase": {
+      "Type": "String",
+      "Default": "test",
+      "Description": "Name of the database for the sink connection"
+    },
+    "SinkCollection": {
+      "Type": "String",
+      "Default": "output",
+      "Description": "Name of the collection for the sink connection"
+    },
+    "DlqConnectionName": {
+      "Type": "String",
+      "Description": "Name of the DLQ connection (must be a cluster connection)"
+    },
+    "DlqDatabase": {
+      "Type": "String",
+      "Default": "dlq",
+      "Description": "Name of the database for the DLQ"
+    },
+    "DlqCollection": {
+      "Type": "String",
+      "Default": "dlq-messages",
+      "Description": "Name of the collection for the DLQ"
+    },
+    "DesiredState": {
+      "Type": "String",
+      "Default": "CREATED",
+      "Description": "Desired state of the stream processor",
+      "AllowedValues": ["CREATED", "STARTED", "STOPPED"]
+    }
+  },
+  "Resources": {
+    "StreamProcessor": {
+      "Type": "MongoDB::Atlas::StreamProcessor",
+      "Properties": {
+        "Profile": { "Ref": "Profile" },
+        "ProjectId": { "Ref": "ProjectId" },
+        "WorkspaceName": { "Ref": "WorkspaceName" },
+        "ProcessorName": { "Ref": "ProcessorName" },
+        "Pipeline": {
+          "Fn::Sub": [
+            "[{\"$source\": {\"connectionName\": \"${SourceConnection}\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"${SinkConnection}\", \"db\": \"${SinkDb}\", \"coll\": \"${SinkColl}\"}}}]",
+            {
+              "SourceConnection": { "Ref": "SourceConnectionName" },
+              "SinkConnection": { "Ref": "SinkConnectionName" },
+              "SinkDb": { "Ref": "SinkDatabase" },
+              "SinkColl": { "Ref": "SinkCollection" }
+            }
+          ]
+        },
+        "DesiredState": { "Ref": "DesiredState" },
+        "Options": {
+          "Dlq": {
+            "ConnectionName": { "Ref": "DlqConnectionName" },
+            "Db": { "Ref": "DlqDatabase" },
+            "Coll": { "Ref": "DlqCollection" }
+          }
+        }
+      }
+    }
+  },
+  "Outputs": {
+    "ProcessorId": {
+      "Description": "Unique identifier of the stream processor",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "Id"] }
+    },
+    "ProcessorState": {
+      "Description": "Current state of the stream processor from Atlas API",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "State"] }
+    }
+  }
+}
diff --git a/examples/atlas-streams/stream-processor/stream-processor-kafka-to-cluster.json b/examples/atlas-streams/stream-processor/stream-processor-kafka-to-cluster.json
new file mode 100644
index 000000000..b9ba30e2e
--- /dev/null
+++ b/examples/atlas-streams/stream-processor/stream-processor-kafka-to-cluster.json
@@ -0,0 +1,109 @@
+{
+  "AWSTemplateFormatVersion": "2010-09-09",
+  "Description": "This template creates a stream processor that reads from a Kafka connection and writes to a cluster connection. Note: This processor is created in CREATED state and cannot be started without a working Kafka cluster.",
+  "Parameters": {
+    "Profile": {
+      "Type": "String",
+      "Default": "default",
+      "Description": "Secret Manager Profile that contains the Atlas Programmatic keys"
+    },
+    "ProjectId": {
+      "Type": "String",
+      "Description": "Atlas Project Id (24 hexadecimal characters)"
+    },
+    "WorkspaceName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processing workspace"
+    },
+    "ProcessorName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processor"
+    },
+    "KafkaSourceConnectionName": {
+      "Type": "String",
+      "Description": "Name of the Kafka source stream connection"
+    },
+    "KafkaTopic": {
+      "Type": "String",
+      "Description": "Name of the Kafka topic to read from"
+    },
+    "SinkConnectionName": {
+      "Type": "String",
+      "Description": "Name of the sink stream connection (must be a cluster connection)"
+    },
+    "SinkDatabase": {
+      "Type": "String",
+      "Default": "kafka",
+      "Description": "Name of the database for the sink connection"
+    },
+    "SinkCollection": {
+      "Type": "String",
+      "Default": "kafka_messages",
+      "Description": "Name of the collection for the sink connection"
+    },
+    "DesiredState": {
+      "Type": "String",
+      "Default": "CREATED",
+      "Description": "Desired state of the stream processor. Must be CREATED or STOPPED (cannot be STARTED without a working Kafka cluster)",
+      "AllowedValues": ["CREATED", "STOPPED"]
+    }
+  },
+  "Resources": {
+    "StreamProcessor": {
+      "Type": "MongoDB::Atlas::StreamProcessor",
+      "Properties": {
+        "Profile": { "Ref": "Profile" },
+        "ProjectId": { "Ref": "ProjectId" },
+        "WorkspaceName": { "Ref": "WorkspaceName" },
+        "ProcessorName": { "Ref": "ProcessorName" },
+        "Pipeline": {
+          "Fn::Sub": [
+            "[{\"$source\": {\"connectionName\": \"${KafkaSource}\", \"topic\": \"${KafkaTopic}\"}}, {\"$emit\": {\"connectionName\": \"${SinkConnection}\", \"db\": \"${SinkDb}\", \"coll\": \"${SinkColl}\", \"timeseries\": {\"timeField\": \"ts\"}}}]",
+            {
+              "KafkaSource": { "Ref": "KafkaSourceConnectionName" },
+              "KafkaTopic": { "Ref": "KafkaTopic" },
+              "SinkConnection": { "Ref": "SinkConnectionName" },
+              "SinkDb": { "Ref": "SinkDatabase" },
+              "SinkColl": { "Ref": "SinkCollection" }
+            }
+          ]
+        },
+        "DesiredState": { "Ref": "DesiredState" }
+      }
+    }
+  },
+  "Outputs": {
+    "ProcessorId": {
+      "Description": "Unique identifier of the stream processor",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "Id"] }
+    },
+    "ProcessorState": {
+      "Description": "Current state of the stream processor from Atlas API",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "State"] }
+    }
+  }
+}
diff --git a/examples/atlas-streams/stream-processor/stream-processor.json b/examples/atlas-streams/stream-processor/stream-processor.json
new file mode 100644
index 000000000..2093b7ebf
--- /dev/null
+++ b/examples/atlas-streams/stream-processor/stream-processor.json
@@ -0,0 +1,102 @@
+{
+  "AWSTemplateFormatVersion": "2010-09-09",
+  "Description": "This template creates a stream processor for a given stream workspace in the specified project. The processor reads from a source connection (sample data, cluster, or Kafka) and merges data into a cluster connection collection.",
+  "Parameters": {
+    "Profile": {
+      "Type": "String",
+      "Default": "default",
+      "Description": "Secret Manager Profile that contains the Atlas Programmatic keys"
+    },
+    "ProjectId": {
+      "Type": "String",
+      "Description": "Atlas Project Id (24 hexadecimal characters)"
+    },
+    "WorkspaceName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processing workspace"
+    },
+    "ProcessorName": {
+      "Type": "String",
+      "Description": "Human-readable label that identifies the stream processor"
+    },
+    "SourceConnectionName": {
+      "Type": "String",
+      "Description": "Name of the source stream connection (e.g., sample_stream_solar for sample data, or a cluster/kafka connection name)"
+    },
+    "SinkConnectionName": {
+      "Type": "String",
+      "Description": "Name of the sink stream connection (must be a cluster connection)"
+    },
+    "SinkDatabase": {
+      "Type": "String",
+      "Default": "test",
+      "Description": "Name of the database for the sink connection"
+    },
+    "SinkCollection": {
+      "Type": "String",
+      "Default": "output",
+      "Description": "Name of the collection for the sink connection"
+    },
+    "DesiredState": {
+      "Type": "String",
+      "Default": "CREATED",
+      "Description": "Desired state of the stream processor",
+      "AllowedValues": ["CREATED", "STARTED", "STOPPED"]
+    }
+  },
+  "Resources": {
+    "StreamProcessor": {
+      "Type": "MongoDB::Atlas::StreamProcessor",
+      "Properties": {
+        "Profile": { "Ref": "Profile" },
+        "ProjectId": { "Ref": "ProjectId" },
+        "WorkspaceName": { "Ref": "WorkspaceName" },
+        "ProcessorName": { "Ref": "ProcessorName" },
+        "Pipeline": {
+          "Fn::Sub": [
+            "[{\"$source\": {\"connectionName\": \"${SourceConnection}\"}}, {\"$merge\": {\"into\": {\"connectionName\": \"${SinkConnection}\", \"db\": \"${SinkDb}\", \"coll\": \"${SinkColl}\"}}}]",
+            {
+              "SourceConnection": { "Ref": "SourceConnectionName" },
+              "SinkConnection": { "Ref": "SinkConnectionName" },
+              "SinkDb": { "Ref": "SinkDatabase" },
+              "SinkColl": { "Ref": "SinkCollection" }
+            }
+          ]
+        },
+        "DesiredState": { "Ref": "DesiredState" }
+      }
+    }
+  },
+  "Outputs": {
+    "ProcessorId": {
+      "Description": "Unique identifier of the stream processor",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "Id"] }
+    },
+    "ProcessorState": {
+      "Description": "Current state of the stream processor from Atlas API",
+      "Value": { "Fn::GetAtt": ["StreamProcessor", "State"] }
+    }
+  }
+}