Skip to content

Commit 25ac09f

Browse files
sivaram-mongodbsivaram-mongodbParthasarathyV
authored
feat: Update Stream Connection Resource (#1524)
Co-authored-by: sivaram-mongodb <sivaram@mongodb.com> Co-authored-by: ParthasarathyV <parthasarathy.varadhan@mongodb.com> Co-authored-by: ParthasarathyV <114770988+ParthasarathyV@users.noreply.github.com>
1 parent 600f5ba commit 25ac09f

34 files changed

Lines changed: 1451 additions & 384 deletions

cfn-resources/stream-connection/README.md

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,52 @@ For instructions on setting up a profile, [see here](/README.md#mongodb-atlas-ap
1111

1212
See the [resource docs](docs/README.md). Also refer [AWS security best practices for CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds) to manage credentials.
1313

14-
## Cloudformation Examples
14+
## CloudFormation Examples
1515

16-
See the example [CFN Template](/examples/stream-connection/kafka-stream-connection.json) for example resource.
16+
Example templates are available in the [examples directory](/examples/atlas-streams/stream-connection/):
17+
18+
- **Cluster Connection**: [cluster-stream-connection.json](/examples/atlas-streams/stream-connection/cluster-stream-connection.json) - Connects a Stream Workspace to an Atlas cluster
19+
- **Kafka Connection**: [kafka-stream-connection.json](/examples/atlas-streams/stream-connection/kafka-stream-connection.json) - Connects a Stream Workspace to a Kafka cluster
20+
- **Sample Connection**: [sample-stream-connection.json](/examples/atlas-streams/stream-connection/sample-stream-connection.json) - Uses a sample dataset
21+
22+
For detailed deployment and verification instructions, see the [examples README](/examples/atlas-streams/stream-connection/README.md).
23+
24+
## Deployment
25+
26+
### Prerequisites
27+
1. An existing Atlas project
28+
2. An existing Stream Workspace (create using `atlas streams instances create`)
29+
3. For Cluster connections: An existing Atlas cluster
30+
4. AWS credentials configured with CloudFormation permissions
31+
5. Atlas API keys stored in AWS Secrets Manager
32+
33+
### Deploy Example
34+
35+
```bash
36+
aws cloudformation deploy \
37+
--template-file examples/atlas-streams/stream-connection/cluster-stream-connection.json \
38+
--stack-name my-stream-connection \
39+
--parameter-overrides \
40+
ProjectId=YOUR_PROJECT_ID \
41+
WorkspaceName=YOUR_WORKSPACE_NAME \
42+
ConnectionName=my-connection \
43+
ClusterName=YOUR_CLUSTER_NAME \
44+
DbRole=atlasAdmin \
45+
DbRoleType=BUILT_IN \
46+
--capabilities CAPABILITY_IAM \
47+
--region us-east-1
48+
```
49+
50+
## Verification
51+
52+
After deployment, verify the connection using Atlas CLI:
53+
54+
```bash
55+
# List all connections for the workspace
56+
atlas streams connections list <WORKSPACE_NAME> --projectId <PROJECT_ID>
57+
58+
# Get specific connection details
59+
atlas streams connections get <WORKSPACE_NAME> <CONNECTION_NAME> --projectId <PROJECT_ID>
60+
```
61+
62+
The connection should appear in the list with the correct type and configuration matching your CloudFormation template parameters.

cfn-resources/stream-connection/cmd/resource/mappings.go

Lines changed: 95 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,67 @@
1515
package resource
1616

1717
import (
18-
admin20231115014 "go.mongodb.org/atlas-sdk/v20231115014/admin"
18+
"go.mongodb.org/atlas-sdk/v20250312012/admin"
1919

2020
"github.com/mongodb/mongodbatlas-cloudformation-resources/util"
2121
)
2222

23-
func GetStreamConnectionModel(streamsConn *admin20231115014.StreamsConnection, currentModel *Model) *Model {
24-
model := new(Model)
23+
func GetStreamConnectionModel(streamsConn *admin.StreamsConnection, currentModel *Model) *Model {
24+
var model *Model
2525

2626
if currentModel != nil {
2727
model = currentModel
28+
if model.WorkspaceName == nil && model.InstanceName != nil && *model.InstanceName != "" {
29+
model.WorkspaceName = model.InstanceName
30+
}
31+
} else {
32+
model = new(Model)
2833
}
2934

3035
model.ConnectionName = streamsConn.Name
3136
model.Type = streamsConn.Type
3237
model.ClusterName = streamsConn.ClusterName
38+
if streamsConn.ClusterGroupId != nil {
39+
model.ClusterProjectId = streamsConn.ClusterGroupId
40+
}
3341
model.BootstrapServers = streamsConn.BootstrapServers
42+
if streamsConn.Url != nil {
43+
model.Url = streamsConn.Url
44+
}
3445

3546
model.DbRoleToExecute = NewModelDBRoleToExecute(streamsConn.DbRoleToExecute)
3647

37-
model.Authentication = NewModelAuthentication(streamsConn.Authentication)
48+
model.Authentication = NewModelAuthentication(streamsConn.Authentication, currentModel)
3849

3950
model.Security = NewModelSecurity(streamsConn.Security)
4051

4152
if streamsConn.Config != nil {
4253
model.Config = *streamsConn.Config
4354
}
4455

56+
if streamsConn.Headers != nil {
57+
model.Headers = *streamsConn.Headers
58+
}
59+
60+
if streamsConn.Networking != nil && streamsConn.Networking.Access != nil {
61+
model.Networking = &Networking{
62+
Access: &Access{
63+
Type: streamsConn.Networking.Access.Type,
64+
ConnectionId: streamsConn.Networking.Access.ConnectionId,
65+
},
66+
}
67+
}
68+
69+
if streamsConn.Aws != nil {
70+
model.Aws = &Aws{
71+
RoleArn: streamsConn.Aws.RoleArn,
72+
}
73+
}
74+
4575
return model
4676
}
4777

48-
func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleToExecute {
78+
func NewModelDBRoleToExecute(dbRole *admin.DBRoleToExecute) *DBRoleToExecute {
4979
if dbRole == nil {
5080
return nil
5181
}
@@ -56,19 +86,25 @@ func NewModelDBRoleToExecute(dbRole *admin20231115014.DBRoleToExecute) *DBRoleTo
5686
}
5787
}
5888

59-
func NewModelAuthentication(authentication *admin20231115014.StreamsKafkaAuthentication) *StreamsKafkaAuthentication {
89+
func NewModelAuthentication(authentication *admin.StreamsKafkaAuthentication, currentModel *Model) *StreamsKafkaAuthentication {
6090
if authentication == nil {
6191
return nil
6292
}
6393

64-
return &StreamsKafkaAuthentication{
65-
Mechanism: authentication.Mechanism,
66-
Password: authentication.Password,
67-
Username: authentication.Username,
94+
authModel := &StreamsKafkaAuthentication{
95+
Mechanism: authentication.Mechanism,
96+
Method: authentication.Method,
97+
Username: authentication.Username,
98+
TokenEndpointUrl: authentication.TokenEndpointUrl,
99+
ClientId: authentication.ClientId,
100+
Scope: authentication.Scope,
101+
SaslOauthbearerExtensions: authentication.SaslOauthbearerExtensions,
68102
}
103+
104+
return authModel
69105
}
70106

71-
func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsKafkaSecurity {
107+
func NewModelSecurity(security *admin.StreamsKafkaSecurity) *StreamsKafkaSecurity {
72108
if security == nil {
73109
return nil
74110
}
@@ -79,60 +115,95 @@ func NewModelSecurity(security *admin20231115014.StreamsKafkaSecurity) *StreamsK
79115
}
80116
}
81117

82-
func newStreamConnectionReq(model *Model) *admin20231115014.StreamsConnection {
83-
streamConnReq := admin20231115014.StreamsConnection{
118+
func newStreamConnectionReq(model *Model) *admin.StreamsConnection {
119+
streamConnReq := admin.StreamsConnection{
84120
Name: model.ConnectionName,
85121
Type: model.Type,
86122
}
87123

88-
if util.SafeString(streamConnReq.Type) == ClusterConnectionType {
124+
typeStr := util.SafeString(streamConnReq.Type)
125+
126+
if typeStr == ClusterConnectionType {
89127
streamConnReq.ClusterName = model.ClusterName
128+
if model.ClusterProjectId != nil {
129+
streamConnReq.ClusterGroupId = model.ClusterProjectId
130+
}
90131
streamConnReq.DbRoleToExecute = NewDBRoleToExecute(model.DbRoleToExecute)
91132
}
92133

93-
if util.SafeString(streamConnReq.Type) == KafkaConnectionType {
134+
if typeStr == KafkaConnectionType {
94135
streamConnReq.BootstrapServers = model.BootstrapServers
95136
streamConnReq.Security = newStreamsKafkaSecurity(model.Security)
96137
streamConnReq.Authentication = newStreamsKafkaAuthentication(model.Authentication)
97138

98139
if model.Config != nil {
99140
streamConnReq.Config = &model.Config
100141
}
142+
143+
if model.Networking != nil && model.Networking.Access != nil {
144+
streamConnReq.Networking = &admin.StreamsKafkaNetworking{
145+
Access: &admin.StreamsKafkaNetworkingAccess{
146+
Type: model.Networking.Access.Type,
147+
ConnectionId: model.Networking.Access.ConnectionId,
148+
},
149+
}
150+
}
151+
}
152+
153+
if typeStr == AWSLambdaType {
154+
if model.Aws != nil {
155+
streamConnReq.Aws = &admin.StreamsAWSConnectionConfig{
156+
RoleArn: model.Aws.RoleArn,
157+
}
158+
}
159+
}
160+
161+
if typeStr == HTTPSType {
162+
streamConnReq.Url = model.Url
163+
if model.Headers != nil {
164+
streamConnReq.Headers = &model.Headers
165+
}
101166
}
102167

103168
return &streamConnReq
104169
}
105170

106-
func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin20231115014.DBRoleToExecute {
171+
func NewDBRoleToExecute(dbRoleToExecuteModel *DBRoleToExecute) *admin.DBRoleToExecute {
107172
if dbRoleToExecuteModel == nil {
108173
return nil
109174
}
110175

111-
return &admin20231115014.DBRoleToExecute{
176+
return &admin.DBRoleToExecute{
112177
Role: dbRoleToExecuteModel.Role,
113178
Type: dbRoleToExecuteModel.Type,
114179
}
115180
}
116181

117-
func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin20231115014.StreamsKafkaSecurity {
182+
func newStreamsKafkaSecurity(securityModel *StreamsKafkaSecurity) *admin.StreamsKafkaSecurity {
118183
if securityModel == nil {
119184
return nil
120185
}
121186

122-
return &admin20231115014.StreamsKafkaSecurity{
187+
return &admin.StreamsKafkaSecurity{
123188
BrokerPublicCertificate: securityModel.BrokerPublicCertificate,
124189
Protocol: securityModel.Protocol,
125190
}
126191
}
127192

128-
func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin20231115014.StreamsKafkaAuthentication {
193+
func newStreamsKafkaAuthentication(authenticationModel *StreamsKafkaAuthentication) *admin.StreamsKafkaAuthentication {
129194
if authenticationModel == nil {
130195
return nil
131196
}
132197

133-
return &admin20231115014.StreamsKafkaAuthentication{
134-
Mechanism: authenticationModel.Mechanism,
135-
Password: authenticationModel.Password,
136-
Username: authenticationModel.Username,
198+
return &admin.StreamsKafkaAuthentication{
199+
Mechanism: authenticationModel.Mechanism,
200+
Method: authenticationModel.Method,
201+
Username: authenticationModel.Username,
202+
Password: authenticationModel.Password,
203+
TokenEndpointUrl: authenticationModel.TokenEndpointUrl,
204+
ClientId: authenticationModel.ClientId,
205+
ClientSecret: authenticationModel.ClientSecret,
206+
Scope: authenticationModel.Scope,
207+
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions,
137208
}
138209
}

0 commit comments

Comments
 (0)