From c925b4f594aa9b10d43eef1f9d89c2cc1e58d79d Mon Sep 17 00:00:00 2001 From: Sofia Simdianova Date: Tue, 2 Jun 2026 12:53:00 +0200 Subject: [PATCH 1/3] feat: add OtelExporterEndpoint crd --- .../v1beta1/otelexporterendpoint_types.go | 97 ++++ .../v1beta1/zz_generated.deepcopy.go | 152 ++++++ cmd/main.go | 3 + .../formance.com_otelexporterendpoints.yaml | 216 ++++++++ config/crd/kustomization.yaml | 1 + config/rbac/role.yaml | 3 + ...ance.com_v1beta1_otelexporterendpoint.yaml | 46 ++ .../02-Custom Resource Definitions.md | 90 ++++ .../settings.catalog.json | 21 +- ...on_otelexporterendpoints.formance.com.yaml | 219 ++++++++ helm/operator/templates/deployment.yaml | 3 + ..._v1_clusterrole_formance-manager-role.yaml | 3 + .../otelexporterendpoint-support.yaml | 37 ++ helm/operator/values.yaml | 22 +- internal/core/platform.go | 2 + internal/resources/all.go | 1 + .../otelexporterendpoints/collector_config.go | 304 +++++++++++ .../collector_config_test.go | 293 +++++++++++ .../resources/otelexporterendpoints/init.go | 470 ++++++++++++++++++ internal/resources/settings/opentelemetry.go | 97 +++- .../otelexporterendpoint_controller_test.go | 254 ++++++++++ 21 files changed, 2323 insertions(+), 11 deletions(-) create mode 100644 api/formance.com/v1beta1/otelexporterendpoint_types.go create mode 100644 config/crd/bases/formance.com_otelexporterendpoints.yaml create mode 100644 config/samples/formance.com_v1beta1_otelexporterendpoint.yaml create mode 100644 helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml create mode 100644 helm/operator/templates/otelexporterendpoint-support.yaml create mode 100644 internal/resources/otelexporterendpoints/collector_config.go create mode 100644 internal/resources/otelexporterendpoints/collector_config_test.go create mode 100644 internal/resources/otelexporterendpoints/init.go create mode 100644 internal/tests/otelexporterendpoint_controller_test.go diff --git a/api/formance.com/v1beta1/otelexporterendpoint_types.go b/api/formance.com/v1beta1/otelexporterendpoint_types.go new file mode 100644 index 000000000..da527102a --- /dev/null +++ b/api/formance.com/v1beta1/otelexporterendpoint_types.go @@ -0,0 +1,97 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type OtelAuthConfig struct { + // +kubebuilder:validation:Enum=bearer + Type string `json:"type"` + FromSecret string `json:"fromSecret"` +} + +type OtelSignalConfig struct { + Endpoint string `json:"endpoint"` + + // +optional + Auth *OtelAuthConfig `json:"auth,omitempty"` +} + +type OtelExporterEndpointSpec struct { + // +optional + StackSelector *metav1.LabelSelector `json:"stackSelector,omitempty"` + + // +optional + Traces *OtelSignalConfig `json:"traces,omitempty"` + + // +optional + Metrics *OtelSignalConfig `json:"metrics,omitempty"` + + // +optional + ResourceAttributes map[string]string `json:"resourceAttributes,omitempty"` +} + +type OtelExporterEndpointStatus struct { + Status `json:",inline"` + // +optional + Stacks []string `json:"stacks,omitempty"` +} + +// OtelExporterEndpoint configures an OpenTelemetry collector proxy for exporting traces and metrics. +// Multiple OtelExporterEndpoints can target the same stacks — the collector fans out to all matching destinations. +// +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=".status.ready",description="Is ready" +// +kubebuilder:printcolumn:name="Info",type=string,JSONPath=".status.info",description="Info" +type OtelExporterEndpoint struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec OtelExporterEndpointSpec `json:"spec,omitempty"` + Status OtelExporterEndpointStatus `json:"status,omitempty"` +} + +func (in *OtelExporterEndpoint) IsReady() bool { + return in.Status.Ready +} + +func (in *OtelExporterEndpoint) SetReady(b bool) { + in.Status.Ready = b +} + +func (in *OtelExporterEndpoint) SetError(s string) { + in.Status.Info = s +} + +func (in *OtelExporterEndpoint) GetConditions() *Conditions { + return &in.Status.Conditions +} + +// +kubebuilder:object:root=true +type OtelExporterEndpointList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []OtelExporterEndpoint `json:"items"` +} + +func init() { + SchemeBuilder.Register(&OtelExporterEndpoint{}, &OtelExporterEndpointList{}) +} diff --git a/api/formance.com/v1beta1/zz_generated.deepcopy.go b/api/formance.com/v1beta1/zz_generated.deepcopy.go index 552862474..c69a69356 100644 --- a/api/formance.com/v1beta1/zz_generated.deepcopy.go +++ b/api/formance.com/v1beta1/zz_generated.deepcopy.go @@ -1516,6 +1516,158 @@ func (in *OrchestrationStatus) DeepCopy() *OrchestrationStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelAuthConfig) DeepCopyInto(out *OtelAuthConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelAuthConfig. +func (in *OtelAuthConfig) DeepCopy() *OtelAuthConfig { + if in == nil { + return nil + } + out := new(OtelAuthConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelExporterEndpoint) DeepCopyInto(out *OtelExporterEndpoint) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelExporterEndpoint. +func (in *OtelExporterEndpoint) DeepCopy() *OtelExporterEndpoint { + if in == nil { + return nil + } + out := new(OtelExporterEndpoint) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *OtelExporterEndpoint) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelExporterEndpointList) DeepCopyInto(out *OtelExporterEndpointList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]OtelExporterEndpoint, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelExporterEndpointList. +func (in *OtelExporterEndpointList) DeepCopy() *OtelExporterEndpointList { + if in == nil { + return nil + } + out := new(OtelExporterEndpointList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *OtelExporterEndpointList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelExporterEndpointSpec) DeepCopyInto(out *OtelExporterEndpointSpec) { + *out = *in + if in.StackSelector != nil { + in, out := &in.StackSelector, &out.StackSelector + *out = new(metav1.LabelSelector) + (*in).DeepCopyInto(*out) + } + if in.Traces != nil { + in, out := &in.Traces, &out.Traces + *out = new(OtelSignalConfig) + (*in).DeepCopyInto(*out) + } + if in.Metrics != nil { + in, out := &in.Metrics, &out.Metrics + *out = new(OtelSignalConfig) + (*in).DeepCopyInto(*out) + } + if in.ResourceAttributes != nil { + in, out := &in.ResourceAttributes, &out.ResourceAttributes + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelExporterEndpointSpec. +func (in *OtelExporterEndpointSpec) DeepCopy() *OtelExporterEndpointSpec { + if in == nil { + return nil + } + out := new(OtelExporterEndpointSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelExporterEndpointStatus) DeepCopyInto(out *OtelExporterEndpointStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + if in.Stacks != nil { + in, out := &in.Stacks, &out.Stacks + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelExporterEndpointStatus. +func (in *OtelExporterEndpointStatus) DeepCopy() *OtelExporterEndpointStatus { + if in == nil { + return nil + } + out := new(OtelExporterEndpointStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OtelSignalConfig) DeepCopyInto(out *OtelSignalConfig) { + *out = *in + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(OtelAuthConfig) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OtelSignalConfig. +func (in *OtelSignalConfig) DeepCopy() *OtelSignalConfig { + if in == nil { + return nil + } + out := new(OtelSignalConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Payments) DeepCopyInto(out *Payments) { *out = *in diff --git a/cmd/main.go b/cmd/main.go index 494a7ea9b..4a7174757 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -63,6 +63,7 @@ func main() { licenceSecret string licenceNamespace string utilsVersion string + collectorImage string ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -74,6 +75,7 @@ func main() { flag.StringVar(&licenceSecret, "licence-secret", "", "The licence secret that contains the token and the issuer") flag.StringVar(&licenceNamespace, "licence-namespace", "", "The namespace where the licence secret lives (defaults to operator namespace)") flag.StringVar(&utilsVersion, "utils-version", "latest", "The version of the operator utils image") + flag.StringVar(&collectorImage, "collector-image", "otel/opentelemetry-collector-contrib:0.151.0", "The OTel Collector image for OtelExporterEndpoint resources") opts := zap.Options{ Development: false, } @@ -125,6 +127,7 @@ func main() { LicenceSecret: licenceSecret, LicenceNamespace: licenceNamespace, UtilsVersion: utilsVersion, + CollectorImage: collectorImage, } if licenceSecret != "" { diff --git a/config/crd/bases/formance.com_otelexporterendpoints.yaml b/config/crd/bases/formance.com_otelexporterendpoints.yaml new file mode 100644 index 000000000..b9a0c96bd --- /dev/null +++ b/config/crd/bases/formance.com_otelexporterendpoints.yaml @@ -0,0 +1,216 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.0 + name: otelexporterendpoints.formance.com +spec: + group: formance.com + names: + kind: OtelExporterEndpoint + listKind: OtelExporterEndpointList + plural: otelexporterendpoints + singular: otelexporterendpoint + scope: Cluster + versions: + - additionalPrinterColumns: + - description: Is ready + jsonPath: .status.ready + name: Ready + type: string + - description: Info + jsonPath: .status.info + name: Info + type: string + name: v1beta1 + schema: + openAPIV3Schema: + description: |- + OtelExporterEndpoint configures an OpenTelemetry collector proxy for exporting traces and metrics. + Multiple OtelExporterEndpoints can target the same stacks — the collector fans out to all matching destinations. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + metrics: + properties: + auth: + properties: + fromSecret: + type: string + type: + enum: + - bearer + type: string + required: + - fromSecret + - type + type: object + endpoint: + type: string + required: + - endpoint + type: object + resourceAttributes: + additionalProperties: + type: string + type: object + stackSelector: + description: |- + A label selector is a label query over a set of resources. The result of matchLabels and + matchExpressions are ANDed. An empty label selector matches all objects. A null + label selector matches no objects. + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. + The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector applies + to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + traces: + properties: + auth: + properties: + fromSecret: + type: string + type: + enum: + - bearer + type: string + required: + - fromSecret + - type + type: object + endpoint: + type: string + required: + - endpoint + type: object + type: object + status: + properties: + conditions: + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + pattern: ^([A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?)?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - status + - type + type: object + type: array + info: + description: Info can contain any additional like reconciliation errors + type: string + ready: + description: Ready indicates if the resource is seen as completely + reconciled + type: boolean + stacks: + items: + type: string + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 65e9b2c58..cb501590e 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -26,6 +26,7 @@ resources: - bases/formance.com_brokerconsumers.yaml - bases/formance.com_brokers.yaml - bases/formance.com_transactionplanes.yaml +- bases/formance.com_otelexporterendpoints.yaml #+kubebuilder:scaffold:crdkustomizeresource diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 9e2bae12a..bf8876117 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -94,6 +94,7 @@ rules: - ledgers - mcps - orchestrations + - otelexporterendpoints - payments - reconciliations - resourcereferences @@ -129,6 +130,7 @@ rules: - ledgers/finalizers - mcps/finalizers - orchestrations/finalizers + - otelexporterendpoints/finalizers - payments/finalizers - reconciliations/finalizers - resourcereferences/finalizers @@ -158,6 +160,7 @@ rules: - ledgers/status - mcps/status - orchestrations/status + - otelexporterendpoints/status - payments/status - reconciliations/status - resourcereferences/status diff --git a/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml b/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml new file mode 100644 index 000000000..cd94ce793 --- /dev/null +++ b/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml @@ -0,0 +1,46 @@ +apiVersion: formance.com/v1beta1 +kind: OtelExporterEndpoint +metadata: + labels: + app.kubernetes.io/name: otelexporterendpoint + app.kubernetes.io/instance: otelexporterendpoint0 + app.kubernetes.io/part-of: operatorv2 + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: operatorv2 + name: otelexporterendpoint0 +spec: + stackSelector: + matchLabels: + formance.com/stack: any + traces: + endpoint: "http://otel-collector-opentelemetry-collector.formance.svc.cluster.local:4318" + metrics: + endpoint: "http://otel-collector-opentelemetry-collector.formance.svc.cluster.local:4318" +--- +apiVersion: formance.com/v1beta1 +kind: OtelExporterEndpoint +metadata: + labels: + app.kubernetes.io/name: otelexporterendpoint + app.kubernetes.io/instance: formance-support + app.kubernetes.io/part-of: operatorv2 + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: operatorv2 + name: formance-support +spec: + stackSelector: + matchExpressions: + - key: formance.com/stack + operator: Exists + traces: + endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + auth: + type: bearer + fromSecret: "formance-license" + metrics: + endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + auth: + type: bearer + fromSecret: "formance-license" + resourceAttributes: + cluster.id: "abc-123" diff --git a/docs/09-Configuration reference/02-Custom Resource Definitions.md b/docs/09-Configuration reference/02-Custom Resource Definitions.md index 3d03b7035..f728ee6dd 100644 --- a/docs/09-Configuration reference/02-Custom Resource Definitions.md +++ b/docs/09-Configuration reference/02-Custom Resource Definitions.md @@ -40,6 +40,7 @@ Other resources : - [BrokerTopic](#brokertopic) - [Database](#database) - [GatewayHTTPAPI](#gatewayhttpapi) +- [OtelExporterEndpoint](#otelexporterendpoint) - [ResourceReference](#resourcereference) - [Versions](#versions) @@ -2177,6 +2178,95 @@ GatewayHTTPAPI is the Schema for the HTTPAPIs API | `ready` _boolean_ | | | | +#### OtelExporterEndpoint + + + +OtelExporterEndpoint configures an OpenTelemetry collector proxy for exporting traces and metrics. +Multiple OtelExporterEndpoints can target the same stacks — the collector fans out to all matching destinations. + + + + + + + + + + + + + + + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `apiVersion` _string_ | `formance.com/v1beta1` | | | +| `kind` _string_ | `OtelExporterEndpoint` | | | +| `metadata` _[ObjectMeta](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#objectmeta-v1-meta)_ | Refer to Kubernetes API documentation for fields of `metadata`. | | | +| `spec` _[OtelExporterEndpointSpec](#otelexporterendpointspec)_ | | | | +| `status` _[OtelExporterEndpointStatus](#otelexporterendpointstatus)_ | | | | + + + +##### OtelExporterEndpointSpec + + + + + + + + + + + + + + + + + + + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `stackSelector` _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#labelselector-v1-meta)_ | | | | +| `traces` _[OtelSignalConfig](#otelsignalconfig)_ | | | | +| `metrics` _[OtelSignalConfig](#otelsignalconfig)_ | | | | +| `resourceAttributes` _object (keys:string, values:string)_ | | | | + + + + + +##### OtelExporterEndpointStatus + + + + + + + + + + + + + + + + + + + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `ready` _boolean_ | Ready indicates if the resource is seen as completely reconciled | | | +| `info` _string_ | Info can contain any additional like reconciliation errors | | | +| `stacks` _string array_ | | | | + + #### ResourceReference diff --git a/docs/09-Configuration reference/settings.catalog.json b/docs/09-Configuration reference/settings.catalog.json index a4316ca86..add432716 100644 --- a/docs/09-Configuration reference/settings.catalog.json +++ b/docs/09-Configuration reference/settings.catalog.json @@ -540,21 +540,36 @@ "key": "opentelemetry.\u003cmonitoring-type\u003e.dsn", "valueType": "uri", "sources": [ - "internal/resources/settings/opentelemetry.go:52" + "internal/resources/settings/opentelemetry.go:135" ] }, { "key": "opentelemetry.\u003cmonitoring-type\u003e.resource-attributes", "valueType": "map[string]string", "sources": [ - "internal/resources/settings/opentelemetry.go:96" + "internal/resources/settings/opentelemetry.go:179" + ] + }, + { + "key": "opentelemetry.\u003csignal\u003e.resource-attributes", + "valueType": "map[string]string", + "sources": [ + "internal/resources/settings/opentelemetry.go:68" + ] + }, + { + "key": "opentelemetry.metrics.dsn", + "valueType": "uri", + "sources": [ + "internal/resources/otelexporterendpoints/init.go:373" ] }, { "key": "opentelemetry.traces.dsn", "valueType": "uri", "sources": [ - "internal/resources/settings/opentelemetry.go:38" + "internal/resources/otelexporterendpoints/init.go:369", + "internal/resources/settings/opentelemetry.go:121" ] }, { diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml new file mode 100644 index 000000000..ff613832d --- /dev/null +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml @@ -0,0 +1,219 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.0 + helm.sh/resource-policy: keep + {{- with .Values.annotations }} + {{- toYaml . | nindent 4 }} + {{- end }} + name: otelexporterendpoints.formance.com +spec: + group: formance.com + names: + kind: OtelExporterEndpoint + listKind: OtelExporterEndpointList + plural: otelexporterendpoints + singular: otelexporterendpoint + scope: Cluster + versions: + - additionalPrinterColumns: + - description: Is ready + jsonPath: .status.ready + name: Ready + type: string + - description: Info + jsonPath: .status.info + name: Info + type: string + name: v1beta1 + schema: + openAPIV3Schema: + description: |- + OtelExporterEndpoint configures an OpenTelemetry collector proxy for exporting traces and metrics. + Multiple OtelExporterEndpoints can target the same stacks — the collector fans out to all matching destinations. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + metrics: + properties: + auth: + properties: + fromSecret: + type: string + type: + enum: + - bearer + type: string + required: + - fromSecret + - type + type: object + endpoint: + type: string + required: + - endpoint + type: object + resourceAttributes: + additionalProperties: + type: string + type: object + stackSelector: + description: |- + A label selector is a label query over a set of resources. The result of matchLabels and + matchExpressions are ANDed. An empty label selector matches all objects. A null + label selector matches no objects. + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. + The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector applies + to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + traces: + properties: + auth: + properties: + fromSecret: + type: string + type: + enum: + - bearer + type: string + required: + - fromSecret + - type + type: object + endpoint: + type: string + required: + - endpoint + type: object + type: object + status: + properties: + conditions: + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + pattern: ^([A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?)?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - status + - type + type: object + type: array + info: + description: Info can contain any additional like reconciliation errors + type: string + ready: + description: Ready indicates if the resource is seen as completely + reconciled + type: boolean + stacks: + items: + type: string + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/helm/operator/templates/deployment.yaml b/helm/operator/templates/deployment.yaml index 57c3db1a0..47a93de79 100644 --- a/helm/operator/templates/deployment.yaml +++ b/helm/operator/templates/deployment.yaml @@ -63,6 +63,9 @@ spec: - --disable-webhooks {{- end }} - --utils-version={{ .Values.operator.utils.tag | default .Chart.AppVersion }} + {{- with .Values.operator.collectorImage }} + - --collector-image={{ . }} + {{- end }} {{- if .Values.operator.dev }} - --zap-devel - Development diff --git a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml index 24e51fa55..16f6c6e34 100644 --- a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml +++ b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml @@ -93,6 +93,7 @@ rules: - ledgers - mcps - orchestrations + - otelexporterendpoints - payments - reconciliations - resourcereferences @@ -128,6 +129,7 @@ rules: - ledgers/finalizers - mcps/finalizers - orchestrations/finalizers + - otelexporterendpoints/finalizers - payments/finalizers - reconciliations/finalizers - resourcereferences/finalizers @@ -157,6 +159,7 @@ rules: - ledgers/status - mcps/status - orchestrations/status + - otelexporterendpoints/status - payments/status - reconciliations/status - resourcereferences/status diff --git a/helm/operator/templates/otelexporterendpoint-support.yaml b/helm/operator/templates/otelexporterendpoint-support.yaml new file mode 100644 index 000000000..db83b5904 --- /dev/null +++ b/helm/operator/templates/otelexporterendpoint-support.yaml @@ -0,0 +1,37 @@ +{{- if .Values.global.monitoring.support.enabled }} +{{- $authSecret := .Values.global.monitoring.support.authSecret }} +{{- if not $authSecret }} + {{- if .Values.global.licence.existingSecret }} + {{- $authSecret = .Values.global.licence.existingSecret }} + {{- else if and .Values.global.licence.createSecret .Values.global.licence.token }} + {{- $authSecret = (printf "%s-licence" (include "operator.fullname" .)) }} + {{- end }} +{{- end }} +apiVersion: formance.com/v1beta1 +kind: OtelExporterEndpoint +metadata: + name: formance-support + labels: + {{- include "operator.labels" . | nindent 4 }} +spec: + stackSelector: + {{- toYaml .Values.global.monitoring.support.stackSelector | nindent 4 }} + traces: + endpoint: {{ .Values.global.monitoring.support.endpoint | quote }} + {{- if $authSecret }} + auth: + type: bearer + fromSecret: {{ $authSecret | quote }} + {{- end }} + metrics: + endpoint: {{ .Values.global.monitoring.support.endpoint | quote }} + {{- if $authSecret }} + auth: + type: bearer + fromSecret: {{ $authSecret | quote }} + {{- end }} + {{- with .Values.global.monitoring.support.resourceAttributes }} + resourceAttributes: + {{- toYaml . | nindent 4 }} + {{- end }} +{{- end }} diff --git a/helm/operator/values.yaml b/helm/operator/values.yaml index 8e34b8cc9..6021c9d8f 100644 --- a/helm/operator/values.yaml +++ b/helm/operator/values.yaml @@ -3,7 +3,24 @@ # Declare variables to be passed into your templates. global: - # Add your license information + monitoring: + support: + # Enable the Formance support OtelExporterEndpoint + enabled: false + # The support collector endpoint + endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + # Secret name containing the bearer token for auth (key: "token"). + # Defaults to the licence secret if set. + authSecret: "" + # Resource attributes added to all signals sent to this endpoint + resourceAttributes: {} + # Stack selector for the support OtelExporterEndpoint + stackSelector: + matchExpressions: + - key: formance.com/stack + operator: Exists + + # Add your license information licence: # -- Is disabled automatically if an existingSecret is provided # @section -- Licence @@ -55,6 +72,9 @@ operator: utils: tag: "" + # Override the OTel Collector image used by OtelExporterEndpoint resources + collectorImage: "" + podAnnotations: {} podSecurityContext: {} # fsGroup: 2000 diff --git a/internal/core/platform.go b/internal/core/platform.go index 31152589a..10cc6bb29 100644 --- a/internal/core/platform.go +++ b/internal/core/platform.go @@ -16,4 +16,6 @@ type Platform struct { LicenceState LicenceState // Human-readable message about the licence state LicenceMessage string + // The OTel Collector image used by OtelExporterEndpoint resources + CollectorImage string } diff --git a/internal/resources/all.go b/internal/resources/all.go index 619fde7af..a1dc3e5e9 100644 --- a/internal/resources/all.go +++ b/internal/resources/all.go @@ -13,6 +13,7 @@ import ( _ "github.com/formancehq/operator/v3/internal/resources/ledgers" _ "github.com/formancehq/operator/v3/internal/resources/mcps" _ "github.com/formancehq/operator/v3/internal/resources/orchestrations" + _ "github.com/formancehq/operator/v3/internal/resources/otelexporterendpoints" _ "github.com/formancehq/operator/v3/internal/resources/payments" _ "github.com/formancehq/operator/v3/internal/resources/reconciliations" _ "github.com/formancehq/operator/v3/internal/resources/resourcereferences" diff --git a/internal/resources/otelexporterendpoints/collector_config.go b/internal/resources/otelexporterendpoints/collector_config.go new file mode 100644 index 000000000..be9f36756 --- /dev/null +++ b/internal/resources/otelexporterendpoints/collector_config.go @@ -0,0 +1,304 @@ +package otelexporterendpoints + +import ( + "fmt" + "net/url" + "slices" + "sort" + "strings" + + "gopkg.in/yaml.v3" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" +) + +type collectorConfig struct { + Receivers collectorReceivers `yaml:"receivers"` + Processors map[string]any `yaml:"processors,omitempty"` + Exporters map[string]any `yaml:"exporters"` + Service collectorService `yaml:"service"` +} + +type collectorReceivers struct { + OTLP otlpReceiver `yaml:"otlp"` +} + +type otlpReceiver struct { + Protocols otlpProtocols `yaml:"protocols"` +} + +type otlpProtocols struct { + HTTP otlpHTTP `yaml:"http"` +} + +type otlpHTTP struct { + Endpoint string `yaml:"endpoint"` +} + +type collectorService struct { + Pipelines map[string]collectorPipeline `yaml:"pipelines"` +} + +type collectorPipeline struct { + Receivers []string `yaml:"receivers"` + Processors []string `yaml:"processors,omitempty"` + Exporters []string `yaml:"exporters"` +} + +type otlpExporter struct { + Endpoint string `yaml:"endpoint"` + Headers map[string]string `yaml:"headers,omitempty"` +} + +type resourceProcessorAttribute struct { + Key string `yaml:"key"` + Value string `yaml:"value"` + Action string `yaml:"action"` +} + +type resourceProcessor struct { + Attributes []resourceProcessorAttribute `yaml:"attributes"` +} + +type exporterInput struct { + name string + signal *v1beta1.OtelSignalConfig + envAlias string +} + +func inferProtocol(endpoint string) string { + u, err := url.Parse(endpoint) + if err == nil && u.Scheme == "grpc" { + return "grpc" + } + return "http" +} + +func stripScheme(endpoint string) string { + u, err := url.Parse(endpoint) + if err != nil { + return endpoint + } + if u.Scheme == "grpc" { + return u.Host + } + return endpoint +} + +func buildExporter(input exporterInput) (string, any) { + protocol := inferProtocol(input.signal.Endpoint) + endpoint := stripScheme(input.signal.Endpoint) + + var headers map[string]string + if input.signal.Auth != nil && input.signal.Auth.Type == "bearer" { + headers = map[string]string{ + "authorization": fmt.Sprintf("Bearer ${env:%s}", input.envAlias), + } + } + + prefix := "otlphttp/" + if protocol == "grpc" { + prefix = "otlp/" + } + return prefix + input.name, otlpExporter{ + Endpoint: endpoint, + Headers: headers, + } +} + +type collectorInput struct { + Endpoint *v1beta1.OtelExporterEndpoint + TracesEnvAlias string + MetricsEnvAlias string +} + +type otelSettingsInput struct { + TracesEndpoint string + MetricsEndpoint string +} + +func generateMergedCollectorConfig(endpoints []collectorInput, otelSettings *otelSettingsInput) (string, error) { + exporters := map[string]any{} + processors := map[string]any{} + var tracesPipelines []pipelineContribution + var metricsPipelines []pipelineContribution + + sortedEndpoints := make([]collectorInput, len(endpoints)) + copy(sortedEndpoints, endpoints) + sort.Slice(sortedEndpoints, func(i, j int) bool { + return sortedEndpoints[i].Endpoint.Name < sortedEndpoints[j].Endpoint.Name + }) + + for _, ci := range sortedEndpoints { + ep := ci.Endpoint + crdName := sanitizeName(ep.Name) + + var resourceProc string + if len(ep.Spec.ResourceAttributes) > 0 { + procName := "resource/" + crdName + attrs := make([]resourceProcessorAttribute, 0, len(ep.Spec.ResourceAttributes)) + keys := make([]string, 0, len(ep.Spec.ResourceAttributes)) + for k := range ep.Spec.ResourceAttributes { + keys = append(keys, k) + } + slices.Sort(keys) + for _, k := range keys { + attrs = append(attrs, resourceProcessorAttribute{ + Key: k, + Value: ep.Spec.ResourceAttributes[k], + Action: "upsert", + }) + } + processors[procName] = resourceProcessor{Attributes: attrs} + resourceProc = procName + } + + if ep.Spec.Traces != nil && ep.Spec.Traces.Endpoint != "" { + name, exp := buildExporter(exporterInput{ + name: crdName + "-traces", + signal: ep.Spec.Traces, + envAlias: ci.TracesEnvAlias, + }) + exporters[name] = exp + tracesPipelines = append(tracesPipelines, pipelineContribution{ + exporter: name, + processor: resourceProc, + }) + } + + if ep.Spec.Metrics != nil && ep.Spec.Metrics.Endpoint != "" { + name, exp := buildExporter(exporterInput{ + name: crdName + "-metrics", + signal: ep.Spec.Metrics, + envAlias: ci.MetricsEnvAlias, + }) + exporters[name] = exp + metricsPipelines = append(metricsPipelines, pipelineContribution{ + exporter: name, + processor: resourceProc, + }) + } + } + + if otelSettings != nil { + if otelSettings.TracesEndpoint != "" { + name, exp := buildExporter(exporterInput{ + name: "settings-traces", + signal: &v1beta1.OtelSignalConfig{Endpoint: otelSettings.TracesEndpoint}, + }) + exporters[name] = exp + tracesPipelines = append(tracesPipelines, pipelineContribution{exporter: name}) + } + if otelSettings.MetricsEndpoint != "" { + name, exp := buildExporter(exporterInput{ + name: "settings-metrics", + signal: &v1beta1.OtelSignalConfig{Endpoint: otelSettings.MetricsEndpoint}, + }) + exporters[name] = exp + metricsPipelines = append(metricsPipelines, pipelineContribution{exporter: name}) + } + } + + if len(tracesPipelines) == 0 && len(metricsPipelines) == 0 { + exporters["nop"] = struct{}{} + tracesPipelines = []pipelineContribution{{exporter: "nop"}} + metricsPipelines = []pipelineContribution{{exporter: "nop"}} + } + + if len(tracesPipelines) == 0 { + exporters["nop"] = struct{}{} + tracesPipelines = []pipelineContribution{{exporter: "nop"}} + } + if len(metricsPipelines) == 0 { + exporters["nop"] = struct{}{} + metricsPipelines = []pipelineContribution{{exporter: "nop"}} + } + + pipelines := buildPipelines(tracesPipelines, metricsPipelines) + + cfg := collectorConfig{ + Receivers: collectorReceivers{ + OTLP: otlpReceiver{ + Protocols: otlpProtocols{ + HTTP: otlpHTTP{ + Endpoint: "0.0.0.0:4318", + }, + }, + }, + }, + Exporters: exporters, + Processors: processors, + Service: collectorService{ + Pipelines: pipelines, + }, + } + + if len(processors) == 0 { + cfg.Processors = nil + } + + data, err := yaml.Marshal(cfg) + if err != nil { + return "", err + } + + return string(data), nil +} + +type pipelineContribution struct { + exporter string + processor string +} + +func buildPipelines(traces, metrics []pipelineContribution) map[string]collectorPipeline { + pipelines := map[string]collectorPipeline{} + addSignalPipelines(pipelines, "traces", traces) + addSignalPipelines(pipelines, "metrics", metrics) + return pipelines +} + +func addSignalPipelines(pipelines map[string]collectorPipeline, signal string, contributions []pipelineContribution) { + grouped := groupByProcessor(contributions) + if len(grouped) == 1 { + for proc, exporters := range grouped { + p := collectorPipeline{ + Receivers: []string{"otlp"}, + Exporters: exporters, + } + if proc != "" { + p.Processors = []string{proc} + } + pipelines[signal] = p + } + return + } + for proc, exporterList := range grouped { + suffix := "default" + if proc != "" { + parts := strings.SplitN(proc, "/", 2) + if len(parts) == 2 { + suffix = parts[1] + } + } + p := collectorPipeline{ + Receivers: []string{"otlp"}, + Exporters: exporterList, + } + if proc != "" { + p.Processors = []string{proc} + } + pipelines[signal+"/"+suffix] = p + } +} + +func groupByProcessor(contributions []pipelineContribution) map[string][]string { + grouped := map[string][]string{} + for _, c := range contributions { + grouped[c.processor] = append(grouped[c.processor], c.exporter) + } + return grouped +} + +func sanitizeName(name string) string { + return strings.ReplaceAll(name, ".", "-") +} diff --git a/internal/resources/otelexporterendpoints/collector_config_test.go b/internal/resources/otelexporterendpoints/collector_config_test.go new file mode 100644 index 000000000..ea6bea116 --- /dev/null +++ b/internal/resources/otelexporterendpoints/collector_config_test.go @@ -0,0 +1,293 @@ +package otelexporterendpoints + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" +) + +func endpoint(name string, spec v1beta1.OtelExporterEndpointSpec) *v1beta1.OtelExporterEndpoint { + return &v1beta1.OtelExporterEndpoint{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: spec, + } +} + +func TestGenerateMergedCollectorConfig(t *testing.T) { + t.Parallel() + + type testCase struct { + name string + inputs []collectorInput + otelSettings *otelSettingsInput + expectedContains []string + expectedNotContains []string + } + + testCases := []testCase{ + { + name: "single CRD with traces endpoint", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }), + }, + }, + expectedContains: []string{ + "otlphttp/monitoring-traces", + "http://my-collector:4318", + "nop", + }, + }, + { + name: "single CRD with grpc endpoint", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "grpc://my-collector:4317", + }, + }), + }, + }, + expectedContains: []string{ + "otlp/monitoring-traces", + "my-collector:4317", + }, + expectedNotContains: []string{"otlphttp/monitoring-traces"}, + }, + { + name: "single CRD with auth", + inputs: []collectorInput{ + { + Endpoint: endpoint("support", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "https://support.frmnc.net", + Auth: &v1beta1.OtelAuthConfig{ + Type: "bearer", + FromSecret: "formance-license", + }, + }, + }), + TracesEnvAlias: "AUTH_SUPPORT_TRACES", + }, + }, + expectedContains: []string{ + "otlphttp/support-traces", + "https://support.frmnc.net", + "authorization: Bearer ${env:AUTH_SUPPORT_TRACES}", + }, + }, + { + name: "multiple CRDs fan out", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }), + }, + { + Endpoint: endpoint("support", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "https://support.frmnc.net", + Auth: &v1beta1.OtelAuthConfig{ + Type: "bearer", + FromSecret: "formance-license", + }, + }, + }), + TracesEnvAlias: "AUTH_SUPPORT_TRACES", + }, + }, + expectedContains: []string{ + "otlphttp/monitoring-traces", + "http://my-collector:4318", + "otlphttp/support-traces", + "https://support.frmnc.net", + "authorization: Bearer ${env:AUTH_SUPPORT_TRACES}", + }, + }, + { + name: "no endpoints produces nop", + inputs: []collectorInput{}, + expectedContains: []string{"nop"}, + expectedNotContains: []string{"otlphttp", "otlp/"}, + }, + { + name: "otel settings traces", + inputs: []collectorInput{}, + otelSettings: &otelSettingsInput{ + TracesEndpoint: "http://settings-collector:4318", + }, + expectedContains: []string{ + "otlphttp/settings-traces", + "http://settings-collector:4318", + }, + }, + { + name: "CRD plus otel settings both appear", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }), + }, + }, + otelSettings: &otelSettingsInput{ + TracesEndpoint: "http://settings-collector:4318", + }, + expectedContains: []string{ + "otlphttp/monitoring-traces", + "otlphttp/settings-traces", + }, + }, + { + name: "resource attributes produce processor", + inputs: []collectorInput{ + { + Endpoint: endpoint("support", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "https://support.frmnc.net", + }, + ResourceAttributes: map[string]string{ + "cluster.id": "abc-123", + }, + }), + }, + }, + expectedContains: []string{ + "resource/support", + "cluster.id", + "abc-123", + "upsert", + }, + }, + { + name: "traces and metrics with separate endpoints", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://traces-collector:4318", + }, + Metrics: &v1beta1.OtelSignalConfig{ + Endpoint: "http://metrics-collector:4318", + }, + }), + }, + }, + expectedContains: []string{ + "otlphttp/monitoring-traces", + "http://traces-collector:4318", + "otlphttp/monitoring-metrics", + "http://metrics-collector:4318", + }, + expectedNotContains: []string{"nop"}, + }, + { + name: "metrics-only uses nop for traces", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Metrics: &v1beta1.OtelSignalConfig{ + Endpoint: "http://metrics-collector:4318", + }, + }), + }, + }, + expectedContains: []string{ + "otlphttp/monitoring-metrics", + "http://metrics-collector:4318", + "nop", + }, + expectedNotContains: []string{"otlphttp/monitoring-traces"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + config, err := generateMergedCollectorConfig(tc.inputs, tc.otelSettings) + require.NoError(t, err) + + for _, s := range tc.expectedContains { + require.True(t, strings.Contains(config, s), + "expected config to contain %q, got:\n%s", s, config) + } + for _, s := range tc.expectedNotContains { + require.False(t, strings.Contains(config, s), + "expected config NOT to contain %q, got:\n%s", s, config) + } + }) + } +} + +func TestInferProtocol(t *testing.T) { + t.Parallel() + + require.Equal(t, "grpc", inferProtocol("grpc://my-collector:4317")) + require.Equal(t, "http", inferProtocol("http://my-collector:4318")) + require.Equal(t, "http", inferProtocol("https://support.frmnc.net")) + require.Equal(t, "http", inferProtocol("my-collector:4318")) +} + +func TestStripScheme(t *testing.T) { + t.Parallel() + + require.Equal(t, "my-collector:4317", stripScheme("grpc://my-collector:4317")) + require.Equal(t, "http://my-collector:4318", stripScheme("http://my-collector:4318")) + require.Equal(t, "https://support.frmnc.net", stripScheme("https://support.frmnc.net")) +} + +func TestEnvSafe(t *testing.T) { + t.Parallel() + + require.Equal(t, "FORMANCE_SUPPORT", envSafe("formance-support")) + require.Equal(t, "MY_MONITORING", envSafe("my-monitoring")) + require.Equal(t, "TEST_123", envSafe("test.123")) +} + +func TestBuildCollectorInputs(t *testing.T) { + t.Parallel() + + endpoints := []v1beta1.OtelExporterEndpoint{ + { + ObjectMeta: metav1.ObjectMeta{Name: "support"}, + Spec: v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "https://support.frmnc.net", + Auth: &v1beta1.OtelAuthConfig{ + Type: "bearer", + FromSecret: "formance-license", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "monitoring"}, + Spec: v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }, + }, + } + + inputs, envVars := buildCollectorInputs(endpoints) + require.Len(t, inputs, 2) + require.Len(t, envVars, 1) + require.Equal(t, "AUTH_SUPPORT_TRACES", envVars[0].Name) + require.Equal(t, "formance-license", envVars[0].ValueFrom.SecretKeyRef.LocalObjectReference.Name) +} diff --git a/internal/resources/otelexporterendpoints/init.go b/internal/resources/otelexporterendpoints/init.go new file mode 100644 index 000000000..d9fd67800 --- /dev/null +++ b/internal/resources/otelexporterendpoints/init.go @@ -0,0 +1,470 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otelexporterendpoints + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "slices" + "sort" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + collectionutils "github.com/formancehq/go-libs/v5/pkg/types/collections" + + v1beta1 "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + . "github.com/formancehq/operator/v3/internal/core" + "github.com/formancehq/operator/v3/internal/resources/settings" +) + +//+kubebuilder:rbac:groups=formance.com,resources=otelexporterendpoints,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=formance.com,resources=otelexporterendpoints/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=formance.com,resources=otelexporterendpoints/finalizers,verbs=update + +const ( + defaultCollectorImage = "otel/opentelemetry-collector-contrib:0.151.0" + deploymentName = "otel-collector" + serviceName = "otel-collector" + collectorPort = 4318 + + collectorFinalizer = "otelexporterendpoint.formance.com/finalizer" +) + +func Reconcile(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { + selector, err := selectorFromSpec(endpoint.Spec.StackSelector) + if err != nil { + return err + } + + var stacks v1beta1.StackList + if err := ctx.GetClient().List(ctx, &stacks, client.MatchingLabelsSelector{Selector: selector}); err != nil { + return err + } + + stackNames := make([]string, 0, len(stacks.Items)) + for i := range stacks.Items { + stack := &stacks.Items[i] + if !stack.GetDeletionTimestamp().IsZero() { + continue + } + if err := reconcileStackCollector(ctx, stack); err != nil { + return fmt.Errorf("reconciling collector for stack %s: %w", stack.Name, err) + } + stackNames = append(stackNames, stack.Name) + } + + slices.Sort(stackNames) + endpoint.Status.Stacks = stackNames + return nil +} + +func Cleanup(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { + selector, err := selectorFromSpec(endpoint.Spec.StackSelector) + if err != nil { + return err + } + + var stacks v1beta1.StackList + if err := ctx.GetClient().List(ctx, &stacks, client.MatchingLabelsSelector{Selector: selector}); err != nil { + return err + } + + for i := range stacks.Items { + if err := reconcileStackCollector(ctx, &stacks.Items[i]); err != nil { + return err + } + } + return nil +} + +func selectorFromSpec(ls *metav1.LabelSelector) (labels.Selector, error) { + if ls == nil { + return labels.Everything(), nil + } + return metav1.LabelSelectorAsSelector(ls) +} + +func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { + endpoints, err := findMatchingEndpoints(ctx, stack) + if err != nil { + return err + } + + if len(endpoints) == 0 { + return cleanupStackCollector(ctx, stack.Name) + } + + inputs, envVars := buildCollectorInputs(endpoints) + + otelSettings, err := readOtelSettings(ctx, stack.Name) + if err != nil { + return err + } + + collectorConfigYAML, err := generateMergedCollectorConfig(inputs, otelSettings) + if err != nil { + return fmt.Errorf("generating collector config: %w", err) + } + + configMap, _, err := CreateOrUpdate(ctx, types.NamespacedName{ + Namespace: stack.Name, + Name: "otel-collector-config", + }, + func(cm *corev1.ConfigMap) error { + cm.Data = map[string]string{ + "otel-collector-config.yaml": collectorConfigYAML, + } + return nil + }, + WithOwner[*corev1.ConfigMap](ctx.GetScheme(), stack), + ) + if err != nil { + return fmt.Errorf("creating collector configmap: %w", err) + } + + secretHashes, err := hashAuthSecrets(ctx, stack.Name, endpoints) + if err != nil { + return err + } + + annotations := map[string]string{ + "config-hash": HashFromConfigMaps(configMap), + } + if secretHashes != "" { + annotations["secret-hash"] = secretHashes + } + + replicas := int32(1) + _, _, err = CreateOrUpdate(ctx, types.NamespacedName{ + Namespace: stack.Name, + Name: deploymentName, + }, + func(deployment *appsv1.Deployment) error { + deployment.Spec = appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/name": deploymentName, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app.kubernetes.io/name": deploymentName, + }, + Annotations: annotations, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "otel-collector", + Image: collectorImageForPlatform(ctx), + Args: []string{"--config=/etc/otel/otel-collector-config.yaml"}, + Env: envVars, + Ports: []corev1.ContainerPort{{ + Name: "otlp-http", + ContainerPort: collectorPort, + Protocol: corev1.ProtocolTCP, + }}, + VolumeMounts: []corev1.VolumeMount{ + NewVolumeMount("config", "/etc/otel", true), + NewVolumeMount("tmp", "/tmp", false), + }, + }}, + Volumes: []corev1.Volume{ + { + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: configMap.Name, + }, + }, + }, + }, + { + Name: "tmp", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + } + return nil + }, + WithOwner[*appsv1.Deployment](ctx.GetScheme(), stack), + ) + if err != nil { + return fmt.Errorf("creating collector deployment: %w", err) + } + + _, _, err = CreateOrUpdate(ctx, types.NamespacedName{ + Namespace: stack.Name, + Name: serviceName, + }, + func(svc *corev1.Service) error { + svc.Spec = corev1.ServiceSpec{ + Selector: map[string]string{ + "app.kubernetes.io/name": deploymentName, + }, + Ports: []corev1.ServicePort{{ + Name: "otlp-http", + Port: collectorPort, + Protocol: corev1.ProtocolTCP, + }}, + } + return nil + }, + WithOwner[*corev1.Service](ctx.GetScheme(), stack), + ) + if err != nil { + return fmt.Errorf("creating collector service: %w", err) + } + + return nil +} + +func findMatchingEndpoints(ctx Context, stack *v1beta1.Stack) ([]v1beta1.OtelExporterEndpoint, error) { + var allEndpoints v1beta1.OtelExporterEndpointList + if err := ctx.GetClient().List(ctx, &allEndpoints); err != nil { + return nil, err + } + + stackLabels := labels.Set(stack.GetLabels()) + var matching []v1beta1.OtelExporterEndpoint + + for _, ep := range allEndpoints.Items { + if !ep.GetDeletionTimestamp().IsZero() { + continue + } + selector, err := selectorFromSpec(ep.Spec.StackSelector) + if err != nil { + log.FromContext(ctx).Error(err, "invalid stackSelector on OtelExporterEndpoint, skipping", "endpoint", ep.Name) + continue + } + if selector.Matches(stackLabels) { + matching = append(matching, ep) + } + } + + sort.Slice(matching, func(i, j int) bool { + return matching[i].Name < matching[j].Name + }) + return matching, nil +} + +type authSecretRef struct { + SecretName string + Signal string + CRDName string +} + +func referencedAuthSecrets(endpoints []v1beta1.OtelExporterEndpoint) []authSecretRef { + var refs []authSecretRef + for _, ep := range endpoints { + crdName := sanitizeName(ep.Name) + for _, entry := range []struct { + signal string + config *v1beta1.OtelSignalConfig + }{ + {"TRACES", ep.Spec.Traces}, + {"METRICS", ep.Spec.Metrics}, + } { + if entry.config == nil || entry.config.Auth == nil || entry.config.Auth.Type != "bearer" { + continue + } + refs = append(refs, authSecretRef{ + SecretName: entry.config.Auth.FromSecret, + Signal: entry.signal, + CRDName: crdName, + }) + } + } + return refs +} + +func buildCollectorInputs(endpoints []v1beta1.OtelExporterEndpoint) ([]collectorInput, []corev1.EnvVar) { + var inputs []collectorInput + var envVars []corev1.EnvVar + + refs := referencedAuthSecrets(endpoints) + refsByKey := map[string]string{} + for _, ref := range refs { + envName := fmt.Sprintf("AUTH_%s_%s", envSafe(ref.CRDName), ref.Signal) + refsByKey[ref.CRDName+"/"+ref.Signal] = envName + envVars = append(envVars, EnvFromSecret(envName, ref.SecretName, "token")) + } + + for _, ep := range endpoints { + crdName := sanitizeName(ep.Name) + ci := collectorInput{Endpoint: &ep} + ci.TracesEnvAlias = refsByKey[crdName+"/TRACES"] + ci.MetricsEnvAlias = refsByKey[crdName+"/METRICS"] + inputs = append(inputs, ci) + } + + return inputs, envVars +} + +func hashAuthSecrets(ctx Context, stackNamespace string, endpoints []v1beta1.OtelExporterEndpoint) (string, error) { + refs := referencedAuthSecrets(endpoints) + if len(refs) == 0 { + return "", nil + } + + seen := map[string]bool{} + digest := sha256.New() + + for _, ref := range refs { + if seen[ref.SecretName] { + continue + } + seen[ref.SecretName] = true + + secret := &corev1.Secret{} + err := ctx.GetClient().Get(ctx, types.NamespacedName{ + Name: ref.SecretName, + Namespace: stackNamespace, + }, secret) + if err != nil { + return "", fmt.Errorf("auth secret %q not found in namespace %q: %w", ref.SecretName, stackNamespace, err) + } + if err := json.NewEncoder(digest).Encode(secret.Data); err != nil { + return "", err + } + } + + return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil +} + +func readOtelSettings(ctx Context, stackName string) (*otelSettingsInput, error) { + tracesURL, err := settings.GetURL(ctx, stackName, "opentelemetry", "traces", "dsn") + if err != nil { + return nil, err + } + metricsURL, err := settings.GetURL(ctx, stackName, "opentelemetry", "metrics", "dsn") + if err != nil { + return nil, err + } + + if tracesURL == nil && metricsURL == nil { + return nil, nil + } + + input := &otelSettingsInput{} + if tracesURL != nil { + input.TracesEndpoint = tracesURL.String() + } + if metricsURL != nil { + input.MetricsEndpoint = metricsURL.String() + } + return input, nil +} + +func cleanupStackCollector(ctx Context, namespace string) error { + if err := DeleteIfExists[*corev1.Service](ctx, types.NamespacedName{ + Name: serviceName, Namespace: namespace, + }); err != nil { + return err + } + if err := DeleteIfExists[*appsv1.Deployment](ctx, types.NamespacedName{ + Name: deploymentName, Namespace: namespace, + }); err != nil { + return err + } + return DeleteIfExists[*corev1.ConfigMap](ctx, types.NamespacedName{ + Name: "otel-collector-config", Namespace: namespace, + }) +} + +func collectorImageForPlatform(ctx Context) string { + if img := ctx.GetPlatform().CollectorImage; img != "" { + return img + } + return defaultCollectorImage +} + +func envSafe(s string) string { + replacer := func(r rune) rune { + if (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' { + return r + } + if r >= 'a' && r <= 'z' { + return r - 32 + } + return '_' + } + result := make([]rune, 0, len(s)) + for _, r := range s { + result = append(result, replacer(r)) + } + return string(result) +} + +func isCollectorResource(obj client.Object) bool { + return obj.GetName() == deploymentName || obj.GetName() == serviceName || obj.GetName() == "otel-collector-config" +} + +func enqueueAllEndpoints(ctx Context) []reconcile.Request { + var endpoints v1beta1.OtelExporterEndpointList + if err := ctx.GetClient().List(ctx, &endpoints); err != nil { + return nil + } + return MapObjectToReconcileRequests( + collectionutils.Map(endpoints.Items, func(e v1beta1.OtelExporterEndpoint) *v1beta1.OtelExporterEndpoint { return &e })..., + ) +} + +func init() { + Init( + WithStdReconciler(Reconcile, + WithFinalizer[*v1beta1.OtelExporterEndpoint](collectorFinalizer, Cleanup), + WithWatch[*v1beta1.OtelExporterEndpoint, *v1beta1.Stack](func(ctx Context, _ *v1beta1.Stack) []reconcile.Request { + return enqueueAllEndpoints(ctx) + }), + WithWatch[*v1beta1.OtelExporterEndpoint, *v1beta1.Settings](func(ctx Context, _ *v1beta1.Settings) []reconcile.Request { + return enqueueAllEndpoints(ctx) + }), + WithRaw[*v1beta1.OtelExporterEndpoint](func(ctx Context, b *builder.Builder) error { + collectorPredicate := predicate.NewPredicateFuncs(isCollectorResource) + enqueueHandler := handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, _ client.Object) []reconcile.Request { + return enqueueAllEndpoints(ctx) + }, + ) + b.Watches(&corev1.ConfigMap{}, enqueueHandler, builder.WithPredicates(collectorPredicate)) + b.Watches(&appsv1.Deployment{}, enqueueHandler, builder.WithPredicates(collectorPredicate)) + b.Watches(&corev1.Service{}, enqueueHandler, builder.WithPredicates(collectorPredicate)) + return nil + }), + ), + ) +} diff --git a/internal/resources/settings/opentelemetry.go b/internal/resources/settings/opentelemetry.go index 3065c37e9..63a8dd13b 100644 --- a/internal/resources/settings/opentelemetry.go +++ b/internal/resources/settings/opentelemetry.go @@ -5,7 +5,9 @@ import ( "slices" "strings" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "github.com/formancehq/operator/v3/internal/core" ) @@ -15,9 +17,20 @@ type MonitoringType string const ( MonitoringTypeTraces MonitoringType = "TRACES" MonitoringTypeMetrics MonitoringType = "METRICS" + + collectorServiceName = "otel-collector" + collectorServicePort = 4318 ) -func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSeparator string) ([]v1.EnvVar, error) { +func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSeparator string) ([]corev1.EnvVar, error) { + collectorEndpoint, err := getCollectorEndpoint(ctx, stack) + if err != nil { + return nil, err + } + if collectorEndpoint != "" { + return collectorEnvVars(ctx, collectorEndpoint, stack, serviceName, sliceStringSeparator) + } + traces, err := otelEnvVars(ctx, stack, MonitoringTypeTraces, serviceName, sliceStringSeparator) if err != nil { return nil, err @@ -34,7 +47,77 @@ func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSepa return append(traces, metrics...), nil } +func getCollectorEndpoint(ctx core.Context, stack string) (string, error) { + svc := &corev1.Service{} + err := ctx.GetClient().Get(ctx, types.NamespacedName{ + Namespace: stack, + Name: collectorServiceName, + }, svc) + if err != nil { + if apierrors.IsNotFound(err) { + return "", nil + } + return "", err + } + return fmt.Sprintf("%s.%s:%d", collectorServiceName, stack, collectorServicePort), nil +} + +func collectorEnvVars(ctx core.Context, collectorEndpoint, stack, serviceName, sliceStringSeparator string) ([]corev1.EnvVar, error) { + resourceAttributes := map[string]string{} + for _, signal := range []string{"traces", "metrics"} { + attrs, err := GetMap(ctx, stack, "opentelemetry", signal, "resource-attributes") + if err != nil { + return nil, err + } + for k, v := range attrs { + resourceAttributes[k] = v + } + } + resourceAttributes["stack"] = stack + resourceAttributes["pod-name"] = "$(POD_NAME)" + + resourceAttributesArray := make([]string, 0, len(resourceAttributes)) + for k, v := range resourceAttributes { + resourceAttributesArray = append(resourceAttributesArray, fmt.Sprintf("%s=%s", k, v)) + } + slices.Sort(resourceAttributesArray) + + return []corev1.EnvVar{ + core.Env("OTEL_TRACES", "true"), + core.Env("OTEL_TRACES_BATCH", "true"), + core.Env("OTEL_TRACES_EXPORTER", "otlp"), + core.Env("OTEL_TRACES_EXPORTER_OTLP_ENDPOINT", collectorEndpoint), + core.Env("OTEL_TRACES_EXPORTER_OTLP_MODE", "http"), + core.EnvFromBool("OTEL_TRACES_EXPORTER_OTLP_INSECURE", true), + core.Env("OTEL_METRICS", "true"), + core.Env("OTEL_METRICS_BATCH", "true"), + core.Env("OTEL_METRICS_EXPORTER", "otlp"), + core.Env("OTEL_METRICS_EXPORTER_OTLP_ENDPOINT", collectorEndpoint), + core.Env("OTEL_METRICS_EXPORTER_OTLP_MODE", "http"), + core.EnvFromBool("OTEL_METRICS_EXPORTER_OTLP_INSECURE", true), + core.Env("OTEL_METRICS_RUNTIME", "true"), + core.Env("OTEL_SERVICE_NAME", serviceName), + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + core.Env("OTEL_RESOURCE_ATTRIBUTES", strings.Join(resourceAttributesArray, sliceStringSeparator)), + }, nil +} + func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) { + collectorEndpoint, err := getCollectorEndpoint(ctx, stack) + if err != nil { + return false, err + } + if collectorEndpoint != "" { + return true, nil + } + v, err := GetURL(ctx, stack, "opentelemetry", "traces", "dsn") if err != nil { return false, err @@ -47,7 +130,7 @@ func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) return true, nil } -func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, serviceName, sliceStringSeparator string) ([]v1.EnvVar, error) { +func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, serviceName, sliceStringSeparator string) ([]corev1.EnvVar, error) { otlp, err := GetURL(ctx, stack, "opentelemetry", strings.ToLower(string(monitoringType)), "dsn") if err != nil { @@ -57,7 +140,7 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, return nil, nil } - ret := []v1.EnvVar{ + ret := []corev1.EnvVar{ core.Env(fmt.Sprintf("OTEL_%s", string(monitoringType)), "true"), core.Env(fmt.Sprintf("OTEL_%s_BATCH", string(monitoringType)), "true"), core.Env(fmt.Sprintf("OTEL_%s_EXPORTER", string(monitoringType)), "otlp"), @@ -66,8 +149,8 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, core.Env(fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_MODE", string(monitoringType)), otlp.Scheme), { Name: "POD_NAME", - ValueFrom: &v1.EnvVarSource{ - FieldRef: &v1.ObjectFieldSelector{ + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "metadata.name", }, }, @@ -75,7 +158,7 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, } // If the path is not empty, we use the full URL as the endpoint. - var otlpEndpoint v1.EnvVar + var otlpEndpoint corev1.EnvVar otlpEndpointEnvName := fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_ENDPOINT", string(monitoringType)) if otlp.Path != "" { otlpEndpoint = core.Env(otlpEndpointEnvName, otlp.String()) diff --git a/internal/tests/otelexporterendpoint_controller_test.go b/internal/tests/otelexporterendpoint_controller_test.go new file mode 100644 index 000000000..8b45dfbcd --- /dev/null +++ b/internal/tests/otelexporterendpoint_controller_test.go @@ -0,0 +1,254 @@ +package tests_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1beta1 "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + . "github.com/formancehq/operator/v3/internal/tests/internal" +) + +var _ = Describe("OtelExporterEndpointController", func() { + Context("When creating an OtelExporterEndpoint with stackSelector matching a stack", func() { + var ( + stack *v1beta1.Stack + endpoint *v1beta1.OtelExporterEndpoint + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{ + Name: RandObjectMeta().Name, + Labels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + endpoint = &v1beta1.OtelExporterEndpoint{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.OtelExporterEndpointSpec{ + StackSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(endpoint)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(endpoint)).To(Succeed()) + Expect(Delete(stack)).To(Succeed()) + }) + It("Should create a ConfigMap, Deployment, and Service", func() { + By("Should set the status to ready", func() { + Eventually(func(g Gomega) bool { + g.Expect(LoadResource("", endpoint.Name, endpoint)).To(Succeed()) + return endpoint.Status.Ready + }).Should(BeTrue()) + }) + By("Should track the matching stack", func() { + Expect(endpoint.Status.Stacks).To(ContainElement(stack.Name)) + }) + By("Should create a ConfigMap with collector config", func() { + cm := &corev1.ConfigMap{} + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector-config", cm) + }).Should(Succeed()) + Expect(cm.Data).To(HaveKey("otel-collector-config.yaml")) + Expect(cm.Data["otel-collector-config.yaml"]).To(ContainSubstring("http://my-collector:4318")) + }) + By("Should create a Deployment", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector", deployment) + }).Should(Succeed()) + }) + By("Should create a Service", func() { + svc := &corev1.Service{} + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector", svc) + }).Should(Succeed()) + }) + }) + }) + + Context("When no OtelExporterEndpoints exist and no Settings", func() { + var ( + stack *v1beta1.Stack + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{ + Name: RandObjectMeta().Name, + Labels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(stack)).To(Succeed()) + }) + It("Should not create a collector service", func() { + svc := &corev1.Service{} + Consistently(func() error { + return LoadResource(stack.Name, "otel-collector", svc) + }).ShouldNot(Succeed()) + }) + }) + + Context("When creating an OtelExporterEndpoint targeting all stacks", func() { + var ( + stack *v1beta1.Stack + endpoint *v1beta1.OtelExporterEndpoint + authSecret *corev1.Secret + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{ + Name: RandObjectMeta().Name, + Labels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + endpoint = &v1beta1.OtelExporterEndpoint{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.OtelExporterEndpointSpec{ + StackSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "formance.com/stack", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "https://support.frmnc.net", + Auth: &v1beta1.OtelAuthConfig{ + Type: "bearer", + FromSecret: "formance-license", + }, + }, + ResourceAttributes: map[string]string{ + "cluster.id": "test-cluster", + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Eventually(func() error { + ns := &corev1.Namespace{} + return LoadResource("", stack.Name, ns) + }).Should(Succeed()) + authSecret = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "formance-license", + Namespace: stack.Name, + }, + Data: map[string][]byte{ + "token": []byte("test-token"), + }, + } + Expect(Create(authSecret)).To(Succeed()) + Expect(Create(endpoint)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(endpoint)).To(Succeed()) + Expect(Delete(authSecret)).To(Succeed()) + Expect(Delete(stack)).To(Succeed()) + }) + It("Should create a collector with auth headers", func() { + Eventually(func(g Gomega) bool { + g.Expect(LoadResource("", endpoint.Name, endpoint)).To(Succeed()) + return endpoint.Status.Ready + }).Should(BeTrue()) + + cm := &corev1.ConfigMap{} + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector-config", cm) + }).Should(Succeed()) + Expect(cm.Data["otel-collector-config.yaml"]).To(ContainSubstring("https://support.frmnc.net")) + Expect(cm.Data["otel-collector-config.yaml"]).To(ContainSubstring("authorization")) + }) + }) + + Context("When creating an OtelExporterEndpoint with Settings fallback", func() { + var ( + stack *v1beta1.Stack + setting *v1beta1.Settings + endpoint *v1beta1.OtelExporterEndpoint + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{ + Name: RandObjectMeta().Name, + Labels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + setting = &v1beta1.Settings{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.SettingsSpec{ + Stacks: []string{"*"}, + Key: "opentelemetry.traces.dsn", + Value: "http://settings-collector:4318", + }, + } + endpoint = &v1beta1.OtelExporterEndpoint{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.OtelExporterEndpointSpec{ + StackSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(setting)).To(Succeed()) + Expect(Create(endpoint)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(setting)).To(Succeed()) + Expect(Delete(endpoint)).To(Succeed()) + Expect(Delete(stack)).To(Succeed()) + }) + It("Should include both CRD and Settings endpoints in collector config", func() { + Eventually(func(g Gomega) bool { + g.Expect(LoadResource("", endpoint.Name, endpoint)).To(Succeed()) + return endpoint.Status.Ready + }).Should(BeTrue()) + + cm := &corev1.ConfigMap{} + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector-config", cm) + }).Should(Succeed()) + Expect(cm.Data["otel-collector-config.yaml"]).To(ContainSubstring("http://my-collector:4318")) + Expect(cm.Data["otel-collector-config.yaml"]).To(ContainSubstring("settings-collector:4318")) + }) + }) +}) From 896f3da95991af9ebd26c6bc5b32fbad5cdcbd4c Mon Sep 17 00:00:00 2001 From: Sofia Simdianova Date: Tue, 2 Jun 2026 12:53:00 +0200 Subject: [PATCH 2/3] feat: add OtelExporterEndpoint crd --- .../formance.com_v1beta1_otelexporterendpoint.yaml | 8 ++++---- helm/operator/values.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml b/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml index cd94ce793..2bd6b44f0 100644 --- a/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml +++ b/config/samples/formance.com_v1beta1_otelexporterendpoint.yaml @@ -33,14 +33,14 @@ spec: - key: formance.com/stack operator: Exists traces: - endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + endpoint: "https://otel-support.internal.frmnc.net/v1/traces" auth: type: bearer - fromSecret: "formance-license" + fromSecret: "formance-licence" metrics: - endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + endpoint: "https://otel-support.internal.frmnc.net/v1/metrics" auth: type: bearer - fromSecret: "formance-license" + fromSecret: "formance-licence" resourceAttributes: cluster.id: "abc-123" diff --git a/helm/operator/values.yaml b/helm/operator/values.yaml index 6021c9d8f..b8692618b 100644 --- a/helm/operator/values.yaml +++ b/helm/operator/values.yaml @@ -8,7 +8,7 @@ global: # Enable the Formance support OtelExporterEndpoint enabled: false # The support collector endpoint - endpoint: "https://otel-licence.telemetry-support-c1f3ad1.v2.formance.dev" + endpoint: "https://otel-support.internal.frmnc.net" # Secret name containing the bearer token for auth (key: "token"). # Defaults to the licence secret if set. authSecret: "" From b3699699d9a9275918d7deb93c4d8180d792da49 Mon Sep 17 00:00:00 2001 From: Sofia Simdianova Date: Wed, 3 Jun 2026 16:43:41 +0200 Subject: [PATCH 3/3] feat: add OtelExporterEndpoint CRD --- .../v1beta1/otelexporterendpoint_types.go | 33 +- cmd/main.go | 2 +- .../formance.com_otelexporterendpoints.yaml | 47 +- .../02-Custom Resource Definitions.md | 12 +- .../settings.catalog.json | 12 +- ...on_otelexporterendpoints.formance.com.yaml | 47 +- internal/core/platform.go | 10 + .../otelexporterendpoints/collector_config.go | 71 ++- .../collector_config_test.go | 70 +++ .../resources/otelexporterendpoints/init.go | 483 ++++++++++++++++-- internal/resources/settings/opentelemetry.go | 83 +-- .../otelexporterendpoint_controller_test.go | 64 +++ 12 files changed, 822 insertions(+), 112 deletions(-) diff --git a/api/formance.com/v1beta1/otelexporterendpoint_types.go b/api/formance.com/v1beta1/otelexporterendpoint_types.go index da527102a..e4b321f3a 100644 --- a/api/formance.com/v1beta1/otelexporterendpoint_types.go +++ b/api/formance.com/v1beta1/otelexporterendpoint_types.go @@ -20,35 +20,62 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// OtelAuthConfig configures per-signal authentication. +// Auth is per-signal so traces and metrics can use different credentials if needed. type OtelAuthConfig struct { + // Type is the authentication type. // +kubebuilder:validation:Enum=bearer - Type string `json:"type"` + Type string `json:"type"` + // FromSecret references a Secret name (expected key is "token"). + // The controller creates a ResourceReference to replicate the secret into each target stack namespace. + // The source secret must have a "formance.com/stack" label set to "any" or a specific stack name. + // +kubebuilder:validation:MinLength=1 FromSecret string `json:"fromSecret"` } +// OtelSignalConfig configures a single signal type (traces or metrics). +// Each signal type has its own endpoint and authentication block, allowing +// different destinations or credentials per signal. +// Protocol is inferred from the URL scheme: grpc:// for gRPC, http:// or https:// for HTTP/protobuf (default). type OtelSignalConfig struct { + // Endpoint URL for the signal (e.g., "http://my-collector:4318", "grpc://my-collector:4317"). + // Protocol is inferred from the URL scheme. HTTP/protobuf is the default for firewall compatibility. + // +kubebuilder:validation:MinLength=1 Endpoint string `json:"endpoint"` + // Auth is the optional per-signal authentication configuration. // +optional Auth *OtelAuthConfig `json:"auth,omitempty"` } +// +kubebuilder:validation:XValidation:rule="has(self.traces) || has(self.metrics)",message="at least one signal (traces or metrics) must be configured" type OtelExporterEndpointSpec struct { - // +optional - StackSelector *metav1.LabelSelector `json:"stackSelector,omitempty"` + // StackSelector is a standard Kubernetes LabelSelector (matchLabels/matchExpressions). + // One CRD can target all current and future stacks with a single selector. + // Matches the pattern established by Settings. + StackSelector *metav1.LabelSelector `json:"stackSelector"` + // Traces configures the traces signal. At least one of traces or metrics must be set. + // Logs are intentionally out of scope. // +optional Traces *OtelSignalConfig `json:"traces,omitempty"` + // Metrics configures the metrics signal. At least one of traces or metrics must be set. + // Logs are intentionally out of scope. // +optional Metrics *OtelSignalConfig `json:"metrics,omitempty"` + // ResourceAttributes are injected into outgoing telemetry via a collector processor. // +optional ResourceAttributes map[string]string `json:"resourceAttributes,omitempty"` } +// OtelExporterEndpointStatus represents the observed state of an OtelExporterEndpoint. type OtelExporterEndpointStatus struct { Status `json:",inline"` + // Stacks is a sorted list of stack names currently targeted by this endpoint. + // Includes stacks with successful reconciliation and stacks with transient errors or pending cleanup. + // Used by the finalizer to find previously matched stacks during deletion. // +optional Stacks []string `json:"stacks,omitempty"` } diff --git a/cmd/main.go b/cmd/main.go index 4a7174757..bcde35a70 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -75,7 +75,7 @@ func main() { flag.StringVar(&licenceSecret, "licence-secret", "", "The licence secret that contains the token and the issuer") flag.StringVar(&licenceNamespace, "licence-namespace", "", "The namespace where the licence secret lives (defaults to operator namespace)") flag.StringVar(&utilsVersion, "utils-version", "latest", "The version of the operator utils image") - flag.StringVar(&collectorImage, "collector-image", "otel/opentelemetry-collector-contrib:0.151.0", "The OTel Collector image for OtelExporterEndpoint resources") + flag.StringVar(&collectorImage, "collector-image", core.DefaultCollectorImage, "The OTel Collector image for OtelExporterEndpoint resources") opts := zap.Options{ Development: false, } diff --git a/config/crd/bases/formance.com_otelexporterendpoints.yaml b/config/crd/bases/formance.com_otelexporterendpoints.yaml index b9a0c96bd..b7ac9e7c8 100644 --- a/config/crd/bases/formance.com_otelexporterendpoints.yaml +++ b/config/crd/bases/formance.com_otelexporterendpoints.yaml @@ -50,12 +50,22 @@ spec: spec: properties: metrics: + description: |- + Metrics configures the metrics signal. At least one of traces or metrics must be set. + Logs are intentionally out of scope. properties: auth: + description: Auth is the optional per-signal authentication configuration. properties: fromSecret: + description: |- + FromSecret references a Secret name (expected key is "token"). + The controller creates a ResourceReference to replicate the secret into each target stack namespace. + The source secret must have a "formance.com/stack" label set to "any" or a specific stack name. + minLength: 1 type: string type: + description: Type is the authentication type. enum: - bearer type: string @@ -64,6 +74,10 @@ spec: - type type: object endpoint: + description: |- + Endpoint URL for the signal (e.g., "http://my-collector:4318", "grpc://my-collector:4317"). + Protocol is inferred from the URL scheme. HTTP/protobuf is the default for firewall compatibility. + minLength: 1 type: string required: - endpoint @@ -71,12 +85,14 @@ spec: resourceAttributes: additionalProperties: type: string + description: ResourceAttributes are injected into outgoing telemetry + via a collector processor. type: object stackSelector: description: |- - A label selector is a label query over a set of resources. The result of matchLabels and - matchExpressions are ANDed. An empty label selector matches all objects. A null - label selector matches no objects. + StackSelector is a standard Kubernetes LabelSelector (matchLabels/matchExpressions). + One CRD can target all current and future stacks with a single selector. + Matches the pattern established by Settings. properties: matchExpressions: description: matchExpressions is a list of label selector requirements. @@ -122,12 +138,22 @@ spec: type: object x-kubernetes-map-type: atomic traces: + description: |- + Traces configures the traces signal. At least one of traces or metrics must be set. + Logs are intentionally out of scope. properties: auth: + description: Auth is the optional per-signal authentication configuration. properties: fromSecret: + description: |- + FromSecret references a Secret name (expected key is "token"). + The controller creates a ResourceReference to replicate the secret into each target stack namespace. + The source secret must have a "formance.com/stack" label set to "any" or a specific stack name. + minLength: 1 type: string type: + description: Type is the authentication type. enum: - bearer type: string @@ -136,12 +162,23 @@ spec: - type type: object endpoint: + description: |- + Endpoint URL for the signal (e.g., "http://my-collector:4318", "grpc://my-collector:4317"). + Protocol is inferred from the URL scheme. HTTP/protobuf is the default for firewall compatibility. + minLength: 1 type: string required: - endpoint type: object + required: + - stackSelector type: object + x-kubernetes-validations: + - message: at least one signal (traces or metrics) must be configured + rule: has(self.traces) || has(self.metrics) status: + description: OtelExporterEndpointStatus represents the observed state + of an OtelExporterEndpoint. properties: conditions: items: @@ -205,6 +242,10 @@ spec: reconciled type: boolean stacks: + description: |- + Stacks is a sorted list of stack names currently targeted by this endpoint. + Includes stacks with successful reconciliation and stacks with transient errors or pending cleanup. + Used by the finalizer to find previously matched stacks during deletion. items: type: string type: array diff --git a/docs/09-Configuration reference/02-Custom Resource Definitions.md b/docs/09-Configuration reference/02-Custom Resource Definitions.md index f728ee6dd..2c26b4944 100644 --- a/docs/09-Configuration reference/02-Custom Resource Definitions.md +++ b/docs/09-Configuration reference/02-Custom Resource Definitions.md @@ -2231,10 +2231,10 @@ Multiple OtelExporterEndpoints can target the same stacks — the collector fans | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `stackSelector` _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#labelselector-v1-meta)_ | | | | -| `traces` _[OtelSignalConfig](#otelsignalconfig)_ | | | | -| `metrics` _[OtelSignalConfig](#otelsignalconfig)_ | | | | -| `resourceAttributes` _object (keys:string, values:string)_ | | | | +| `stackSelector` _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#labelselector-v1-meta)_ | StackSelector is a standard Kubernetes LabelSelector (matchLabels/matchExpressions).
One CRD can target all current and future stacks with a single selector.
Matches the pattern established by Settings. | | | +| `traces` _[OtelSignalConfig](#otelsignalconfig)_ | Traces configures the traces signal. At least one of traces or metrics must be set.
Logs are intentionally out of scope. | | | +| `metrics` _[OtelSignalConfig](#otelsignalconfig)_ | Metrics configures the metrics signal. At least one of traces or metrics must be set.
Logs are intentionally out of scope. | | | +| `resourceAttributes` _object (keys:string, values:string)_ | ResourceAttributes are injected into outgoing telemetry via a collector processor. | | | @@ -2244,7 +2244,7 @@ Multiple OtelExporterEndpoints can target the same stacks — the collector fans - +OtelExporterEndpointStatus represents the observed state of an OtelExporterEndpoint. @@ -2264,7 +2264,7 @@ Multiple OtelExporterEndpoints can target the same stacks — the collector fans | --- | --- | --- | --- | | `ready` _boolean_ | Ready indicates if the resource is seen as completely reconciled | | | | `info` _string_ | Info can contain any additional like reconciliation errors | | | -| `stacks` _string array_ | | | | +| `stacks` _string array_ | Stacks is a sorted list of stack names currently targeted by this endpoint.
Includes stacks with successful reconciliation and stacks with transient errors or pending cleanup.
Used by the finalizer to find previously matched stacks during deletion. | | | #### ResourceReference diff --git a/docs/09-Configuration reference/settings.catalog.json b/docs/09-Configuration reference/settings.catalog.json index add432716..f4460d63b 100644 --- a/docs/09-Configuration reference/settings.catalog.json +++ b/docs/09-Configuration reference/settings.catalog.json @@ -540,36 +540,36 @@ "key": "opentelemetry.\u003cmonitoring-type\u003e.dsn", "valueType": "uri", "sources": [ - "internal/resources/settings/opentelemetry.go:135" + "internal/resources/settings/opentelemetry.go:156" ] }, { "key": "opentelemetry.\u003cmonitoring-type\u003e.resource-attributes", "valueType": "map[string]string", "sources": [ - "internal/resources/settings/opentelemetry.go:179" + "internal/resources/settings/opentelemetry.go:200" ] }, { "key": "opentelemetry.\u003csignal\u003e.resource-attributes", "valueType": "map[string]string", "sources": [ - "internal/resources/settings/opentelemetry.go:68" + "internal/resources/settings/opentelemetry.go:81" ] }, { "key": "opentelemetry.metrics.dsn", "valueType": "uri", "sources": [ - "internal/resources/otelexporterendpoints/init.go:373" + "internal/resources/otelexporterendpoints/init.go:627" ] }, { "key": "opentelemetry.traces.dsn", "valueType": "uri", "sources": [ - "internal/resources/otelexporterendpoints/init.go:369", - "internal/resources/settings/opentelemetry.go:121" + "internal/resources/otelexporterendpoints/init.go:623", + "internal/resources/settings/opentelemetry.go:146" ] }, { diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml index ff613832d..ef20dafad 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_otelexporterendpoints.formance.com.yaml @@ -53,12 +53,22 @@ spec: spec: properties: metrics: + description: |- + Metrics configures the metrics signal. At least one of traces or metrics must be set. + Logs are intentionally out of scope. properties: auth: + description: Auth is the optional per-signal authentication configuration. properties: fromSecret: + description: |- + FromSecret references a Secret name (expected key is "token"). + The controller creates a ResourceReference to replicate the secret into each target stack namespace. + The source secret must have a "formance.com/stack" label set to "any" or a specific stack name. + minLength: 1 type: string type: + description: Type is the authentication type. enum: - bearer type: string @@ -67,6 +77,10 @@ spec: - type type: object endpoint: + description: |- + Endpoint URL for the signal (e.g., "http://my-collector:4318", "grpc://my-collector:4317"). + Protocol is inferred from the URL scheme. HTTP/protobuf is the default for firewall compatibility. + minLength: 1 type: string required: - endpoint @@ -74,12 +88,14 @@ spec: resourceAttributes: additionalProperties: type: string + description: ResourceAttributes are injected into outgoing telemetry + via a collector processor. type: object stackSelector: description: |- - A label selector is a label query over a set of resources. The result of matchLabels and - matchExpressions are ANDed. An empty label selector matches all objects. A null - label selector matches no objects. + StackSelector is a standard Kubernetes LabelSelector (matchLabels/matchExpressions). + One CRD can target all current and future stacks with a single selector. + Matches the pattern established by Settings. properties: matchExpressions: description: matchExpressions is a list of label selector requirements. @@ -125,12 +141,22 @@ spec: type: object x-kubernetes-map-type: atomic traces: + description: |- + Traces configures the traces signal. At least one of traces or metrics must be set. + Logs are intentionally out of scope. properties: auth: + description: Auth is the optional per-signal authentication configuration. properties: fromSecret: + description: |- + FromSecret references a Secret name (expected key is "token"). + The controller creates a ResourceReference to replicate the secret into each target stack namespace. + The source secret must have a "formance.com/stack" label set to "any" or a specific stack name. + minLength: 1 type: string type: + description: Type is the authentication type. enum: - bearer type: string @@ -139,12 +165,23 @@ spec: - type type: object endpoint: + description: |- + Endpoint URL for the signal (e.g., "http://my-collector:4318", "grpc://my-collector:4317"). + Protocol is inferred from the URL scheme. HTTP/protobuf is the default for firewall compatibility. + minLength: 1 type: string required: - endpoint type: object + required: + - stackSelector type: object + x-kubernetes-validations: + - message: at least one signal (traces or metrics) must be configured + rule: has(self.traces) || has(self.metrics) status: + description: OtelExporterEndpointStatus represents the observed state + of an OtelExporterEndpoint. properties: conditions: items: @@ -208,6 +245,10 @@ spec: reconciled type: boolean stacks: + description: |- + Stacks is a sorted list of stack names currently targeted by this endpoint. + Includes stacks with successful reconciliation and stacks with transient errors or pending cleanup. + Used by the finalizer to find previously matched stacks during deletion. items: type: string type: array diff --git a/internal/core/platform.go b/internal/core/platform.go index 10cc6bb29..f66dc4429 100644 --- a/internal/core/platform.go +++ b/internal/core/platform.go @@ -1,5 +1,15 @@ package core +const ( + DefaultCollectorImage = "otel/opentelemetry-collector-contrib:0.151.0" + + SignalTracesAnnotation = "formance.com/otel-traces-enabled" + SignalMetricsAnnotation = "formance.com/otel-metrics-enabled" + + CollectorManagedByLabel = "formance.com/managed-by" + CollectorManagedByValue = "otelexporterendpoint" +) + type Platform struct { // Cloud region where the stack is deployed Region string diff --git a/internal/resources/otelexporterendpoints/collector_config.go b/internal/resources/otelexporterendpoints/collector_config.go index be9f36756..dd8f6963c 100644 --- a/internal/resources/otelexporterendpoints/collector_config.go +++ b/internal/resources/otelexporterendpoints/collector_config.go @@ -1,6 +1,8 @@ package otelexporterendpoints import ( + "crypto/sha256" + "encoding/hex" "fmt" "net/url" "slices" @@ -13,10 +15,19 @@ import ( ) type collectorConfig struct { - Receivers collectorReceivers `yaml:"receivers"` - Processors map[string]any `yaml:"processors,omitempty"` - Exporters map[string]any `yaml:"exporters"` - Service collectorService `yaml:"service"` + Extensions collectorExtensions `yaml:"extensions"` + Receivers collectorReceivers `yaml:"receivers"` + Processors map[string]any `yaml:"processors,omitempty"` + Exporters map[string]any `yaml:"exporters"` + Service collectorService `yaml:"service"` +} + +type collectorExtensions struct { + HealthCheck healthCheckExtension `yaml:"health_check"` +} + +type healthCheckExtension struct { + Endpoint string `yaml:"endpoint"` } type collectorReceivers struct { @@ -36,7 +47,8 @@ type otlpHTTP struct { } type collectorService struct { - Pipelines map[string]collectorPipeline `yaml:"pipelines"` + Extensions []string `yaml:"extensions"` + Pipelines map[string]collectorPipeline `yaml:"pipelines"` } type collectorPipeline struct { @@ -48,6 +60,11 @@ type collectorPipeline struct { type otlpExporter struct { Endpoint string `yaml:"endpoint"` Headers map[string]string `yaml:"headers,omitempty"` + TLS *otlpTLSConfig `yaml:"tls,omitempty"` +} + +type otlpTLSConfig struct { + Insecure bool `yaml:"insecure"` } type resourceProcessorAttribute struct { @@ -85,6 +102,14 @@ func stripScheme(endpoint string) string { return endpoint } +func isInsecure(endpoint string) bool { + u, err := url.Parse(endpoint) + if err != nil { + return false + } + return u.Query().Get("insecure") == "true" +} + func buildExporter(input exporterInput) (string, any) { protocol := inferProtocol(input.signal.Endpoint) endpoint := stripScheme(input.signal.Endpoint) @@ -96,6 +121,11 @@ func buildExporter(input exporterInput) (string, any) { } } + var tls *otlpTLSConfig + if protocol == "grpc" && isInsecure(input.signal.Endpoint) { + tls = &otlpTLSConfig{Insecure: true} + } + prefix := "otlphttp/" if protocol == "grpc" { prefix = "otlp/" @@ -103,6 +133,7 @@ func buildExporter(input exporterInput) (string, any) { return prefix + input.name, otlpExporter{ Endpoint: endpoint, Headers: headers, + TLS: tls, } } @@ -199,12 +230,6 @@ func generateMergedCollectorConfig(endpoints []collectorInput, otelSettings *ote } } - if len(tracesPipelines) == 0 && len(metricsPipelines) == 0 { - exporters["nop"] = struct{}{} - tracesPipelines = []pipelineContribution{{exporter: "nop"}} - metricsPipelines = []pipelineContribution{{exporter: "nop"}} - } - if len(tracesPipelines) == 0 { exporters["nop"] = struct{}{} tracesPipelines = []pipelineContribution{{exporter: "nop"}} @@ -217,6 +242,11 @@ func generateMergedCollectorConfig(endpoints []collectorInput, otelSettings *ote pipelines := buildPipelines(tracesPipelines, metricsPipelines) cfg := collectorConfig{ + Extensions: collectorExtensions{ + HealthCheck: healthCheckExtension{ + Endpoint: fmt.Sprintf("0.0.0.0:%d", healthCheckPort), + }, + }, Receivers: collectorReceivers{ OTLP: otlpReceiver{ Protocols: otlpProtocols{ @@ -229,7 +259,8 @@ func generateMergedCollectorConfig(endpoints []collectorInput, otelSettings *ote Exporters: exporters, Processors: processors, Service: collectorService{ - Pipelines: pipelines, + Extensions: []string{"health_check"}, + Pipelines: pipelines, }, } @@ -272,7 +303,14 @@ func addSignalPipelines(pipelines map[string]collectorPipeline, signal string, c } return } - for proc, exporterList := range grouped { + procs := make([]string, 0, len(grouped)) + for proc := range grouped { + procs = append(procs, proc) + } + slices.Sort(procs) + + for _, proc := range procs { + exporterList := grouped[proc] suffix := "default" if proc != "" { parts := strings.SplitN(proc, "/", 2) @@ -300,5 +338,10 @@ func groupByProcessor(contributions []pipelineContribution) map[string][]string } func sanitizeName(name string) string { - return strings.ReplaceAll(name, ".", "-") + sanitized := strings.ReplaceAll(name, ".", "-") + if sanitized != name { + h := sha256.Sum256([]byte(name)) + sanitized = sanitized + "-" + hex.EncodeToString(h[:3]) + } + return sanitized } diff --git a/internal/resources/otelexporterendpoints/collector_config_test.go b/internal/resources/otelexporterendpoints/collector_config_test.go index ea6bea116..19dfd5a73 100644 --- a/internal/resources/otelexporterendpoints/collector_config_test.go +++ b/internal/resources/otelexporterendpoints/collector_config_test.go @@ -44,6 +44,8 @@ func TestGenerateMergedCollectorConfig(t *testing.T) { "otlphttp/monitoring-traces", "http://my-collector:4318", "nop", + "health_check", + "13133", }, }, { @@ -116,6 +118,54 @@ func TestGenerateMergedCollectorConfig(t *testing.T) { "authorization: Bearer ${env:AUTH_SUPPORT_TRACES}", }, }, + { + name: "grpc endpoint with insecure query param", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "grpc://my-collector:4317?insecure=true", + }, + }), + }, + }, + expectedContains: []string{ + "otlp/monitoring-traces", + "my-collector:4317", + "tls:", + "insecure: true", + }, + expectedNotContains: []string{"otlphttp/monitoring-traces"}, + }, + { + name: "grpc endpoint without insecure has no tls config", + inputs: []collectorInput{ + { + Endpoint: endpoint("monitoring", v1beta1.OtelExporterEndpointSpec{ + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "grpc://my-collector:4317", + }, + }), + }, + }, + expectedContains: []string{"otlp/monitoring-traces", "my-collector:4317"}, + expectedNotContains: []string{"tls:"}, + }, + { + name: "otel settings grpc with insecure", + inputs: []collectorInput{}, + otelSettings: &otelSettingsInput{ + TracesEndpoint: "grpc://settings-collector:4317?insecure=true", + MetricsEndpoint: "grpc://settings-collector:4317?insecure=true", + }, + expectedContains: []string{ + "otlp/settings-traces", + "otlp/settings-metrics", + "settings-collector:4317", + "tls:", + "insecure: true", + }, + }, { name: "no endpoints produces nop", inputs: []collectorInput{}, @@ -243,6 +293,16 @@ func TestInferProtocol(t *testing.T) { require.Equal(t, "http", inferProtocol("my-collector:4318")) } +func TestIsInsecure(t *testing.T) { + t.Parallel() + + require.True(t, isInsecure("grpc://my-collector:4317?insecure=true")) + require.False(t, isInsecure("grpc://my-collector:4317")) + require.False(t, isInsecure("grpc://my-collector:4317?insecure=false")) + require.False(t, isInsecure("http://my-collector:4318")) + require.False(t, isInsecure("https://support.frmnc.net")) +} + func TestStripScheme(t *testing.T) { t.Parallel() @@ -251,6 +311,16 @@ func TestStripScheme(t *testing.T) { require.Equal(t, "https://support.frmnc.net", stripScheme("https://support.frmnc.net")) } +func TestSanitizeName(t *testing.T) { + t.Parallel() + + require.Equal(t, "my-endpoint", sanitizeName("my-endpoint")) + require.Contains(t, sanitizeName("my.endpoint"), "my-endpoint-") + require.NotEqual(t, sanitizeName("my-endpoint"), sanitizeName("my.endpoint")) + + require.NotEqual(t, sanitizeName("a.b-c"), sanitizeName("a-b.c")) +} + func TestEnvSafe(t *testing.T) { t.Parallel() diff --git a/internal/resources/otelexporterendpoints/init.go b/internal/resources/otelexporterendpoints/init.go index d9fd67800..78a81d96d 100644 --- a/internal/resources/otelexporterendpoints/init.go +++ b/internal/resources/otelexporterendpoints/init.go @@ -20,16 +20,21 @@ import ( "context" "crypto/sha256" "encoding/base64" - "encoding/json" + "encoding/hex" + "errors" "fmt" + "reflect" "slices" "sort" + "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -38,6 +43,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" collectionutils "github.com/formancehq/go-libs/v5/pkg/types/collections" + "github.com/formancehq/go-libs/v5/pkg/types/pointer" v1beta1 "github.com/formancehq/operator/v3/api/formance.com/v1beta1" . "github.com/formancehq/operator/v3/internal/core" @@ -49,14 +55,26 @@ import ( //+kubebuilder:rbac:groups=formance.com,resources=otelexporterendpoints/finalizers,verbs=update const ( - defaultCollectorImage = "otel/opentelemetry-collector-contrib:0.151.0" - deploymentName = "otel-collector" - serviceName = "otel-collector" - collectorPort = 4318 + deploymentName = "otel-collector" + serviceName = "otel-collector" + collectorPort = 4318 + healthCheckPort = 13133 collectorFinalizer = "otelexporterendpoint.formance.com/finalizer" + + managedByLabel = CollectorManagedByLabel + managedByValue = CollectorManagedByValue + + collectorSignalAnnotation = "formance.com/otel-collector-signals" ) +func collectorLabels() map[string]string { + return map[string]string{ + "app.kubernetes.io/name": deploymentName, + managedByLabel: managedByValue, + } +} + func Reconcile(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { selector, err := selectorFromSpec(endpoint.Spec.StackSelector) if err != nil { @@ -68,23 +86,85 @@ func Reconcile(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { return err } + logger := log.FromContext(ctx) + targetedStacks := map[string]bool{} stackNames := make([]string, 0, len(stacks.Items)) + var stackErrors []string + var pendingStacks []string for i := range stacks.Items { stack := &stacks.Items[i] if !stack.GetDeletionTimestamp().IsZero() { continue } + targetedStacks[stack.Name] = true + stackNames = append(stackNames, stack.Name) if err := reconcileStackCollector(ctx, stack); err != nil { - return fmt.Errorf("reconciling collector for stack %s: %w", stack.Name, err) + if IsApplicationError(err) { + pendingStacks = append(pendingStacks, stack.Name) + } else { + logger.Error(err, "skipping stack due to reconciliation error", "stack", stack.Name) + stackErrors = append(stackErrors, fmt.Sprintf("%s: %s", stack.Name, err.Error())) + } + continue + } + + if !checkCollectorReady(ctx, stack.Name) { + pendingStacks = append(pendingStacks, stack.Name) + } + } + + for _, prev := range endpoint.Status.Stacks { + if !targetedStacks[prev] { + if err := cleanupStackCollector(ctx, prev); err != nil { + logger.Error(err, "failed to clean up orphaned collector", "stack", prev) + stackNames = append(stackNames, prev) + stackErrors = append(stackErrors, fmt.Sprintf("%s cleanup: %s", prev, err.Error())) + } } - stackNames = append(stackNames, stack.Name) } slices.Sort(stackNames) - endpoint.Status.Stacks = stackNames + endpoint.Status.Stacks = slices.Compact(stackNames) + + if len(stackErrors) > 0 { + endpoint.SetError(fmt.Sprintf("errors in stacks: %s", strings.Join(stackErrors, "; "))) + } else { + endpoint.SetError("") + } + + condition := v1beta1.NewCondition("CollectorsReady", endpoint.GetGeneration()) + if len(pendingStacks) > 0 { + condition.Fail(fmt.Sprintf("waiting for collectors in: %s", strings.Join(pendingStacks, ", "))) + } + endpoint.GetConditions().AppendOrReplace(*condition, func(c v1beta1.Condition) bool { + return c.Type == "CollectorsReady" + }) + + if len(stackErrors) > 0 { + return fmt.Errorf("partial failure: %s", strings.Join(stackErrors, "; ")) + } return nil } +func checkCollectorReady(ctx Context, stackName string) bool { + deployment := &appsv1.Deployment{} + err := ctx.GetClient().Get(ctx, types.NamespacedName{ + Namespace: stackName, + Name: deploymentName, + }, deployment) + if err != nil { + log.FromContext(ctx).V(1).Info("collector deployment not found", "stack", stackName, "error", err) + return false + } + if deployment.Status.ObservedGeneration != deployment.Generation { + return false + } + if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { + return false + } + return deployment.Status.AvailableReplicas >= deployment.Status.UpdatedReplicas +} + func Cleanup(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { selector, err := selectorFromSpec(endpoint.Spec.StackSelector) if err != nil { @@ -96,38 +176,94 @@ func Cleanup(ctx Context, endpoint *v1beta1.OtelExporterEndpoint) error { return err } + var errs []error + reconciledStacks := map[string]bool{} for i := range stacks.Items { + if !stacks.Items[i].GetDeletionTimestamp().IsZero() { + continue + } + reconciledStacks[stacks.Items[i].Name] = true if err := reconcileStackCollector(ctx, &stacks.Items[i]); err != nil { - return err + errs = append(errs, err) } } - return nil + + for _, stackName := range endpoint.Status.Stacks { + if reconciledStacks[stackName] { + continue + } + stack := &v1beta1.Stack{} + if err := ctx.GetClient().Get(ctx, types.NamespacedName{Name: stackName}, stack); err != nil { + if client.IgnoreNotFound(err) == nil { + if err := cleanupStackCollector(ctx, stackName); err != nil { + errs = append(errs, err) + } + continue + } + errs = append(errs, err) + continue + } + if err := reconcileStackCollector(ctx, stack); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) } func selectorFromSpec(ls *metav1.LabelSelector) (labels.Selector, error) { if ls == nil { - return labels.Everything(), nil + return nil, fmt.Errorf("stackSelector is required") } return metav1.LabelSelectorAsSelector(ls) } +func hasTraces(ep *v1beta1.OtelExporterEndpoint) bool { + return ep.Spec.Traces != nil && ep.Spec.Traces.Endpoint != "" +} + +func hasMetrics(ep *v1beta1.OtelExporterEndpoint) bool { + return ep.Spec.Metrics != nil && ep.Spec.Metrics.Endpoint != "" +} + +func hasRealSignal(ep *v1beta1.OtelExporterEndpoint) bool { + return hasTraces(ep) || hasMetrics(ep) +} + func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { endpoints, err := findMatchingEndpoints(ctx, stack) if err != nil { return err } - if len(endpoints) == 0 { - return cleanupStackCollector(ctx, stack.Name) + activeEndpoints := make([]v1beta1.OtelExporterEndpoint, 0, len(endpoints)) + for i := range endpoints { + if hasRealSignal(&endpoints[i]) { + activeEndpoints = append(activeEndpoints, endpoints[i]) + } } - inputs, envVars := buildCollectorInputs(endpoints) - otelSettings, err := readOtelSettings(ctx, stack.Name) if err != nil { return err } + hasSettingsSignal := otelSettings != nil && (otelSettings.TracesEndpoint != "" || otelSettings.MetricsEndpoint != "") + if len(activeEndpoints) == 0 && !hasSettingsSignal { + return cleanupStackCollector(ctx, stack.Name) + } + + if err := ensureNoConflict(ctx, stack.Name); err != nil { + return err + } + + if err := ensureAuthSecretReferences(ctx, stack, activeEndpoints); err != nil { + return err + } + + inputs, envVars := buildCollectorInputs(activeEndpoints) + + hasTraces, hasMetrics := computeActiveSignals(activeEndpoints, otelSettings) + collectorConfigYAML, err := generateMergedCollectorConfig(inputs, otelSettings) if err != nil { return fmt.Errorf("generating collector config: %w", err) @@ -144,21 +280,22 @@ func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { return nil }, WithOwner[*corev1.ConfigMap](ctx.GetScheme(), stack), + WithLabels[*corev1.ConfigMap](collectorLabels()), ) if err != nil { return fmt.Errorf("creating collector configmap: %w", err) } - secretHashes, err := hashAuthSecrets(ctx, stack.Name, endpoints) + secretHashes, err := hashAuthSecrets(ctx, stack.Name, activeEndpoints) if err != nil { return err } - annotations := map[string]string{ + podAnnotations := map[string]string{ "config-hash": HashFromConfigMaps(configMap), } if secretHashes != "" { - annotations["secret-hash"] = secretHashes + podAnnotations["secret-hash"] = secretHashes } replicas := int32(1) @@ -176,10 +313,8 @@ func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app.kubernetes.io/name": deploymentName, - }, - Annotations: annotations, + Labels: collectorLabels(), + Annotations: podAnnotations, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ @@ -187,11 +322,58 @@ func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { Image: collectorImageForPlatform(ctx), Args: []string{"--config=/etc/otel/otel-collector-config.yaml"}, Env: envVars, - Ports: []corev1.ContainerPort{{ - Name: "otlp-http", - ContainerPort: collectorPort, - Protocol: corev1.ProtocolTCP, - }}, + Ports: []corev1.ContainerPort{ + { + Name: "otlp-http", + ContainerPort: collectorPort, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "health", + ContainerPort: healthCheckPort, + Protocol: corev1.ProtocolTCP, + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("50m"), + corev1.ResourceMemory: resource.MustParse("64Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(healthCheckPort), + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 10, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(healthCheckPort), + }, + }, + InitialDelaySeconds: 3, + PeriodSeconds: 5, + }, + SecurityContext: &corev1.SecurityContext{ + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + Privileged: pointer.For(false), + ReadOnlyRootFilesystem: pointer.For(true), + AllowPrivilegeEscalation: pointer.For(false), + RunAsNonRoot: pointer.For(true), + RunAsUser: pointer.For(int64(65534)), + }, VolumeMounts: []corev1.VolumeMount{ NewVolumeMount("config", "/etc/otel", true), NewVolumeMount("tmp", "/tmp", false), @@ -221,11 +403,16 @@ func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { return nil }, WithOwner[*appsv1.Deployment](ctx.GetScheme(), stack), + WithLabels[*appsv1.Deployment](collectorLabels()), ) if err != nil { return fmt.Errorf("creating collector deployment: %w", err) } + signalAnnotations := map[string]string{ + SignalTracesAnnotation: fmt.Sprintf("%t", hasTraces), + SignalMetricsAnnotation: fmt.Sprintf("%t", hasMetrics), + } _, _, err = CreateOrUpdate(ctx, types.NamespacedName{ Namespace: stack.Name, Name: serviceName, @@ -241,15 +428,78 @@ func reconcileStackCollector(ctx Context, stack *v1beta1.Stack) error { Protocol: corev1.ProtocolTCP, }}, } + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + for k, v := range signalAnnotations { + svc.Annotations[k] = v + } return nil }, WithOwner[*corev1.Service](ctx.GetScheme(), stack), + WithLabels[*corev1.Service](collectorLabels()), ) if err != nil { return fmt.Errorf("creating collector service: %w", err) } - return nil + return annotateStackCollectorSignals(ctx, stack, hasTraces, hasMetrics) +} + +func annotateStackCollectorSignals(ctx Context, stack *v1beta1.Stack, hasTraces, hasMetrics bool) error { + value := fmt.Sprintf("traces=%t,metrics=%t", hasTraces, hasMetrics) + annotations := stack.GetAnnotations() + if annotations != nil && annotations[collectorSignalAnnotation] == value { + return nil + } + patch := client.MergeFrom(stack.DeepCopy()) + if annotations == nil { + annotations = map[string]string{} + } + annotations[collectorSignalAnnotation] = value + stack.SetAnnotations(annotations) + return ctx.GetClient().Patch(ctx, stack, patch) +} + +func removeStackCollectorAnnotation(ctx Context, stackName string) error { + stack := &v1beta1.Stack{} + if err := ctx.GetClient().Get(ctx, types.NamespacedName{Name: stackName}, stack); err != nil { + if client.IgnoreNotFound(err) == nil { + return nil + } + return err + } + annotations := stack.GetAnnotations() + if annotations == nil { + return nil + } + if _, ok := annotations[collectorSignalAnnotation]; !ok { + return nil + } + patch := client.MergeFrom(stack.DeepCopy()) + delete(annotations, collectorSignalAnnotation) + stack.SetAnnotations(annotations) + return ctx.GetClient().Patch(ctx, stack, patch) +} + +func computeActiveSignals(endpoints []v1beta1.OtelExporterEndpoint, otelSettings *otelSettingsInput) (activeTraces, activeMetrics bool) { + for i := range endpoints { + if hasTraces(&endpoints[i]) { + activeTraces = true + } + if hasMetrics(&endpoints[i]) { + activeMetrics = true + } + } + if otelSettings != nil { + if otelSettings.TracesEndpoint != "" { + activeTraces = true + } + if otelSettings.MetricsEndpoint != "" { + activeMetrics = true + } + } + return } func findMatchingEndpoints(ctx Context, stack *v1beta1.Stack) ([]v1beta1.OtelExporterEndpoint, error) { @@ -355,9 +605,13 @@ func hashAuthSecrets(ctx Context, stackNamespace string, endpoints []v1beta1.Ote Namespace: stackNamespace, }, secret) if err != nil { - return "", fmt.Errorf("auth secret %q not found in namespace %q: %w", ref.SecretName, stackNamespace, err) + return "", fmt.Errorf("auth secret %q in namespace %q: %w", ref.SecretName, stackNamespace, err) + } + tokenData, ok := secret.Data["token"] + if !ok { + return "", fmt.Errorf("auth secret %q in namespace %q is missing required key \"token\"", ref.SecretName, stackNamespace) } - if err := json.NewEncoder(digest).Encode(secret.Data); err != nil { + if _, err := digest.Write(tokenData); err != nil { return "", err } } @@ -389,31 +643,165 @@ func readOtelSettings(ctx Context, stackName string) (*otelSettingsInput, error) return input, nil } +func deleteIfManaged[T client.Object](ctx Context, name types.NamespacedName) error { + var t T + t = reflect.New(reflect.TypeOf(t).Elem()).Interface().(T) + if err := ctx.GetClient().Get(ctx, name, t); err != nil { + if client.IgnoreNotFound(err) == nil { + return nil + } + return err + } + if t.GetLabels()[managedByLabel] != managedByValue { + return nil + } + LogDeletion(ctx, t, "deleteIfManaged") + return ctx.GetClient().Delete(ctx, t) +} + +func checkNotUnmanaged[T client.Object](ctx Context, name types.NamespacedName) error { + var t T + t = reflect.New(reflect.TypeOf(t).Elem()).Interface().(T) + if err := ctx.GetClient().Get(ctx, name, t); err != nil { + if client.IgnoreNotFound(err) == nil { + return nil + } + return err + } + if t.GetLabels()[managedByLabel] != managedByValue { + return fmt.Errorf("resource %s/%s already exists and is not managed by otelexporterendpoint", name.Namespace, name.Name) + } + return nil +} + +func otelRefName(stackName, secretName string) string { + name := fmt.Sprintf("%s--otel--%s", stackName, secretName) + if len(name) > 253 { + h := sha256.Sum256([]byte(name)) + name = name[:240] + "-" + hex.EncodeToString(h[:6]) + } + return name +} + +func ensureAuthSecretReferences(ctx Context, stack *v1beta1.Stack, endpoints []v1beta1.OtelExporterEndpoint) error { + wanted := map[string]bool{} + for _, ref := range referencedAuthSecrets(endpoints) { + if wanted[ref.SecretName] { + continue + } + wanted[ref.SecretName] = true + + existing := &corev1.Secret{} + if err := ctx.GetClient().Get(ctx, types.NamespacedName{ + Name: ref.SecretName, + Namespace: stack.Name, + }, existing); err == nil { + continue + } else if client.IgnoreNotFound(err) != nil { + return err + } + + refName := otelRefName(stack.Name, ref.SecretName) + rr, _, err := CreateOrUpdate(ctx, types.NamespacedName{ + Name: refName, + }, func(rr *v1beta1.ResourceReference) error { + rr.Spec.Stack = stack.Name + rr.Spec.Name = ref.SecretName + rr.Spec.GroupVersionKind = &metav1.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Secret", + } + return nil + }, + WithOwner[*v1beta1.ResourceReference](ctx.GetScheme(), stack), + ) + if err != nil { + return fmt.Errorf("creating ResourceReference for secret %q in stack %q: %w", ref.SecretName, stack.Name, err) + } + if !rr.Status.Ready { + return NewPendingError() + } + } + + wantedRefNames := map[string]bool{} + for secretName := range wanted { + wantedRefNames[otelRefName(stack.Name, secretName)] = true + } + + return reconcileOtelResourceReferences(ctx, stack.Name, wantedRefNames) +} + +func ensureNoConflict(ctx Context, namespace string) error { + if err := checkNotUnmanaged[*corev1.ConfigMap](ctx, types.NamespacedName{ + Name: "otel-collector-config", Namespace: namespace, + }); err != nil { + return err + } + if err := checkNotUnmanaged[*appsv1.Deployment](ctx, types.NamespacedName{ + Name: deploymentName, Namespace: namespace, + }); err != nil { + return err + } + return checkNotUnmanaged[*corev1.Service](ctx, types.NamespacedName{ + Name: serviceName, Namespace: namespace, + }) +} + func cleanupStackCollector(ctx Context, namespace string) error { - if err := DeleteIfExists[*corev1.Service](ctx, types.NamespacedName{ + if err := deleteIfManaged[*corev1.Service](ctx, types.NamespacedName{ Name: serviceName, Namespace: namespace, }); err != nil { return err } - if err := DeleteIfExists[*appsv1.Deployment](ctx, types.NamespacedName{ + if err := deleteIfManaged[*appsv1.Deployment](ctx, types.NamespacedName{ Name: deploymentName, Namespace: namespace, }); err != nil { return err } - return DeleteIfExists[*corev1.ConfigMap](ctx, types.NamespacedName{ + if err := deleteIfManaged[*corev1.ConfigMap](ctx, types.NamespacedName{ Name: "otel-collector-config", Namespace: namespace, - }) + }); err != nil { + return err + } + if err := reconcileOtelResourceReferences(ctx, namespace, nil); err != nil { + return err + } + return removeStackCollectorAnnotation(ctx, namespace) +} + +func reconcileOtelResourceReferences(ctx Context, stackName string, wanted map[string]bool) error { + var allRefs v1beta1.ResourceReferenceList + if err := ctx.GetClient().List(ctx, &allRefs, client.MatchingFields{ + "stack": stackName, + }); err != nil { + return err + } + prefix := stackName + "--otel--" + for i := range allRefs.Items { + rr := &allRefs.Items[i] + if !strings.HasPrefix(rr.Name, prefix) { + continue + } + if wanted[rr.Name] { + continue + } + if err := ctx.GetClient().Delete(ctx, rr); client.IgnoreNotFound(err) != nil { + return err + } + } + return nil } func collectorImageForPlatform(ctx Context) string { if img := ctx.GetPlatform().CollectorImage; img != "" { return img } - return defaultCollectorImage + return DefaultCollectorImage } func envSafe(s string) string { - replacer := func(r rune) rune { + return strings.Map(func(r rune) rune { if (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' { return r } @@ -421,21 +809,17 @@ func envSafe(s string) string { return r - 32 } return '_' - } - result := make([]rune, 0, len(s)) - for _, r := range s { - result = append(result, replacer(r)) - } - return string(result) + }, s) } func isCollectorResource(obj client.Object) bool { - return obj.GetName() == deploymentName || obj.GetName() == serviceName || obj.GetName() == "otel-collector-config" + return obj.GetLabels()[managedByLabel] == managedByValue } func enqueueAllEndpoints(ctx Context) []reconcile.Request { var endpoints v1beta1.OtelExporterEndpointList if err := ctx.GetClient().List(ctx, &endpoints); err != nil { + log.FromContext(ctx).Error(err, "failed to list OtelExporterEndpoints for requeue") return nil } return MapObjectToReconcileRequests( @@ -450,8 +834,17 @@ func init() { WithWatch[*v1beta1.OtelExporterEndpoint, *v1beta1.Stack](func(ctx Context, _ *v1beta1.Stack) []reconcile.Request { return enqueueAllEndpoints(ctx) }), - WithWatch[*v1beta1.OtelExporterEndpoint, *v1beta1.Settings](func(ctx Context, _ *v1beta1.Settings) []reconcile.Request { - return enqueueAllEndpoints(ctx) + WithRaw[*v1beta1.OtelExporterEndpoint](func(ctx Context, b *builder.Builder) error { + b.Watches(&v1beta1.Settings{}, handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, obj client.Object) []reconcile.Request { + s := obj.(*v1beta1.Settings) + if !strings.HasPrefix(s.Spec.Key, "opentelemetry.") { + return nil + } + return enqueueAllEndpoints(ctx) + }, + )) + return nil }), WithRaw[*v1beta1.OtelExporterEndpoint](func(ctx Context, b *builder.Builder) error { collectorPredicate := predicate.NewPredicateFuncs(isCollectorResource) diff --git a/internal/resources/settings/opentelemetry.go b/internal/resources/settings/opentelemetry.go index 63a8dd13b..68986209e 100644 --- a/internal/resources/settings/opentelemetry.go +++ b/internal/resources/settings/opentelemetry.go @@ -23,12 +23,12 @@ const ( ) func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSeparator string) ([]corev1.EnvVar, error) { - collectorEndpoint, err := getCollectorEndpoint(ctx, stack) + info, err := getCollectorInfo(ctx, stack) if err != nil { return nil, err } - if collectorEndpoint != "" { - return collectorEnvVars(ctx, collectorEndpoint, stack, serviceName, sliceStringSeparator) + if info != nil { + return collectorEnvVars(ctx, info, stack, serviceName, sliceStringSeparator) } traces, err := otelEnvVars(ctx, stack, MonitoringTypeTraces, serviceName, sliceStringSeparator) @@ -47,7 +47,13 @@ func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSepa return append(traces, metrics...), nil } -func getCollectorEndpoint(ctx core.Context, stack string) (string, error) { +type collectorInfo struct { + endpoint string + hasTraces bool + hasMetrics bool +} + +func getCollectorInfo(ctx core.Context, stack string) (*collectorInfo, error) { svc := &corev1.Service{} err := ctx.GetClient().Get(ctx, types.NamespacedName{ Namespace: stack, @@ -55,14 +61,21 @@ func getCollectorEndpoint(ctx core.Context, stack string) (string, error) { }, svc) if err != nil { if apierrors.IsNotFound(err) { - return "", nil + return nil, nil } - return "", err + return nil, err + } + if svc.Labels[core.CollectorManagedByLabel] != core.CollectorManagedByValue { + return nil, nil } - return fmt.Sprintf("%s.%s:%d", collectorServiceName, stack, collectorServicePort), nil + return &collectorInfo{ + endpoint: fmt.Sprintf("%s.%s:%d", collectorServiceName, stack, collectorServicePort), + hasTraces: svc.Annotations[core.SignalTracesAnnotation] == "true", + hasMetrics: svc.Annotations[core.SignalMetricsAnnotation] == "true", + }, nil } -func collectorEnvVars(ctx core.Context, collectorEndpoint, stack, serviceName, sliceStringSeparator string) ([]corev1.EnvVar, error) { +func collectorEnvVars(ctx core.Context, info *collectorInfo, stack, serviceName, sliceStringSeparator string) ([]corev1.EnvVar, error) { resourceAttributes := map[string]string{} for _, signal := range []string{"traces", "metrics"} { attrs, err := GetMap(ctx, stack, "opentelemetry", signal, "resource-attributes") @@ -82,20 +95,7 @@ func collectorEnvVars(ctx core.Context, collectorEndpoint, stack, serviceName, s } slices.Sort(resourceAttributesArray) - return []corev1.EnvVar{ - core.Env("OTEL_TRACES", "true"), - core.Env("OTEL_TRACES_BATCH", "true"), - core.Env("OTEL_TRACES_EXPORTER", "otlp"), - core.Env("OTEL_TRACES_EXPORTER_OTLP_ENDPOINT", collectorEndpoint), - core.Env("OTEL_TRACES_EXPORTER_OTLP_MODE", "http"), - core.EnvFromBool("OTEL_TRACES_EXPORTER_OTLP_INSECURE", true), - core.Env("OTEL_METRICS", "true"), - core.Env("OTEL_METRICS_BATCH", "true"), - core.Env("OTEL_METRICS_EXPORTER", "otlp"), - core.Env("OTEL_METRICS_EXPORTER_OTLP_ENDPOINT", collectorEndpoint), - core.Env("OTEL_METRICS_EXPORTER_OTLP_MODE", "http"), - core.EnvFromBool("OTEL_METRICS_EXPORTER_OTLP_INSECURE", true), - core.Env("OTEL_METRICS_RUNTIME", "true"), + envVars := []corev1.EnvVar{ core.Env("OTEL_SERVICE_NAME", serviceName), { Name: "POD_NAME", @@ -106,16 +106,41 @@ func collectorEnvVars(ctx core.Context, collectorEndpoint, stack, serviceName, s }, }, core.Env("OTEL_RESOURCE_ATTRIBUTES", strings.Join(resourceAttributesArray, sliceStringSeparator)), - }, nil + } + + if info.hasTraces { + envVars = append(envVars, + core.Env("OTEL_TRACES", "true"), + core.Env("OTEL_TRACES_BATCH", "true"), + core.Env("OTEL_TRACES_EXPORTER", "otlp"), + core.Env("OTEL_TRACES_EXPORTER_OTLP_ENDPOINT", info.endpoint), + core.Env("OTEL_TRACES_EXPORTER_OTLP_MODE", "http"), + core.EnvFromBool("OTEL_TRACES_EXPORTER_OTLP_INSECURE", true), + ) + } + + if info.hasMetrics { + envVars = append(envVars, + core.Env("OTEL_METRICS", "true"), + core.Env("OTEL_METRICS_BATCH", "true"), + core.Env("OTEL_METRICS_EXPORTER", "otlp"), + core.Env("OTEL_METRICS_EXPORTER_OTLP_ENDPOINT", info.endpoint), + core.Env("OTEL_METRICS_EXPORTER_OTLP_MODE", "http"), + core.EnvFromBool("OTEL_METRICS_EXPORTER_OTLP_INSECURE", true), + core.Env("OTEL_METRICS_RUNTIME", "true"), + ) + } + + return envVars, nil } func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) { - collectorEndpoint, err := getCollectorEndpoint(ctx, stack) + info, err := getCollectorInfo(ctx, stack) if err != nil { return false, err } - if collectorEndpoint != "" { - return true, nil + if info != nil { + return info.hasTraces, nil } v, err := GetURL(ctx, stack, "opentelemetry", "traces", "dsn") @@ -123,11 +148,7 @@ func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) return false, err } - if v == nil { - return false, nil - } - - return true, nil + return v != nil, nil } func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, serviceName, sliceStringSeparator string) ([]corev1.EnvVar, error) { diff --git a/internal/tests/otelexporterendpoint_controller_test.go b/internal/tests/otelexporterendpoint_controller_test.go index 8b45dfbcd..03c9777db 100644 --- a/internal/tests/otelexporterendpoint_controller_test.go +++ b/internal/tests/otelexporterendpoint_controller_test.go @@ -82,6 +82,70 @@ var _ = Describe("OtelExporterEndpointController", func() { }) }) + Context("When deleting an OtelExporterEndpoint", func() { + var ( + stack *v1beta1.Stack + endpoint *v1beta1.OtelExporterEndpoint + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: metav1.ObjectMeta{ + Name: RandObjectMeta().Name, + Labels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + endpoint = &v1beta1.OtelExporterEndpoint{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.OtelExporterEndpointSpec{ + StackSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "formance.com/stack": "sdymzzszghxw-ryeg", + }, + }, + Traces: &v1beta1.OtelSignalConfig{ + Endpoint: "http://my-collector:4318", + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(endpoint)).To(Succeed()) + }) + AfterEach(func() { + _ = Delete(endpoint) + Expect(Delete(stack)).To(Succeed()) + }) + It("Should clean up collector resources from the stack namespace", func() { + By("Waiting for the collector to be created", func() { + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector", &appsv1.Deployment{}) + }).Should(Succeed()) + }) + By("Deleting the endpoint", func() { + Expect(Delete(endpoint)).To(Succeed()) + }) + By("Verifying the Deployment is removed", func() { + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector", &appsv1.Deployment{}) + }).ShouldNot(Succeed()) + }) + By("Verifying the Service is removed", func() { + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector", &corev1.Service{}) + }).ShouldNot(Succeed()) + }) + By("Verifying the ConfigMap is removed", func() { + Eventually(func() error { + return LoadResource(stack.Name, "otel-collector-config", &corev1.ConfigMap{}) + }).ShouldNot(Succeed()) + }) + }) + }) + Context("When no OtelExporterEndpoints exist and no Settings", func() { var ( stack *v1beta1.Stack