Skip to content

Commit 47853ba

Browse files
sivaram-mongodboutcomes-winter-rakhulsprakash
authored andcommitted
refactor(stream-processor): address review comments
1 parent e25ee80 commit 47853ba

20 files changed

Lines changed: 76 additions & 192 deletions

cfn-resources/stream-processor/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ For processors with DLQ:
138138
## Notes
139139

140140
- **AWS Only**: This CloudFormation resource is designed for AWS deployments. The provider is effectively AWS.
141-
- **WorkspaceName vs InstanceName**: Use `WorkspaceName` (preferred). `InstanceName` is supported for backward compatibility but is deprecated.
141+
- **WorkspaceName**: This field is the same as 'InstanceName' used in other stream resources.
142142
- **State Management**: When creating a processor, specify `State: STARTED` to automatically start processing, or `State: CREATED` to create it in a stopped state.
143143
- **Long-Running Operations**: Creating and starting stream processors can take several minutes. The resource uses callback-based state management to handle these operations asynchronously.
144144
- **Timeout Configuration**: Use `Timeouts.Create` to configure how long to wait for processor creation/startup (default: 20 minutes).

cfn-resources/stream-processor/cmd/resource/callbacks.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ import (
2929
)
3030

3131
type CallbackData struct {
32-
ProjectID string
33-
WorkspaceOrInstanceName string
34-
ProcessorName string
35-
DesiredState string
36-
StartTime string
37-
TimeoutDuration string
38-
NeedsStarting bool
39-
DeleteOnCreateTimeout bool
32+
ProjectID string
33+
WorkspaceName string
34+
ProcessorName string
35+
DesiredState string
36+
StartTime string
37+
TimeoutDuration string
38+
NeedsStarting bool
39+
DeleteOnCreateTimeout bool
4040
}
4141

