Skip to content

Commit 64d662a

Browse files
sivaram-mongodboutcomes-winter-rakhulsprakash
authored andcommitted
refactor: extract CRUD handlers and make internal functions private in stream processor
1 parent f8b0e65 commit 64d662a

4 files changed

Lines changed: 471 additions & 443 deletions

File tree

cfn-resources/stream-processor/cmd/resource/callbacks.go

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler"
2525

26+
"github.com/mongodb/mongodbatlas-cloudformation-resources/util"
2627
"github.com/mongodb/mongodbatlas-cloudformation-resources/util/constants"
2728
"github.com/mongodb/mongodbatlas-cloudformation-resources/util/logger"
2829
)
@@ -38,12 +39,7 @@ type CallbackData struct {
3839
DeleteOnCreateTimeout bool
3940
}
4041

41-
func IsCallback(req *handler.Request) bool {
42-
_, found := req.CallbackContext["callbackStreamProcessor"]
43-
return found
44-
}
45-
46-
func GetCallbackData(req handler.Request) *CallbackData {
42+
func getCallbackData(req handler.Request) *CallbackData {
4743
ctx := &CallbackData{}
4844

4945
if val, ok := req.CallbackContext["projectID"].(string); ok {
@@ -74,7 +70,7 @@ func GetCallbackData(req handler.Request) *CallbackData {
7470
return ctx
7571
}
7672

77-
func ValidateCallbackData(ctx *CallbackData) *handler.ProgressEvent {
73+
func validateCallbackData(ctx *CallbackData) *handler.ProgressEvent {
7874
if ctx.ProjectID == "" || ctx.WorkspaceOrInstanceName == "" || ctx.ProcessorName == "" {
7975
return &handler.ProgressEvent{
8076
OperationStatus: handler.Failed,
@@ -84,7 +80,7 @@ func ValidateCallbackData(ctx *CallbackData) *handler.ProgressEvent {
8480
return nil
8581
}
8682

87-
func BuildCallbackContext(projectID, workspaceOrInstanceName, processorName string, additionalFields map[string]any) map[string]any {
83+
func buildCallbackContext(projectID, workspaceOrInstanceName, processorName string, additionalFields map[string]any) map[string]any {
8884
ctx := map[string]any{
8985
"callbackStreamProcessor": true,
9086
"projectID": projectID,
@@ -97,27 +93,27 @@ func BuildCallbackContext(projectID, workspaceOrInstanceName, processorName stri
9793
return ctx
9894
}
9995

100-
func cleanupOnCreateTimeout(ctx context.Context, atlasClient *admin.APIClient, callbackCtx *CallbackData) error {
96+
func cleanupOnCreateTimeout(ctx context.Context, client *util.MongoDBClient, callbackCtx *CallbackData) error {
10197
if !callbackCtx.DeleteOnCreateTimeout {
10298
return nil
10399
}
104100

105-
_, err := atlasClient.StreamsApi.DeleteStreamProcessor(ctx, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName).Execute()
101+
_, err := client.AtlasSDK.StreamsApi.DeleteStreamProcessor(ctx, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName).Execute()
106102
if err != nil {
107103
_, _ = logger.Warnf("Cleanup delete failed: %v", err)
108104
}
109105
return nil
110106
}
111107

112-
func HandleCreateCallback(ctx context.Context, atlasClient *admin.APIClient, currentModel *Model, callbackCtx *CallbackData) (handler.ProgressEvent, error) {
108+
func handleCreateCallback(ctx context.Context, client *util.MongoDBClient, currentModel *Model, callbackCtx *CallbackData) handler.ProgressEvent {
113109
needsStarting := callbackCtx.NeedsStarting
114110

115-
if IsTimeoutExceeded(callbackCtx.StartTime, callbackCtx.TimeoutDuration) {
116-
if err := cleanupOnCreateTimeout(context.Background(), atlasClient, callbackCtx); err != nil {
111+
if isTimeoutExceeded(callbackCtx.StartTime, callbackCtx.TimeoutDuration) {
112+
if err := cleanupOnCreateTimeout(context.Background(), client, callbackCtx); err != nil {
117113
return handler.ProgressEvent{
118114
OperationStatus: handler.Failed,
119115
Message: fmt.Sprintf("Timeout reached and cleanup failed: %s", err.Error()),
120-
}, nil
116+
}
121117
}
122118
cleanupMsg := "Timeout reached when waiting for stream processor creation"
123119
if callbackCtx.DeleteOnCreateTimeout {
@@ -128,17 +124,17 @@ func HandleCreateCallback(ctx context.Context, atlasClient *admin.APIClient, cur
128124
return handler.ProgressEvent{
129125
OperationStatus: handler.Failed,
130126
Message: cleanupMsg,
131-
}, nil
127+
}
132128
}
133129

134-
streamProcessor, peErr := getStreamProcessor(ctx, atlasClient, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
130+
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
135131
if peErr != nil {
136-
return *peErr, nil
132+
return *peErr
137133
}
138134

139135
currentState := streamProcessor.GetState()
140136

141-
callbackContext := BuildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
137+
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
142138
"needsStarting": callbackCtx.NeedsStarting,
143139
"startTime": callbackCtx.StartTime,
144140
"timeoutDuration": callbackCtx.TimeoutDuration,
@@ -148,37 +144,37 @@ func HandleCreateCallback(ctx context.Context, atlasClient *admin.APIClient, cur
148144
switch currentState {
149145
case CreatedState:
150146
if needsStarting {
151-
if peErr := startStreamProcessor(ctx, atlasClient, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
152-
return *peErr, nil
147+
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
148+
return *peErr
153149
}
154-
return createInProgressEvent("Starting stream processor", currentModel, callbackContext), nil
150+
return createInProgressEvent("Starting stream processor", currentModel, callbackContext)
155151
}
156-
return FinalizeModel(streamProcessor, currentModel, "Create Completed")
152+
return finalizeModel(streamProcessor, currentModel, "Create Completed")
157153

158154
case StartedState:
159-
return FinalizeModel(streamProcessor, currentModel, "Create Completed")
155+
return finalizeModel(streamProcessor, currentModel, "Create Completed")
160156

161157
case InitiatingState, CreatingState:
162-
return createInProgressEvent(fmt.Sprintf("Creating stream processor (current state: %s)", currentState), currentModel, callbackContext), nil
158+
return createInProgressEvent(fmt.Sprintf("Creating stream processor (current state: %s)", currentState), currentModel, callbackContext)
163159

164160
case FailedState:
165161
return handler.ProgressEvent{
166162
OperationStatus: handler.Failed,
167163
Message: "Stream processor entered FAILED state",
168-
}, nil
164+
}
169165

170166
default:
171167
return handler.ProgressEvent{
172168
OperationStatus: handler.Failed,
173169
Message: fmt.Sprintf("Unexpected state during creation: %s", currentState),
174-
}, nil
170+
}
175171
}
176172
}
177173

178-
func HandleUpdateCallback(ctx context.Context, atlasClient *admin.APIClient, currentModel *Model, callbackCtx *CallbackData) (handler.ProgressEvent, error) {
179-
streamProcessor, peErr := getStreamProcessor(ctx, atlasClient, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
174+
func handleUpdateCallback(ctx context.Context, client *util.MongoDBClient, currentModel *Model, callbackCtx *CallbackData) handler.ProgressEvent {
175+
streamProcessor, peErr := getStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName)
180176
if peErr != nil {
181-
return *peErr, nil
177+
return *peErr
182178
}
183179

184180
desiredState := callbackCtx.DesiredState
@@ -195,7 +191,7 @@ func HandleUpdateCallback(ctx context.Context, atlasClient *admin.APIClient, cur
195191

196192
currentState := streamProcessor.GetState()
197193

198-
callbackContext := BuildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
194+
callbackContext := buildCallbackContext(callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName, map[string]any{
199195
"desiredState": desiredState,
200196
})
201197

@@ -206,29 +202,29 @@ func HandleUpdateCallback(ctx context.Context, atlasClient *admin.APIClient, cur
206202
return handler.ProgressEvent{
207203
OperationStatus: handler.Failed,
208204
Message: fmt.Sprintf("Error creating update request: %s", err.Error()),
209-
}, nil
205+
}
210206
}
211207

212-
streamProcessorResp, apiResp, err := atlasClient.StreamsApi.UpdateStreamProcessorWithParams(ctx, modifyAPIRequestParams).Execute()
208+
streamProcessorResp, apiResp, err := client.AtlasSDK.StreamsApi.UpdateStreamProcessorWithParams(ctx, modifyAPIRequestParams).Execute()
213209
if err != nil {
214-
return HandleError(apiResp, constants.UPDATE, err)
210+
return handleError(apiResp, constants.UPDATE, err)
215211
}
216212

217213
if desiredState == StartedState {
218-
if peErr := startStreamProcessor(ctx, atlasClient, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
219-
return *peErr, nil
214+
if peErr := startStreamProcessor(ctx, client.AtlasSDK, callbackCtx.ProjectID, callbackCtx.WorkspaceOrInstanceName, callbackCtx.ProcessorName); peErr != nil {
215+
return *peErr
220216
}
221-
return createInProgressEvent("Starting stream processor", currentModel, callbackContext), nil
217+
return createInProgressEvent("Starting stream processor", currentModel, callbackContext)
222218
}
223219

224-
return FinalizeModel(streamProcessorResp, currentModel, "Update Completed")
220+
return finalizeModel(streamProcessorResp, currentModel, "Update Completed")
225221

226222
case StartedState:
227223
if desiredState == StartedState {
228-
return FinalizeModel(streamProcessor, currentModel, "Update Completed")
224+
return finalizeModel(streamProcessor, currentModel, "Update Completed")
229225
}
230226

231-
_, err := atlasClient.StreamsApi.StopStreamProcessorWithParams(ctx,
227+
_, err := client.AtlasSDK.StreamsApi.StopStreamProcessorWithParams(ctx,
232228
&admin.StopStreamProcessorApiParams{
233229
GroupId: callbackCtx.ProjectID,
234230
TenantName: callbackCtx.WorkspaceOrInstanceName,
@@ -239,17 +235,17 @@ func HandleUpdateCallback(ctx context.Context, atlasClient *admin.APIClient, cur
239235
return handler.ProgressEvent{
240236
OperationStatus: handler.Failed,
241237
Message: fmt.Sprintf("Error stopping stream processor: %s", err.Error()),
242-
}, nil
238+
}
243239
}
244-
return createInProgressEvent("Stopping stream processor", currentModel, callbackContext), nil
240+
return createInProgressEvent("Stopping stream processor", currentModel, callbackContext)
245241

246242
case FailedState:
247243
return handler.ProgressEvent{
248244
OperationStatus: handler.Failed,
249245
Message: "Stream processor entered FAILED state",
250-
}, nil
246+
}
251247

252248
default:
253-
return createInProgressEvent(fmt.Sprintf("Updating stream processor (current state: %s)", currentState), currentModel, callbackContext), nil
249+
return createInProgressEvent(fmt.Sprintf("Updating stream processor (current state: %s)", currentState), currentModel, callbackContext)
254250
}
255251
}

cfn-resources/stream-processor/cmd/resource/helpers.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/mongodb/mongodbatlas-cloudformation-resources/util/progressevent"
3131
)
3232

33-
func CopyIdentifyingFields(resourceModel, currentModel *Model) {
33+
func copyIdentifyingFields(resourceModel, currentModel *Model) {
3434
resourceModel.Profile = currentModel.Profile
3535
resourceModel.ProjectId = currentModel.ProjectId
3636
resourceModel.ProcessorName = currentModel.ProcessorName
@@ -48,7 +48,7 @@ func CopyIdentifyingFields(resourceModel, currentModel *Model) {
4848
}
4949
}
5050

51-
func ParseTimeout(timeoutStr string) time.Duration {
51+
func parseTimeout(timeoutStr string) time.Duration {
5252
if timeoutStr == "" {
5353
return DefaultCreateTimeout
5454
}
@@ -60,7 +60,7 @@ func ParseTimeout(timeoutStr string) time.Duration {
6060
return duration
6161
}
6262

63-
func IsTimeoutExceeded(startTimeStr, timeoutDurationStr string) bool {
63+
func isTimeoutExceeded(startTimeStr, timeoutDurationStr string) bool {
6464
if startTimeStr == "" || timeoutDurationStr == "" {
6565
return false
6666
}
@@ -71,28 +71,28 @@ func IsTimeoutExceeded(startTimeStr, timeoutDurationStr string) bool {
7171
return false
7272
}
7373

74-
timeoutDuration := ParseTimeout(timeoutDurationStr)
74+
timeoutDuration := parseTimeout(timeoutDurationStr)
7575
elapsed := time.Since(startTime)
7676

7777
return elapsed >= timeoutDuration
7878
}
7979

80-
func FinalizeModel(streamProcessor *admin.StreamsProcessorWithStats, currentModel *Model, message string) (handler.ProgressEvent, error) {
80+
func finalizeModel(streamProcessor *admin.StreamsProcessorWithStats, currentModel *Model, message string) handler.ProgressEvent {
8181
resourceModel, err := GetStreamProcessorModel(streamProcessor, currentModel)
8282
if err != nil {
8383
return handler.ProgressEvent{
8484
OperationStatus: handler.Failed,
8585
Message: fmt.Sprintf("Error converting stream processor model: %s", err.Error()),
86-
}, nil
86+
}
8787
}
8888

89-
CopyIdentifyingFields(resourceModel, currentModel)
89+
copyIdentifyingFields(resourceModel, currentModel)
9090

9191
return handler.ProgressEvent{
9292
OperationStatus: handler.Success,
9393
Message: message,
9494
ResourceModel: resourceModel,
95-
}, nil
95+
}
9696
}
9797

9898
func getAllStreamProcessors(ctx context.Context, atlasClient *admin.APIClient, projectID, workspaceOrInstanceName string) ([]admin.StreamsProcessorWithStats, *http.Response, error) {
@@ -169,7 +169,7 @@ func createInProgressEvent(message string, currentModel *Model, callbackContext
169169
*inProgressModel = *currentModel
170170
inProgressModel.DeleteOnCreateTimeout = nil
171171
}
172-
CopyIdentifyingFields(inProgressModel, currentModel)
172+
copyIdentifyingFields(inProgressModel, currentModel)
173173

174174
return handler.ProgressEvent{
175175
OperationStatus: handler.InProgress,
@@ -180,7 +180,7 @@ func createInProgressEvent(message string, currentModel *Model, callbackContext
180180
}
181181
}
182182

183-
func ValidateUpdateStateTransition(currentState, desiredState string) (errMsg string, isValidTransition bool) {
183+
func validateUpdateStateTransition(currentState, desiredState string) (errMsg string, isValidTransition bool) {
184184
if currentState == desiredState {
185185
return "", true
186186
}
@@ -196,16 +196,16 @@ func ValidateUpdateStateTransition(currentState, desiredState string) (errMsg st
196196
return "", true
197197
}
198198

199-
func HandleError(response *http.Response, method constants.CfnFunctions, err error) (handler.ProgressEvent, error) {
199+
func handleError(response *http.Response, method constants.CfnFunctions, err error) handler.ProgressEvent {
200200
errMsg := fmt.Sprintf("%s error:%s", method, err.Error())
201201

202202
if response != nil && response.StatusCode == http.StatusConflict {
203203
return handler.ProgressEvent{
204204
OperationStatus: handler.Failed,
205205
Message: errMsg,
206206
HandlerErrorCode: "AlreadyExists",
207-
}, nil
207+
}
208208
}
209209

210-
return progressevent.GetFailedEventByResponse(errMsg, response), nil
210+
return progressevent.GetFailedEventByResponse(errMsg, response)
211211
}

0 commit comments

Comments
 (0)