Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions cfn-resources/stream-connection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,52 @@ For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-ap

See the [resource docs](docs/README.md). Also refer [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) to manage credentials.

## Cloudformation Examples
## CloudFormation Examples

See the example [CFN Template](/examples/stream-connection/kafka-stream-connection.json) for example resource.
Example templates are available in the [examples directory](/examples/atlas-streams/stream-connection/):

- **Cluster Connection**: [cluster-stream-connection.json](/examples/atlas-streams/stream-connection/cluster-stream-connection.json) - Connects a Stream Workspace to an Atlas cluster
- **Kafka Connection**: [kafka-stream-connection.json](/examples/atlas-streams/stream-connection/kafka-stream-connection.json) - Connects a Stream Workspace to a Kafka cluster
- **Sample Connection**: [sample-stream-connection.json](/examples/atlas-streams/stream-connection/sample-stream-connection.json) - Uses a sample dataset

For detailed deployment and verification instructions, see the [examples README](/examples/atlas-streams/stream-connection/README.md).

## Deployment

### Prerequisites
1. An existing Atlas project
2. An existing Stream Workspace (create using `atlas streams instances create`)
3. For Cluster connections: An existing Atlas cluster
4. AWS credentials configured with CloudFormation permissions
5. Atlas API keys stored in AWS Secrets Manager

### Deploy an Example

```bash
aws cloudformation deploy \
--template-file examples/atlas-streams/stream-connection/cluster-stream-connection.json \
--stack-name my-stream-connection \
--parameter-overrides \
ProjectId=YOUR_PROJECT_ID \
WorkspaceName=YOUR_WORKSPACE_NAME \
ConnectionName=my-connection \
ClusterName=YOUR_CLUSTER_NAME \
DbRole=atlasAdmin \
DbRoleType=BUILT_IN \
--capabilities CAPABILITY_IAM \
--region us-east-1
```

## Verification

After deployment, verify the connection using the Atlas CLI:

```bash
# List all connections for the workspace
atlas streams connections list <WORKSPACE_NAME> --projectId <PROJECT_ID>

# Get specific connection details
atlas streams connections get <WORKSPACE_NAME> <CONNECTION_NAME> --projectId <PROJECT_ID>
```

The connection should appear in the list with the correct type and configuration matching your CloudFormation template parameters.
119 changes: 95 additions & 24 deletions cfn-resources/stream-connection/cmd/resource/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,67 @@
package resource

import (
admin20231115014 "go.mongodb.org/atlas-sdk/v20231115014/admin"
"go.mongodb.org/atlas-sdk/v20250312012/admin"

"github.com/mongodb/mongodbatlas-cloudformation-resources/util"
)