4242
func getCallbackData(req handler.Request) *CallbackData {
@@ -46,7 +46,7 @@ func getCallbackData(req handler.Request) *CallbackData {
4646
ctx.ProjectID = val
4747
}
4848
if val, ok := req.CallbackContext["workspaceName"].(string); ok {
49-
ctx.WorkspaceOrInstanceName = val
49+
ctx.WorkspaceName = val
5050
}
5151
if val, ok := req.CallbackContext["processorName"].(string); ok {
5252
ctx.ProcessorName = val
@@ -71,7 +71,7 @@ func getCallbackData(req handler.Request) *CallbackData {
7171
}
7272

7373
func validateCallbackData(ctx *CallbackData) *handler.ProgressEvent {
74-
if ctx.ProjectID == "" || ctx.WorkspaceOrInstanceName == "" || ctx.ProcessorName == "" {
74+
if ctx.ProjectID == "" || ctx.WorkspaceName == "" || ctx.ProcessorName == "" {
7575
return &handler.ProgressEvent{
7676
OperationStatus: handler.Failed,
7777
Message: "Missing required values in callback context",
@@ -80,11 +80,11 @@ func validateCallbackData(ctx *CallbackData) *handler.ProgressEvent {
8080
return nil
8181
}
8282

83-
func buildCallbackContext(projectID, workspaceOrInstanceName, processorName string, additionalFields map[string]any) map[string]any {
83+
func buildCallbackContext(projectID, workspaceName, processorName string, additionalFields map[string]any) map[string]any {
8484
ctx := map[string]any{
8585
"callbackStreamProcessor": true,
8686
"projectID": projectID,
87-
"workspaceName": workspaceOrInstanceName,
87+
"workspaceName": workspaceName,
8888
"processorName": processorName,
8989
}
9090

@@ -98,9 +98,10 @@ func cleanupOnCreateTimeout(ctx context.Context, client *util.MongoDBClient, cal
9898
return nil
9999
}
100100

101-
_, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName).Execute()
101+
_, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName).Execute()
102102
if err != nil {
103103
_, _ = logger.Warnf("Cleanup delete failed: %v", err)
104+
return err
104105
}
105106
return nil
106107
}
@@ -127,14 +128,14 @@ func handleCreateCallback(ctx context.Context, client *util.MongoDBClient, curre
127128
}
128129
}
129130

130-
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
131+
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName)
131132
if peErr != nil {
132133
return *peErr
133134
}
134135

135136
currentState := streamProcessor.GetState()
136137

137-
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
138+
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName, map[string]any{
138139
"needsStarting": callbackCtx.NeedsStarting,
139140
"startTime": callbackCtx.StartTime,
140141
"timeoutDuration": callbackCtx.TimeoutDuration,
@@ -144,7 +145,7 @@ func handleCreateCallback(ctx context.Context, client *util.MongoDBClient, curre
144145
switch currentState {
145146
case CreatedState:
146147
if needsStarting {
147-
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
148+
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName); peErr != nil {
148149
return *peErr
149150
}
150151
return createInProgressEvent(constants.Pending, currentModel, callbackContext)
@@ -172,7 +173,7 @@ func handleCreateCallback(ctx context.Context, client *util.MongoDBClient, curre
172173
}
173174

174175
func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, currentModel *Model, callbackCtx *CallbackData) handler.ProgressEvent {
175-
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
176+
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName)
176177
if peErr != nil {
177178
return *peErr
178179
}
@@ -191,7 +192,7 @@ func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, curre
191192

192193
currentState := streamProcessor.GetState()
193194

194-
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
195+
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName, map[string]any{
195196
"desiredState": desiredState,
196197
})
197198

@@ -211,7 +212,7 @@ func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, curre
211212
}
212213

213214
if desiredState == StartedState {
214-
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
215+
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceName, callbackCtx.ProcessorName); peErr != nil {
215216
return *peErr
216217
}
217218
return createInProgressEvent(constants.Pending, currentModel, callbackContext)
@@ -227,7 +228,7 @@ func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, curre
227228
_, err := client.AtlasSDK.StreamsApi.StopStreamProcessorWithParams(ctx,
228229
&admin.StopStreamProcessorApiParams{
229230
GroupId: callbackCtx.ProjectID,
230-
TenantName: callbackCtx.WorkspaceOrInstanceName,
231+
TenantName: callbackCtx.WorkspaceName,
231232
ProcessorName: callbackCtx.ProcessorName,
232233
},
233234
).Execute()

cfn-resources/stream-processor/cmd/resource/share.go renamed to cfn-resources/stream-processor/cmd/resource/handlers.go

Lines changed: 15 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,7 @@ func HandleCreate(req *handler.Request, client *util.MongoDBClient, model *Model
4747
)
4848
}
4949

50-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
51-
if err != nil {
52-
return handler.ProgressEvent{
53-
OperationStatus: handler.Failed,
54-
Message: err.Error(),
55-
}
56-
}
50+
workspaceName := util.SafeString(model.WorkspaceName)
5751

5852
ctx := context.Background()
5953
projectID := util.SafeString(model.ProjectId)
@@ -83,7 +77,7 @@ func HandleCreate(req *handler.Request, client *util.MongoDBClient, model *Model
8377
}
8478
}
8579

86-
_, apiResp, err := client.AtlasSDK.StreamsApi.CreateStreamProcessor(ctx, projectID, workspaceOrInstanceName, streamProcessorReq).Execute()
80+
_, apiResp, err := client.AtlasSDK.StreamsApi.CreateStreamProcessor(ctx, projectID, workspaceName, streamProcessorReq).Execute()
8781
if err != nil {
8882
return handleError(apiResp, constants.CREATE, err)
8983
}
@@ -110,7 +104,7 @@ func HandleCreate(req *handler.Request, client *util.MongoDBClient, model *Model
110104
Message: constants.Pending,
111105
ResourceModel: inProgressModel,
112106
CallbackDelaySeconds: defaultCallbackDelaySeconds,
113-
CallbackContext: buildCallbackContext(projectID, workspaceOrInstanceName, processorName, map[string]any{
107+
CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{
114108
"needsStarting": needsStarting,
115109
"startTime": time.Now().Format(time.RFC3339),
116110
"timeoutDuration": timeoutStr,
@@ -120,21 +114,14 @@ func HandleCreate(req *handler.Request, client *util.MongoDBClient, model *Model
120114
}
121115

122116
func HandleRead(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent {
123-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
124-
if err != nil {
125-
return handler.ProgressEvent{
126-
OperationStatus: handler.Failed,
127-
Message: err.Error(),
128-
}
129-
}
130-
117+
workspaceName := util.SafeString(model.WorkspaceName)
131118
projectID := util.SafeString(model.ProjectId)
132119
processorName := util.SafeString(model.ProcessorName)
133120

134121
streamProcessor, apiResp, err := client.AtlasSDK.StreamsApi.GetStreamProcessorWithParams(context.Background(),
135122
&admin.GetStreamProcessorApiParams{
136123
GroupId: projectID,
137-
TenantName: workspaceOrInstanceName,
124+
TenantName: workspaceName,
138125
ProcessorName: processorName,
139126
}).Execute()
140127
if err != nil {
@@ -179,21 +166,15 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
179166
)
180167
}
181168

182-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
183-
if err != nil {
184-
return handler.ProgressEvent{
185-
OperationStatus: handler.Failed,
186-
Message: err.Error(),
187-
}
188-
}
169+
workspaceName := util.SafeString(model.WorkspaceName)
189170

190171
ctx := context.Background()
191172
projectID := util.SafeString(model.ProjectId)
192173
processorName := util.SafeString(model.ProcessorName)
193174

194175
requestParams := &admin.GetStreamProcessorApiParams{
195176
GroupId: projectID,
196-
TenantName: workspaceOrInstanceName,
177+
TenantName: workspaceName,
197178
ProcessorName: processorName,
198179
}
199180

@@ -229,7 +210,7 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
229210
_, err := client.AtlasSDK.StreamsApi.StopStreamProcessorWithParams(ctx,
230211
&admin.StopStreamProcessorApiParams{
231212
GroupId: projectID,
232-
TenantName: workspaceOrInstanceName,
213+
TenantName: workspaceName,
233214
ProcessorName: processorName,
234215
},
235216
).Execute()
@@ -252,7 +233,7 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
252233
Message: constants.Pending,
253234
ResourceModel: inProgressModel,
254235
CallbackDelaySeconds: defaultCallbackDelaySeconds,
255-
CallbackContext: buildCallbackContext(projectID, workspaceOrInstanceName, processorName, map[string]any{
236+
CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{
256237
"desiredState": desiredState,
257238
}),
258239
}
@@ -275,7 +256,7 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
275256
_, err := client.AtlasSDK.StreamsApi.StartStreamProcessorWithParams(ctx,
276257
&admin.StartStreamProcessorApiParams{
277258
GroupId: projectID,
278-
TenantName: workspaceOrInstanceName,
259+
TenantName: workspaceName,
279260
ProcessorName: processorName,
280261
},
281262
).Execute()
@@ -298,7 +279,7 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
298279
Message: constants.Pending,
299280
ResourceModel: inProgressModel,
300281
CallbackDelaySeconds: defaultCallbackDelaySeconds,
301-
CallbackContext: buildCallbackContext(projectID, workspaceOrInstanceName, processorName, map[string]any{
282+
CallbackContext: buildCallbackContext(projectID, workspaceName, processorName, map[string]any{
302283
"desiredState": desiredState,
303284
}),
304285
}
@@ -308,19 +289,13 @@ func HandleUpdate(req *handler.Request, client *util.MongoDBClient, prevModel *M
308289
}
309290

310291
func HandleDelete(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent {
311-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
312-
if err != nil {
313-
return handler.ProgressEvent{
314-
OperationStatus: handler.Failed,
315-
Message: err.Error(),
316-
}
317-
}
292+
workspaceName := util.SafeString(model.WorkspaceName)
318293

319294
ctx := context.Background()
320295
projectID := util.SafeString(model.ProjectId)
321296
processorName := util.SafeString(model.ProcessorName)
322297

323-
apiResp, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, projectID, workspaceOrInstanceName, processorName).Execute()
298+
apiResp, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, projectID, workspaceName, processorName).Execute()
324299
if err != nil {
325300
if apiResp != nil && apiResp.StatusCode == http.StatusNotFound {
326301
return handler.ProgressEvent{
@@ -342,18 +317,12 @@ func HandleDelete(req *handler.Request, client *util.MongoDBClient, model *Model
342317
}
343318

344319
func HandleList(req *handler.Request, client *util.MongoDBClient, model *Model) handler.ProgressEvent {
345-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
346-
if err != nil {
347-
return handler.ProgressEvent{
348-
OperationStatus: handler.Failed,
349-
Message: err.Error(),
350-
}
351-
}
320+
workspaceName := util.SafeString(model.WorkspaceName)
352321

353322
ctx := context.Background()
354323
projectID := util.SafeString(model.ProjectId)
355324

356-
accumulatedProcessors, apiResp, err := getAllStreamProcessors(ctx, client.AtlasSDK, projectID, workspaceOrInstanceName)
325+
accumulatedProcessors, apiResp, err := getAllStreamProcessors(ctx, client.AtlasSDK, projectID, workspaceName)
357326
if err != nil {
358327
return handleError(apiResp, constants.LIST, err)
359328
}

cfn-resources/stream-processor/cmd/resource/helpers.go

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,7 @@ func copyIdentifyingFields(resourceModel, currentModel *Model) {
3434
resourceModel.Profile = currentModel.Profile
3535
resourceModel.ProjectId = currentModel.ProjectId
3636
resourceModel.ProcessorName = currentModel.ProcessorName
37-
38-
switch {
39-
case currentModel.WorkspaceName != nil && *currentModel.WorkspaceName != "":
40-
resourceModel.WorkspaceName = currentModel.WorkspaceName
41-
resourceModel.InstanceName = util.Pointer(*currentModel.WorkspaceName)
42-
case currentModel.InstanceName != nil && *currentModel.InstanceName != "":
43-
resourceModel.InstanceName = currentModel.InstanceName
44-
resourceModel.WorkspaceName = util.Pointer(*currentModel.InstanceName)
45-
default:
46-
resourceModel.WorkspaceName = currentModel.WorkspaceName
47-
resourceModel.InstanceName = currentModel.InstanceName
48-
}
37+
resourceModel.WorkspaceName = currentModel.WorkspaceName
4938
}
5039

5140
func parseTimeout(timeoutStr string) time.Duration {
@@ -95,14 +84,14 @@ func finalizeModel(streamProcessor *admin.StreamsProcessorWithStats, currentMode
9584
}
9685
}
9786

98-
func getAllStreamProcessors(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceOrInstanceName string) ([]admin.StreamsProcessorWithStats, *http.Response, error) {
87+
func getAllStreamProcessors(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName string) ([]admin.StreamsProcessorWithStats, *http.Response, error) {
9988
pageNum := 1
10089
accumulatedProcessors := make([]admin.StreamsProcessorWithStats, 0)
10190

10291
for allRecordsRetrieved := false; !allRecordsRetrieved; {
10392
processorsResp, apiResp, err := atlasClient.StreamsApi.GetStreamProcessorsWithParams(ctx, &admin.GetStreamProcessorsApiParams{
10493
GroupId: projectID,
105-
TenantName: workspaceOrInstanceName,
94+
TenantName: workspaceName,
10695
ItemsPerPage: util.Pointer(constants.DefaultListItemsPerPage),
10796
PageNum: util.Pointer(pageNum),
10897
}).Execute()
@@ -122,10 +111,10 @@ func getAllStreamProcessors(ctx context.Context, atlasClient *admin.APIClient, p
122111
return accumulatedProcessors, nil, nil
123112
}
124113

125-
func getStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceOrInstanceName, processorName string) (*admin.StreamsProcessorWithStats, *handler.ProgressEvent) {
114+
func getStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName, processorName string) (*admin.StreamsProcessorWithStats, *handler.ProgressEvent) {
126115
requestParams := &admin.GetStreamProcessorApiParams{
127116
GroupId: projectID,
128-
TenantName: workspaceOrInstanceName,
117+
TenantName: workspaceName,
129118
ProcessorName: processorName,
130119
}
131120

@@ -146,11 +135,11 @@ func getStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, proje
146135
return streamProcessor, nil
147136
}
148137

149-
func startStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceOrInstanceName, processorName string) *handler.ProgressEvent {
138+
func startStreamProcessor(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceName, processorName string) *handler.ProgressEvent {
150139
_, err := atlasClient.StreamsApi.StartStreamProcessorWithParams(ctx,
151140
&admin.StartStreamProcessorApiParams{
152141
GroupId: projectID,
153-
TenantName: workspaceOrInstanceName,
142+
TenantName: workspaceName,
154143
ProcessorName: processorName,
155144
},
156145
).Execute()

cfn-resources/stream-processor/cmd/resource/mappings.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,6 @@ import (
2323
"github.com/mongodb/mongodbatlas-cloudformation-resources/util"
2424
)
2525

26-
func GetWorkspaceOrInstanceName(model *Model) (string, error) {
27-
if model.WorkspaceName != nil && *model.WorkspaceName != "" {
28-
return *model.WorkspaceName, nil
29-
}
30-
if model.InstanceName != nil && *model.InstanceName != "" {
31-
return *model.InstanceName, nil
32-
}
33-
return "", fmt.Errorf("either WorkspaceName or InstanceName must be provided")
34-
}
35-
3626
func ConvertPipelineToSdk(pipeline string) ([]any, error) {
3727
var pipelineSliceOfMaps []any
3828
err := json.Unmarshal([]byte(pipeline), &pipelineSliceOfMaps)
@@ -96,14 +86,11 @@ func NewStreamProcessorUpdateReq(model *Model) (*admin.UpdateStreamProcessorApiP
9686
return nil, err
9787
}
9888

99-
workspaceOrInstanceName, err := GetWorkspaceOrInstanceName(model)
100-
if err != nil {
101-
return nil, err
102-
}
89+
workspaceName := util.SafeString(model.WorkspaceName)
10390

10491
streamProcessorAPIParams := &admin.UpdateStreamProcessorApiParams{
10592
GroupId: util.SafeString(model.ProjectId),
106-
TenantName: workspaceOrInstanceName,
93+
TenantName: workspaceName,
10794
ProcessorName: util.SafeString(model.ProcessorName),
10895
StreamsModifyStreamProcessor: &admin.StreamsModifyStreamProcessor{
10996
Name: model.ProcessorName,

0 commit comments

Comments
 (0)