diff --git a/go.sum b/go.sum index 01239e039..befdc3799 100644 --- a/go.sum +++ b/go.sum @@ -146,8 +146,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 h1:6xNmx7iTtyBRev0+D/Tv1FZd4SCg8axKApyNyRsAt/w= -github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= +github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os8IaYg++6uMOdKK83QtkkvJik= +github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= @@ -520,8 +520,6 @@ github.com/nginx/nginx-plus-go-client/v3 v3.0.1 h1:SU8MoRQVSa1aXqNUI3fc+OA9GM30a github.com/nginx/nginx-plus-go-client/v3 v3.0.1/go.mod h1:PjlGB6drb5RCWnUp1XDTlzKFPRI2a3ePg2kNCb1AN94= github.com/nginx/nginx-prometheus-exporter v1.5.1 h1:44jvKTJ4S0SydKbXw7H9V5VjcGdM3tdOPDd4em86GZQ= github.com/nginx/nginx-prometheus-exporter v1.5.1/go.mod h1:5b+M6fx4v6UQsihjrN/UeBBR2DabEsfgTjhqG/Xt12w= -github.com/nginxinc/nginx-go-crossplane v0.4.88 h1:A/eeZZmiEcFEWtnfWXrM2Z4F38swWu6ARuaERgt2OtQ= -github.com/nginxinc/nginx-go-crossplane v0.4.88/go.mod h1:YW/lk3F6/HUSQyfB6bFPnL9TkLcyfRXWfBNgirZmFfI= github.com/nginxinc/nginx-go-crossplane v0.4.89 h1:ISWwck/oFJkOVYJOxJlf1dBXGsFjZLbMIECuZC7ZoG4= github.com/nginxinc/nginx-go-crossplane v0.4.89/go.mod h1:YW/lk3F6/HUSQyfB6bFPnL9TkLcyfRXWfBNgirZmFfI= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= @@ -1216,8 +1214,6 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= -google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index ad8501bfb..02b0a83a4 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -50,6 +50,7 @@ type ( commandServerType model.ServerType subscribeMutex sync.Mutex agentConfigMutex sync.RWMutex + subscribeWg sync.WaitGroup } ) @@ -195,7 +196,11 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res subscribeCtx, cp.subscribeCancel = context.WithCancel(ctx) cp.subscribeMutex.Unlock() - go cp.commandService.Subscribe(subscribeCtx) + cp.subscribeWg.Add(1) + go func() { + defer cp.subscribeWg.Done() + cp.commandService.Subscribe(subscribeCtx) + }() cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.ConnectionCreatedTopic, @@ -294,18 +299,14 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me cp.subscribeMutex.Lock() defer cp.subscribeMutex.Unlock() - // Update the command service with the new client first - err := cp.commandService.UpdateClient(ctxWithMetadata, newConnection.CommandServiceClient()) - if err != nil { - slog.ErrorContext(ctx, "Failed to reset connection", "error", err) - return - } - - // Once the command service is updated, we close the old connection - slog.InfoContext(ctx, "Canceling old subscribe stream after connection reset") + // Cancel the old subscribe stream and close the old connection first, so the server removes + // the connection from its tracker before we call CreateConnection + // with the same UUID in UpdateClient. Without this ordering, the server would track the UUID + // from CreateConnection and then immediately remove it when the old stream exits. + slog.InfoContext(ctx, "Canceling old subscribe stream before connection reset") if cp.subscribeCancel != nil { cp.subscribeCancel() - slog.InfoContext(ctxWithMetadata, "Successfully canceled old subscribe stream after connection reset") + slog.InfoContext(ctxWithMetadata, "Successfully canceled old subscribe stream before connection reset") } connectionErr := cp.conn.Close(ctx) @@ -314,9 +315,24 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me } cp.conn = newConnection + + // Wait for the old Subscribe goroutine to fully exit before creating a new one with the new connection. + cp.subscribeWg.Wait() + + // Update the command service with the new client after the old stream has been torn down + err := cp.commandService.UpdateClient(ctxWithMetadata, newConnection.CommandServiceClient()) + if err != nil { + slog.ErrorContext(ctx, "Failed to reset connection", "error", err) + return + } + slog.InfoContext(ctxWithMetadata, "Starting new subscribe stream after connection reset") subscribeCtx, cp.subscribeCancel = context.WithCancel(ctxWithMetadata) - go cp.commandService.Subscribe(subscribeCtx) + cp.subscribeWg.Add(1) + go func() { + defer cp.subscribeWg.Done() + cp.commandService.Subscribe(subscribeCtx) + }() slog.InfoContext(ctx, "Command plugin connection reset finished successfully") } diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index cad5d68a4..6cf5f086f 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -33,10 +33,11 @@ import ( // FileServiceOperator handles requests to the grpc file service type FileServiceOperator struct { - fileServiceClient mpi.FileServiceClient - agentConfig *config.Config - fileOperator fileOperator - isConnected *atomic.Bool + fileServiceClient mpi.FileServiceClient + agentConfig *config.Config + fileOperator fileOperator + isConnected *atomic.Bool + fileServiceClientMu sync.RWMutex } var _ fileServiceOperatorInterface = (*FileServiceOperator)(nil) @@ -56,7 +57,9 @@ func NewFileServiceOperator(agentConfig *config.Config, fileServiceClient mpi.Fi } func (fso *FileServiceOperator) UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) { + fso.fileServiceClientMu.Lock() fso.fileServiceClient = fileServiceClient + fso.fileServiceClientMu.Unlock() slog.DebugContext(ctx, "File service operator updated client") } @@ -82,7 +85,7 @@ func (fso *FileServiceOperator) File( grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.FileDownloadTimeout) defer cancel() - return fso.fileServiceClient.GetFile(grpcCtx, &mpi.GetFileRequest{ + response, err := fso.client().GetFile(grpcCtx, &mpi.GetFileRequest{ MessageMeta: &mpi.MessageMeta{ MessageId: id.GenerateMessageID(), CorrelationId: logger.CorrelationID(ctx), @@ -90,6 +93,16 @@ func (fso *FileServiceOperator) File( }, FileMeta: file.GetFileMeta(), }) + + validatedError := internalgrpc.ValidateGrpcError(err) + + if validatedError != nil { + slog.ErrorContext(grpcCtx, "Failed to get file", "error", validatedError) + + return nil, validatedError + } + + return response, nil } getFileResp, getFileErr := backoff.RetryWithData( @@ -172,7 +185,8 @@ func (fso *FileServiceOperator) UpdateOverview( } sendUpdateOverview := func() (*mpi.UpdateOverviewResponse, error) { - if fso.fileServiceClient == nil { + client := fso.client() + if client == nil { return nil, errors.New("file service client is not initialized") } @@ -188,7 +202,7 @@ func (fso *FileServiceOperator) UpdateOverview( grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout) defer cancel() - response, updateError := fso.fileServiceClient.UpdateOverview(grpcCtx, request) + response, updateError := client.UpdateOverview(grpcCtx, request) validatedError := internalgrpc.ValidateGrpcError(updateError) @@ -234,7 +248,7 @@ func (fso *FileServiceOperator) ChunkedFile( grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.FileDownloadTimeout) defer cancel() - stream, err := fso.fileServiceClient.GetFileStream(grpcCtx, &mpi.GetFileRequest{ + stream, err := fso.client().GetFileStream(grpcCtx, &mpi.GetFileRequest{ MessageMeta: &mpi.MessageMeta{ MessageId: id.GenerateMessageID(), CorrelationId: logger.CorrelationID(ctx), @@ -329,6 +343,14 @@ func (fso *FileServiceOperator) ValidateFileHash(ctx context.Context, filePath, return nil } +//nolint:ireturn // client needs to return an interface +func (fso *FileServiceOperator) client() mpi.FileServiceClient { + fso.fileServiceClientMu.RLock() + defer fso.fileServiceClientMu.RUnlock() + + return fso.fileServiceClient +} + func (fso *FileServiceOperator) updateFiles( ctx context.Context, delta map[string]*mpi.File, @@ -378,9 +400,10 @@ func (fso *FileServiceOperator) sendUpdateFileRequest( defer backoffCancel() sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { + client := fso.client() slog.DebugContext(ctx, "Sending update file request", "request_file", request.GetFile(), "request_message_meta", request.GetMessageMeta()) - if fso.fileServiceClient == nil { + if client == nil { return nil, errors.New("file service client is not initialized") } @@ -391,7 +414,7 @@ func (fso *FileServiceOperator) sendUpdateFileRequest( grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.FileDownloadTimeout) defer cancel() - response, updateError := fso.fileServiceClient.UpdateFile(grpcCtx, request) + response, updateError := client.UpdateFile(grpcCtx, request) validatedError := internalgrpc.ValidateGrpcError(updateError) @@ -429,7 +452,7 @@ func (fso *FileServiceOperator) sendUpdateFileStream( grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.FileDownloadTimeout) defer cancel() - updateFileStreamClient, err := fso.fileServiceClient.UpdateFileStream(grpcCtx) + updateFileStreamClient, err := fso.client().UpdateFileStream(grpcCtx) if err != nil { return err } @@ -469,7 +492,7 @@ func (fso *FileServiceOperator) sendUpdateFileStreamHeader( sendUpdateFileHeader := func() error { slog.DebugContext(ctx, "Sending update file stream header", "header", header) - if fso.fileServiceClient == nil { + if fso.client() == nil { return errors.New("file service client is not initialized") } @@ -562,7 +585,7 @@ func (fso *FileServiceOperator) sendFileUpdateStreamChunk( sendUpdateFileChunk := func() error { slog.DebugContext(ctx, "Sending update file stream chunk", "chunk_id", chunk.Content.GetChunkId()) - if fso.fileServiceClient == nil { + if fso.client() == nil { return errors.New("file service client is not initialized") } diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index e25394776..c97edebaa 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -346,7 +346,9 @@ func ValidateGrpcError(err error) error { if err != nil { if statusError, ok := status.FromError(err); ok { if statusError.Code() == codes.InvalidArgument || statusError.Code() == codes.Unimplemented || - statusError.Code() == codes.Canceled { + statusError.Code() == codes.Canceled || statusError.Code() == codes.NotFound || + statusError.Code() == codes.AlreadyExists || statusError.Code() == codes.PermissionDenied || + statusError.Code() == codes.Unauthenticated { return backoff.Permanent(err) } }