func GetStreamConnectionModel(streamsConn *admin20231115014.StreamsConnection, currentModel *Model) *Model {
model := new(Model)
func GetStreamConnectionModel(streamsConn *admin.StreamsConnection, currentModel *Model) *Model {
var model *Model

if currentModel != nil {
model = currentModel
if model.WorkspaceName == nil && model.InstanceName != nil && *model.InstanceName != "" {
model.WorkspaceName = model.InstanceName
}
} else {
model = new(Model)
}

model.ConnectionName = streamsConn.Name
model.Type = streamsConn.Type
model.ClusterName = streamsConn.ClusterName
if streamsConn.ClusterGroupId != nil {
model.ClusterProjectId = streamsConn.ClusterGroupId
}
model.BootstrapServers = streamsConn.BootstrapServers
if streamsConn.Url != nil {
model.Url = streamsConn.Url
}

model.DbRoleToExecute = NewModelDBRoleToExecute(streamsConn.DbRoleToExecute)

model.Authentication = NewModelAuthentication(streamsConn.Authentication)
model.Authentication = NewModelAuthentication(streamsConn.Authentication, currentModel)

model.Security = NewModelSecurity(streamsConn.Security)

if streamsConn.Config != nil {
model.Config = *streamsConn.Config
}

if streamsConn.Headers != nil {
model.Headers = *streamsConn.Headers
}

if streamsConn.Networking != nil && streamsConn.Networking.Access != nil {
model.Networking = &Networking{
Access: &Access{
Type: streamsConn.Networking.Access.Type,
ConnectionId: streamsConn.Networking.Access.ConnectionId,
},
}
}

if streamsConn.Aws != nil {
model.Aws = &Aws{
RoleArn: streamsConn.Aws.RoleArn,
}
}

return model
}

func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleToExecute {
func NewModelDBRoleToExecute(dbRole *admin.DBRoleToExecute) *DBRoleToExecute {
if dbRole == nil {
return nil
}
Expand All @@ -56,19 +86,25 @@ func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleTo
}
}

func NewModelAuthentication(authentication *admin20231115014.StreamsKafkaAuthentication) *StreamsKafkaAuthentication {
func NewModelAuthentication(authentication *admin.StreamsKafkaAuthentication, currentModel *Model) *StreamsKafkaAuthentication {
if authentication == nil {
return nil
}

return &StreamsKafkaAuthentication{
Mechanism: authentication.Mechanism,
Password: authentication.Password,
Username: authentication.Username,
authModel := &StreamsKafkaAuthentication{
Mechanism: authentication.Mechanism,
Method: authentication.Method,
Username: authentication.Username,
TokenEndpointUrl: authentication.TokenEndpointUrl,
ClientId: authentication.ClientId,
Scope: authentication.Scope,
SaslOauthbearerExtensions: authentication.SaslOauthbearerExtensions,
}

return authModel
}

func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsKafkaSecurity {
func NewModelSecurity(security *admin.StreamsKafkaSecurity) *StreamsKafkaSecurity {
if security == nil {
return nil
}
Expand All @@ -79,60 +115,95 @@ func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsK
}
}

func newStreamConnectionReq(model *Model) *admin20231115014.StreamsConnection {
streamConnReq := admin20231115014.StreamsConnection{
func newStreamConnectionReq(model *Model) *admin.StreamsConnection {
streamConnReq := admin.StreamsConnection{
Name: model.ConnectionName,
Type: model.Type,
}

if util.SafeString(streamConnReq.Type) == ClusterConnectionType {
typeStr := util.SafeString(streamConnReq.Type)

if typeStr == ClusterConnectionType {
streamConnReq.ClusterName = model.ClusterName
if model.ClusterProjectId != nil {
streamConnReq.ClusterGroupId = model.ClusterProjectId
}
streamConnReq.DbRoleToExecute = NewDBRoleToExecute(model.DbRoleToExecute)
}

if util.SafeString(streamConnReq.Type) == KafkaConnectionType {
if typeStr == KafkaConnectionType {
streamConnReq.BootstrapServers = model.BootstrapServers
streamConnReq.Security = newStreamsKafkaSecurity(model.Security)
streamConnReq.Authentication = newStreamsKafkaAuthentication(model.Authentication)

if model.Config != nil {
streamConnReq.Config = &model.Config
}

if model.Networking != nil && model.Networking.Access != nil {
streamConnReq.Networking = &admin.StreamsKafkaNetworking{
Access: &admin.StreamsKafkaNetworkingAccess{
Type: model.Networking.Access.Type,
ConnectionId: model.Networking.Access.ConnectionId,
},
}
}
}

if typeStr == AWSLambdaType {
if model.Aws != nil {
streamConnReq.Aws = &admin.StreamsAWSConnectionConfig{
RoleArn: model.Aws.RoleArn,
}
}
}

if typeStr == HTTPSType {
streamConnReq.Url = model.Url
if model.Headers != nil {
streamConnReq.Headers = &model.Headers
}
}

return &streamConnReq
}

func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin20231115014.DBRoleToExecute {
func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin.DBRoleToExecute {
if dbRoleToExecuteModel == nil {
return nil
}

return &admin20231115014.DBRoleToExecute{
return &admin.DBRoleToExecute{
Role: dbRoleToExecuteModel.Role,
Type: dbRoleToExecuteModel.Type,
}
}

func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin20231115014.StreamsKafkaSecurity {
func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin.StreamsKafkaSecurity {
if securityModel == nil {
return nil
}

return &admin20231115014.StreamsKafkaSecurity{
return &admin.StreamsKafkaSecurity{
BrokerPublicCertificate: securityModel.BrokerPublicCertificate,
Protocol: securityModel.Protocol,
}
}

func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin20231115014.StreamsKafkaAuthentication {
func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin.StreamsKafkaAuthentication {
if authenticationModel == nil {
return nil
}

return &admin20231115014.StreamsKafkaAuthentication{
Mechanism: authenticationModel.Mechanism,
Password: authenticationModel.Password,
Username: authenticationModel.Username,
return &admin.StreamsKafkaAuthentication{
Mechanism: authenticationModel.Mechanism,
Method: authenticationModel.Method,
Username: authenticationModel.Username,
Password: authenticationModel.Password,
TokenEndpointUrl: authenticationModel.TokenEndpointUrl,
ClientId: authenticationModel.ClientId,
ClientSecret: authenticationModel.ClientSecret,
Scope: authenticationModel.Scope,
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions,
}
}
Loading
Loading