Skip to content

Commit 4e0c3e7

Browse files
author
sivaram-mongodb
committed
feat: Stream Connection CloudFormation Resource
1 parent 6fd44b2 commit 4e0c3e7

26 files changed

Lines changed: 1324 additions & 136 deletions

cfn-resources/stream-connection/README.md

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,52 @@ For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-ap
1111

1212
See the [resource docs](docs/README.md). Also refer [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) to manage credentials.
1313

14-
## Cloudformation Examples
14+
## CloudFormation Examples
1515

16-
See the example [CFN Template](/examples/stream-connection/kafka-stream-connection.json) for example resource.
16+
Example templates are available in the [examples directory](/examples/atlas-streams/stream-connection/):
17+
18+
- **Cluster Connection**: [cluster-stream-connection.json](/examples/atlas-streams/stream-connection/cluster-stream-connection.json) - Connects a Stream Workspace to an Atlas cluster
19+
- **Kafka Connection**: [kafka-stream-connection.json](/examples/atlas-streams/stream-connection/kafka-stream-connection.json) - Connects a Stream Workspace to a Kafka cluster
20+
- **Sample Connection**: [sample-stream-connection.json](/examples/atlas-streams/stream-connection/sample-stream-connection.json) - Uses a sample dataset
21+
22+
For detailed deployment and verification instructions, see the [examples README](/examples/atlas-streams/stream-connection/README.md).
23+
24+
## Deployment
25+
26+
### Prerequisites
27+
1. An existing Atlas project
28+
2. An existing Stream Workspace (create using `atlas streams instances create`)
29+
3. For Cluster connections: An existing Atlas cluster
30+
4. AWS credentials configured with CloudFormation permissions
31+
5. Atlas API keys stored in AWS Secrets Manager
32+
33+
### Deploy Example
34+
35+
```bash
36+
aws cloudformation deploy \
37+
--template-file examples/atlas-streams/stream-connection/cluster-stream-connection.json \
38+
--stack-name my-stream-connection \
39+
--parameter-overrides \
40+
ProjectId=YOUR_PROJECT_ID \
41+
WorkspaceName=YOUR_WORKSPACE_NAME \
42+
ConnectionName=my-connection \
43+
ClusterName=YOUR_CLUSTER_NAME \
44+
DbRole=atlasAdmin \
45+
DbRoleType=BUILT_IN \
46+
--capabilities CAPABILITY_IAM \
47+
--region us-east-1
48+
```
49+
50+
## Verification
51+
52+
After deployment, verify the connection using Atlas CLI:
53+
54+
```bash
55+
# List all connections for the workspace
56+
atlas streams connections list <WORKSPACE_NAME> --projectId <PROJECT_ID>
57+
58+
# Get specific connection details
59+
atlas streams connections get <WORKSPACE_NAME> <CONNECTION_NAME> --projectId <PROJECT_ID>
60+
```
61+
62+
The connection should appear in the list with the correct type and configuration matching your CloudFormation template parameters.

cfn-resources/stream-connection/cmd/resource/mappings.go

Lines changed: 109 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,74 @@
1515
package resource
1616

1717
import (
18-
admin20231115014 "go.mongodb.org/atlas-sdk/v20231115014/admin"
18+
admin20250312010 "go.mongodb.org/atlas-sdk/v20250312010/admin"
1919

2020
"github.com/mongodb/mongodbatlas-cloudformation-resources/util"
2121
)
2222

23-
func GetStreamConnectionModel(streamsConn *admin20231115014.StreamsConnection, currentModel *Model) *Model {
24-
model := new(Model)
23+
func GetStreamConnectionModel(streamsConn *admin20250312010.StreamsConnection, currentModel *Model) *Model {
24+
var model *Model
2525

2626
if currentModel != nil {
27+
// Use currentModel directly to preserve all fields including primary identifier fields
28+
// (ProjectId, WorkspaceName/InstanceName, Profile, ConnectionName)
2729
model = currentModel
30+
// Ensure WorkspaceName is set for primary identifier (required by CFN)
31+
// If InstanceName is provided but WorkspaceName is not, set WorkspaceName from InstanceName
32+
if model.WorkspaceName == nil && model.InstanceName != nil && *model.InstanceName != "" {
33+
model.WorkspaceName = model.InstanceName
34+
}
35+
} else {
36+
// Create new model only when currentModel is nil (e.g., in List operations)
37+
model = new(Model)
2838
}
2939

3040
model.ConnectionName = streamsConn.Name
3141
model.Type = streamsConn.Type
3242
model.ClusterName = streamsConn.ClusterName
43+
if streamsConn.ClusterGroupId != nil {
44+
model.ClusterProjectId = streamsConn.ClusterGroupId
45+
}
3346
model.BootstrapServers = streamsConn.BootstrapServers
47+
if streamsConn.Url != nil {
48+
model.Url = streamsConn.Url
49+
}
3450

3551
model.DbRoleToExecute = NewModelDBRoleToExecute(streamsConn.DbRoleToExecute)
3652

37-
model.Authentication = NewModelAuthentication(streamsConn.Authentication)
53+
model.Authentication = NewModelAuthentication(streamsConn.Authentication, currentModel)
3854

3955
model.Security = NewModelSecurity(streamsConn.Security)
4056

4157
if streamsConn.Config != nil {
4258
model.Config = *streamsConn.Config
4359
}
4460

61+
if streamsConn.Headers != nil {
62+
model.Headers = *streamsConn.Headers
63+
}
64+
65+
// Networking
66+
if streamsConn.Networking != nil && streamsConn.Networking.Access != nil {
67+
model.Networking = &Networking{
68+
Access: &Access{
69+
Type: streamsConn.Networking.Access.Type,
70+
ConnectionId: streamsConn.Networking.Access.ConnectionId,
71+
},
72+
}
73+
}
74+
75+
// AWS
76+
if streamsConn.Aws != nil {
77+
model.Aws = &Aws{
78+
RoleArn: streamsConn.Aws.RoleArn,
79+
}
80+
}
81+
4582
return model
4683
}
4784

48-
func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleToExecute {
85+
func NewModelDBRoleToExecute(dbRole *admin20250312010.DBRoleToExecute) *DBRoleToExecute {
4986
if dbRole == nil {
5087
return nil
5188
}
@@ -56,19 +93,27 @@ func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleTo
5693
}
5794
}
5895

59-
func NewModelAuthentication(authentication *admin20231115014.StreamsKafkaAuthentication) *StreamsKafkaAuthentication {
96+
func NewModelAuthentication(authentication *admin20250312010.StreamsKafkaAuthentication, currentModel *Model) *StreamsKafkaAuthentication {
6097
if authentication == nil {
6198
return nil
6299
}
63100

64-
return &StreamsKafkaAuthentication{
65-
Mechanism: authentication.Mechanism,
66-
Password: authentication.Password,
67-
Username: authentication.Username,
101+
authModel := &StreamsKafkaAuthentication{
102+
Mechanism: authentication.Mechanism,
103+
Method: authentication.Method,
104+
Username: authentication.Username,
105+
TokenEndpointUrl: authentication.TokenEndpointUrl,
106+
ClientId: authentication.ClientId,
107+
Scope: authentication.Scope,
108+
SaslOauthbearerExtensions: authentication.SaslOauthbearerExtensions,
109+
// Note: Password and ClientSecret are write-only fields and should NOT be set here
110+
// They are only used during Create/Update operations, never returned in Read responses
68111
}
112+
113+
return authModel
69114
}
70115

71-
func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsKafkaSecurity {
116+
func NewModelSecurity(security *admin20250312010.StreamsKafkaSecurity) *StreamsKafkaSecurity {
72117
if security == nil {
73118
return nil
74119
}
@@ -79,60 +124,100 @@ func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsK
79124
}
80125
}
81126

82-
func newStreamConnectionReq(model *Model) *admin20231115014.StreamsConnection {
83-
streamConnReq := admin20231115014.StreamsConnection{
127+
func newStreamConnectionReq(model *Model) *admin20250312010.StreamsConnection {
128+
streamConnReq := admin20250312010.StreamsConnection{
84129
Name: model.ConnectionName,
85130
Type: model.Type,
86131
}
87132

88-
if util.SafeString(streamConnReq.Type) == ClusterConnectionType {
133+
typeStr := util.SafeString(streamConnReq.Type)
134+
135+
// Cluster type specific fields
136+
if typeStr == ClusterConnectionType {
89137
streamConnReq.ClusterName = model.ClusterName
138+
if model.ClusterProjectId != nil {
139+
streamConnReq.ClusterGroupId = model.ClusterProjectId
140+
}
90141
streamConnReq.DbRoleToExecute = NewDBRoleToExecute(model.DbRoleToExecute)
91142
}
92143

93-
if util.SafeString(streamConnReq.Type) == KafkaConnectionType {
144+
// Kafka type specific fields
145+
if typeStr == KafkaConnectionType {
94146
streamConnReq.BootstrapServers = model.BootstrapServers
95147
streamConnReq.Security = newStreamsKafkaSecurity(model.Security)
96148
streamConnReq.Authentication = newStreamsKafkaAuthentication(model.Authentication)
97149

98150
if model.Config != nil {
99151
streamConnReq.Config = &model.Config
100152
}
153+
154+
// Networking for Kafka
155+
if model.Networking != nil && model.Networking.Access != nil {
156+
streamConnReq.Networking = &admin20250312010.StreamsKafkaNetworking{
157+
Access: &admin20250312010.StreamsKafkaNetworkingAccess{
158+
Type: model.Networking.Access.Type,
159+
ConnectionId: model.Networking.Access.ConnectionId,
160+
},
161+
}
162+
}
163+
}
164+
165+
// AWS Lambda type specific fields
166+
if typeStr == AWSLambdaType {
167+
if model.Aws != nil {
168+
streamConnReq.Aws = &admin20250312010.StreamsAWSConnectionConfig{
169+
RoleArn: model.Aws.RoleArn,
170+
}
171+
}
172+
}
173+
174+
// HTTPS type specific fields
175+
if typeStr == HTTPSType {
176+
streamConnReq.Url = model.Url
177+
if model.Headers != nil {
178+
streamConnReq.Headers = &model.Headers
179+
}
101180
}
102181

103182
return &streamConnReq
104183
}
105184

106-
func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin20231115014.DBRoleToExecute {
185+
func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin20250312010.DBRoleToExecute {
107186
if dbRoleToExecuteModel == nil {
108187
return nil
109188
}
110189

111-
return &admin20231115014.DBRoleToExecute{
190+
return &admin20250312010.DBRoleToExecute{
112191
Role: dbRoleToExecuteModel.Role,
113192
Type: dbRoleToExecuteModel.Type,
114193
}
115194
}
116195

117-
func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin20231115014.StreamsKafkaSecurity {
196+
func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin20250312010.StreamsKafkaSecurity {
118197
if securityModel == nil {
119198
return nil
120199
}
121200

122-
return &admin20231115014.StreamsKafkaSecurity{
201+
return &admin20250312010.StreamsKafkaSecurity{
123202
BrokerPublicCertificate: securityModel.BrokerPublicCertificate,
124203
Protocol: securityModel.Protocol,
125204
}
126205
}
127206

128-
func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin20231115014.StreamsKafkaAuthentication {
207+
func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin20250312010.StreamsKafkaAuthentication {
129208
if authenticationModel == nil {
130209
return nil
131210
}
132211

133-
return &admin20231115014.StreamsKafkaAuthentication{
134-
Mechanism: authenticationModel.Mechanism,
135-
Password: authenticationModel.Password,
136-
Username: authenticationModel.Username,
212+
return &admin20250312010.StreamsKafkaAuthentication{
213+
Mechanism: authenticationModel.Mechanism,
214+
Method: authenticationModel.Method,
215+
Username: authenticationModel.Username,
216+
Password: authenticationModel.Password,
217+
TokenEndpointUrl: authenticationModel.TokenEndpointUrl,
218+
ClientId: authenticationModel.ClientId,
219+
ClientSecret: authenticationModel.ClientSecret,
220+
Scope: authenticationModel.Scope,
221+
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions,
137222
}
138223
}

0 commit comments

Comments
 (0)