From 1a7e37f813f15e5270e447123555fd7b1fa044e8 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 9 Jun 2026 09:12:31 +0530 Subject: [PATCH 1/4] Refactor logs + metrics shape; add cluster-metrics receivers, podSelector, default tolerations --- api/v1alpha1/parseableconfig_types.go | 133 +++- api/v1alpha1/zz_generated.deepcopy.go | 213 +++++-- ...bility.parseable.com_parseableconfigs.yaml | 225 +++++-- config/samples/azure-staging.yaml | 73 +++ config/samples/generated_collector_log.yaml | 76 ++- .../generated_collector_metrics_events.yaml | 95 ++- ...bservability_v1alpha1_parseableconfig.yaml | 37 +- helm/pai/Chart.yaml | 4 +- ...bility.parseable.com_parseableconfigs.yaml | 225 +++++-- .../controller/parseableconfig_controller.go | 585 ++++++++++++------ java-demo.yaml | 31 - 11 files changed, 1245 insertions(+), 452 deletions(-) create mode 100644 config/samples/azure-staging.yaml delete mode 100644 java-demo.yaml diff --git a/api/v1alpha1/parseableconfig_types.go b/api/v1alpha1/parseableconfig_types.go index 42dedc3..b5a77cb 100644 --- a/api/v1alpha1/parseableconfig_types.go +++ b/api/v1alpha1/parseableconfig_types.go @@ -93,49 +93,138 @@ type TracesConfig struct { Instrumentation InstrumentationConfig `json:"instrumentation,omitempty"` } -// LogsConfig defines logging configuration +// LogsConfig defines logging configuration: a built-in toggle for cluster pod +// logs plus an array of host-path tail pipelines for arbitrary log directories. type LogsConfig struct { - // TargetDataset is the Parseable dataset name for log data - TargetDataset string `json:"targetDataset"` + // PodLogs enables collection of all Kubernetes pod logs from /var/log/pods on each node. + PodLogs *PodLogsConfig `json:"podLogs,omitempty"` + + // Files is a list of host-path tail pipelines (e.g. audit logs, server logs). 
+ Files []FileLogConfig `json:"files,omitempty"` +} + +// PodLogsConfig configures collection of Kubernetes pod logs via the filelog +// receiver with CRI container parsing and optional namespace filtering. +type PodLogsConfig struct { + // Enabled controls whether Kubernetes pod logs are collected + Enabled bool `json:"enabled"` - // Headers are additional HTTP headers for the logs exporter. Overrides global headers with the same key. + // TargetDataset is the Parseable dataset name for pod log data (required when Enabled is true) + TargetDataset string `json:"targetDataset,omitempty"` + + // Headers are additional HTTP headers for the pod logs exporter. Overrides global headers with the same key. Headers map[string]string `json:"headers,omitempty"` - // NamespaceSelector defines which namespaces to collect logs from + // NamespaceSelector defines which namespaces to collect pod logs from NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` } -// PodMetricsConfig defines pod/container-level metrics collection settings -type PodMetricsConfig struct { - // TargetDataset is the Parseable dataset name for pod/container metric data +// FileLogConfig defines a host-path tail pipeline. Every *.log file under +// HostPath (recursive) is tailed without CRI/container parsing. +type FileLogConfig struct { + // Name uniquely identifies this file pipeline; used to name the receiver, exporter, volume, and pipeline. + Name string `json:"name"` + + // HostPath is the directory on the node to mount and tail recursively + HostPath string `json:"hostPath"` + + // TargetDataset is the Parseable dataset name for this pipeline's log data + TargetDataset string `json:"targetDataset"` + + // Headers are additional HTTP headers for this exporter. Overrides global headers with the same key. 
+ Headers map[string]string `json:"headers,omitempty"` +} + +// ClusterMetricsConfig enables built-in cluster-wide metrics from up to three +// receivers (k8s_cluster, kubelet /metrics, kube-state-metrics). All enabled +// receivers ship to the same TargetDataset. +type ClusterMetricsConfig struct { + // TargetDataset is the Parseable dataset name shared by every enabled built-in receiver TargetDataset string `json:"targetDataset"` - // Headers are additional HTTP headers for the pod metrics exporter. Overrides global headers with the same key. + // Headers are additional HTTP headers for the cluster metrics exporter. Overrides global headers with the same key. Headers map[string]string `json:"headers,omitempty"` - // NamespaceSelector defines which namespaces to collect pod metrics from + // NamespaceSelector filters pod-scope metrics by namespace. Node-scope metrics are not filtered. NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` + + // K8sCluster collects cluster object state (pod/deployment status, node conditions) via the k8s_cluster receiver. + K8sCluster *K8sClusterConfig `json:"k8sCluster,omitempty"` + + // Kubelet scrapes each node's kubelet /metrics endpoint (Prometheus format, TLS + service-account bearer). + Kubelet *KubeletConfig `json:"kubelet,omitempty"` + + // KubeState scrapes the kube-state-metrics service via Kubernetes service discovery. + KubeState *KubeStateConfig `json:"kubeState,omitempty"` } -// NodeMetricsConfig defines node-level metrics collection settings -type NodeMetricsConfig struct { - // TargetDataset is the Parseable dataset name for node metric data +// K8sClusterConfig configures the k8s_cluster receiver. +type K8sClusterConfig struct { + // Enabled controls whether the k8s_cluster receiver runs + Enabled bool `json:"enabled"` + + // NodeConditions is the list of node conditions to report (e.g. Ready, DiskPressure, MemoryPressure). + // Defaults to ["Ready"] when empty. 
+ NodeConditions []string `json:"nodeConditions,omitempty"` + + // AllocatableResources is the list of allocatable resources to report (e.g. cpu, memory, storage). + // Defaults to no allocatable metrics when empty. + AllocatableResources []string `json:"allocatableResources,omitempty"` +} + +// KubeletConfig configures the prometheus scrape of each node's kubelet /metrics endpoint. +type KubeletConfig struct { + // Enabled controls whether kubelet /metrics scraping runs + Enabled bool `json:"enabled"` +} + +// KubeStateConfig configures the prometheus scrape of the kube-state-metrics service. +type KubeStateConfig struct { + // Enabled controls whether kube-state-metrics scraping runs + Enabled bool `json:"enabled"` + + // Namespaces restricts where the kube-state-metrics service is discovered. + // Defaults to ["kube-system", "kube-state-metrics", "default"] when empty. + Namespaces []string `json:"namespaces,omitempty"` +} + +// ScrapeConfig defines a single Prometheus-style scrape pipeline. Pods are +// discovered via Kubernetes service discovery and scraped at the given path+port. +// Two pod-selection modes are supported: +// - PodSelector (label match) — recommended; matches pods by label key/value +// - port-only — keeps pods whose container exposes the named port (legacy) +type ScrapeConfig struct { + // Name uniquely identifies this scrape pipeline + Name string `json:"name"` + + // URI is the HTTP path to scrape on each discovered pod (e.g. "/metrics") + URI string `json:"uri"` + + // Port is the container port to scrape + Port int32 `json:"port"` + + // TargetDataset is the Parseable dataset name for this scrape pipeline's metric data TargetDataset string `json:"targetDataset"` - // Headers are additional HTTP headers for the node metrics exporter. Overrides global headers with the same key. + // Headers are additional HTTP headers for this exporter. Overrides global headers with the same key. 
Headers map[string]string `json:"headers,omitempty"` - // NamespaceSelector defines which namespaces to collect node metrics from + // NamespaceSelector limits service discovery to the matching namespaces NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` + + // PodSelector selects pods by label key/value pairs. When set, the operator emits + // a Prometheus keep-relabel per label and skips the port-number filter. + PodSelector map[string]string `json:"podSelector,omitempty"` } -// MetricsConfig defines metrics configuration +// MetricsConfig defines metrics configuration. ClusterMetrics enables built-in +// kubelet/cluster metrics; ScrapeConfigs adds Prometheus-style scrape pipelines. type MetricsConfig struct { - // PodMetrics controls collection of pod/container-level metrics via kubeletstats and k8s_cluster receivers - PodMetrics *PodMetricsConfig `json:"podMetrics,omitempty"` + // ClusterMetrics toggles built-in node/pod/cluster metrics via kubeletstats + k8s_cluster receivers + ClusterMetrics *ClusterMetricsConfig `json:"clusterMetrics,omitempty"` - // NodeMetrics controls collection of node-level metrics via kubeletstats receiver - NodeMetrics *NodeMetricsConfig `json:"nodeMetrics,omitempty"` + // ScrapeConfigs is a list of Prometheus-style scrape pipelines + ScrapeConfigs []ScrapeConfig `json:"scrapeConfigs,omitempty"` } // EventsConfig defines Kubernetes events collection configuration @@ -165,10 +254,10 @@ type ParseableConfigSpec struct { // Traces defines tracing configuration Traces *TracesConfig `json:"traces,omitempty"` - // Logs defines logging configuration + // Logs defines logging configuration (built-in pod logs toggle + host-path tail pipelines) Logs *LogsConfig `json:"logs,omitempty"` - // Metrics defines metrics configuration + // Metrics defines metrics configuration (cluster metrics toggle + scrape configs) Metrics *MetricsConfig `json:"metrics,omitempty"` // Events defines Kubernetes events collection configuration 
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 65e66b7..6979c9a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,44 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterMetricsConfig) DeepCopyInto(out *ClusterMetricsConfig) { + *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) + if in.K8sCluster != nil { + in, out := &in.K8sCluster, &out.K8sCluster + *out = new(K8sClusterConfig) + (*in).DeepCopyInto(*out) + } + if in.Kubelet != nil { + in, out := &in.Kubelet, &out.Kubelet + *out = new(KubeletConfig) + **out = **in + } + if in.KubeState != nil { + in, out := &in.KubeState, &out.KubeState + *out = new(KubeStateConfig) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterMetricsConfig. +func (in *ClusterMetricsConfig) DeepCopy() *ClusterMetricsConfig { + if in == nil { + return nil + } + out := new(ClusterMetricsConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventsConfig) DeepCopyInto(out *EventsConfig) { *out = *in @@ -48,6 +86,28 @@ func (in *EventsConfig) DeepCopy() *EventsConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *FileLogConfig) DeepCopyInto(out *FileLogConfig) { + *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FileLogConfig. +func (in *FileLogConfig) DeepCopy() *FileLogConfig { + if in == nil { + return nil + } + out := new(FileLogConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InstrumentationConfig) DeepCopyInto(out *InstrumentationConfig) { *out = *in @@ -68,17 +128,81 @@ func (in *InstrumentationConfig) DeepCopy() *InstrumentationConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *K8sClusterConfig) DeepCopyInto(out *K8sClusterConfig) { + *out = *in + if in.NodeConditions != nil { + in, out := &in.NodeConditions, &out.NodeConditions + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.AllocatableResources != nil { + in, out := &in.AllocatableResources, &out.AllocatableResources + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K8sClusterConfig. +func (in *K8sClusterConfig) DeepCopy() *K8sClusterConfig { + if in == nil { + return nil + } + out := new(K8sClusterConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *KubeStateConfig) DeepCopyInto(out *KubeStateConfig) { + *out = *in + if in.Namespaces != nil { + in, out := &in.Namespaces, &out.Namespaces + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubeStateConfig. +func (in *KubeStateConfig) DeepCopy() *KubeStateConfig { + if in == nil { + return nil + } + out := new(KubeStateConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KubeletConfig) DeepCopyInto(out *KubeletConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubeletConfig. +func (in *KubeletConfig) DeepCopy() *KubeletConfig { + if in == nil { + return nil + } + out := new(KubeletConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogsConfig) DeepCopyInto(out *LogsConfig) { *out = *in - if in.Headers != nil { - in, out := &in.Headers, &out.Headers - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val + if in.PodLogs != nil { + in, out := &in.PodLogs, &out.PodLogs + *out = new(PodLogsConfig) + (*in).DeepCopyInto(*out) + } + if in.Files != nil { + in, out := &in.Files, &out.Files + *out = make([]FileLogConfig, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) } } - in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LogsConfig. @@ -94,15 +218,17 @@ func (in *LogsConfig) DeepCopy() *LogsConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MetricsConfig) DeepCopyInto(out *MetricsConfig) { *out = *in - if in.PodMetrics != nil { - in, out := &in.PodMetrics, &out.PodMetrics - *out = new(PodMetricsConfig) + if in.ClusterMetrics != nil { + in, out := &in.ClusterMetrics, &out.ClusterMetrics + *out = new(ClusterMetricsConfig) (*in).DeepCopyInto(*out) } - if in.NodeMetrics != nil { - in, out := &in.NodeMetrics, &out.NodeMetrics - *out = new(NodeMetricsConfig) - (*in).DeepCopyInto(*out) + if in.ScrapeConfigs != nil { + in, out := &in.ScrapeConfigs, &out.ScrapeConfigs + *out = make([]ScrapeConfig, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } @@ -136,29 +262,6 @@ func (in *NamespaceSelector) DeepCopy() *NamespaceSelector { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *NodeMetricsConfig) DeepCopyInto(out *NodeMetricsConfig) { - *out = *in - if in.Headers != nil { - in, out := &in.Headers, &out.Headers - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } - in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeMetricsConfig. -func (in *NodeMetricsConfig) DeepCopy() *NodeMetricsConfig { - if in == nil { - return nil - } - out := new(NodeMetricsConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ParseableConfig) DeepCopyInto(out *ParseableConfig) { *out = *in @@ -284,7 +387,7 @@ func (in *ParseableConfigStatus) DeepCopy() *ParseableConfigStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *PodMetricsConfig) DeepCopyInto(out *PodMetricsConfig) { +func (in *PodLogsConfig) DeepCopyInto(out *PodLogsConfig) { *out = *in if in.Headers != nil { in, out := &in.Headers, &out.Headers @@ -296,12 +399,42 @@ func (in *PodMetricsConfig) DeepCopyInto(out *PodMetricsConfig) { in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodMetricsConfig. -func (in *PodMetricsConfig) DeepCopy() *PodMetricsConfig { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodLogsConfig. +func (in *PodLogsConfig) DeepCopy() *PodLogsConfig { + if in == nil { + return nil + } + out := new(PodLogsConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ScrapeConfig) DeepCopyInto(out *ScrapeConfig) { + *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) + if in.PodSelector != nil { + in, out := &in.PodSelector, &out.PodSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScrapeConfig. 
+func (in *ScrapeConfig) DeepCopy() *ScrapeConfig { if in == nil { return nil } - out := new(PodMetricsConfig) + out := new(ScrapeConfig) in.DeepCopyInto(out) return out } diff --git a/config/crd/bases/observability.parseable.com_parseableconfigs.yaml b/config/crd/bases/observability.parseable.com_parseableconfigs.yaml index 98f62fc..b72685c 100644 --- a/config/crd/bases/observability.parseable.com_parseableconfigs.yaml +++ b/config/crd/bases/observability.parseable.com_parseableconfigs.yaml @@ -76,55 +76,58 @@ spec: - enabled type: object logs: - description: Logs defines logging configuration + description: Logs defines logging configuration (built-in pod logs + toggle + host-path tail pipelines) properties: - headers: - additionalProperties: - type: string - description: Headers are additional HTTP headers for the logs - exporter. Overrides global headers with the same key. - type: object - namespaceSelector: - description: NamespaceSelector defines which namespaces to collect - logs from - properties: - mode: - description: Mode specifies whether to include or exclude - the listed namespaces - enum: - - include - - exclude - type: string - namespaces: - description: Namespaces is the list of namespace names - items: + files: + description: Files is a list of host-path tail pipelines (e.g. + audit logs, server logs). + items: + description: |- + FileLogConfig defines a host-path tail pipeline. Every *.log file under + HostPath (recursive) is tailed without CRI/container parsing. + properties: + headers: + additionalProperties: + type: string + description: Headers are additional HTTP headers for this + exporter. Overrides global headers with the same key. 
+ type: object + hostPath: + description: HostPath is the directory on the node to mount + and tail recursively type: string - type: array - type: object - targetDataset: - description: TargetDataset is the Parseable dataset name for log - data - type: string - required: - - targetDataset - type: object - metrics: - description: Metrics defines metrics configuration - properties: - nodeMetrics: - description: NodeMetrics controls collection of node-level metrics - via kubeletstats receiver + name: + description: Name uniquely identifies this file pipeline; + used to name the receiver, exporter, volume, and pipeline. + type: string + targetDataset: + description: TargetDataset is the Parseable dataset name + for this pipeline's log data + type: string + required: + - hostPath + - name + - targetDataset + type: object + type: array + podLogs: + description: PodLogs enables collection of all Kubernetes pod + logs from /var/log/pods on each node. properties: + enabled: + description: Enabled controls whether Kubernetes pod logs + are collected + type: boolean headers: additionalProperties: type: string - description: Headers are additional HTTP headers for the node - metrics exporter. Overrides global headers with the same - key. + description: Headers are additional HTTP headers for the pod + logs exporter. Overrides global headers with the same key. 
type: object namespaceSelector: description: NamespaceSelector defines which namespaces to - collect node metrics from + collect pod logs from properties: mode: description: Mode specifies whether to include or exclude @@ -141,25 +144,84 @@ spec: type: object targetDataset: description: TargetDataset is the Parseable dataset name for - node metric data + pod log data (required when Enabled is true) type: string required: - - targetDataset + - enabled type: object - podMetrics: - description: PodMetrics controls collection of pod/container-level - metrics via kubeletstats and k8s_cluster receivers + type: object + metrics: + description: Metrics defines metrics configuration (cluster metrics + toggle + scrape configs) + properties: + clusterMetrics: + description: ClusterMetrics toggles built-in node/pod/cluster + metrics via kubeletstats + k8s_cluster receivers properties: headers: additionalProperties: type: string - description: Headers are additional HTTP headers for the pod + description: Headers are additional HTTP headers for the cluster metrics exporter. Overrides global headers with the same key. type: object + k8sCluster: + description: K8sCluster collects cluster object state (pod/deployment + status, node conditions) via the k8s_cluster receiver. + properties: + allocatableResources: + description: |- + AllocatableResources is the list of allocatable resources to report (e.g. cpu, memory, storage). + Defaults to no allocatable metrics when empty. + items: + type: string + type: array + enabled: + description: Enabled controls whether the k8s_cluster + receiver runs + type: boolean + nodeConditions: + description: |- + NodeConditions is the list of node conditions to report (e.g. Ready, DiskPressure, MemoryPressure). + Defaults to ["Ready"] when empty. + items: + type: string + type: array + required: + - enabled + type: object + kubeState: + description: KubeState scrapes the kube-state-metrics service + via Kubernetes service discovery. 
+ properties: + enabled: + description: Enabled controls whether kube-state-metrics + scraping runs + type: boolean + namespaces: + description: |- + Namespaces restricts where the kube-state-metrics service is discovered. + Defaults to ["kube-system", "kube-state-metrics", "default"] when empty. + items: + type: string + type: array + required: + - enabled + type: object + kubelet: + description: Kubelet scrapes each node's kubelet /metrics + endpoint (Prometheus format, TLS + service-account bearer). + properties: + enabled: + description: Enabled controls whether kubelet /metrics + scraping runs + type: boolean + required: + - enabled + type: object namespaceSelector: - description: NamespaceSelector defines which namespaces to - collect pod metrics from + description: NamespaceSelector filters pod-scope metrics by + namespace. Node-scope metrics are not filtered. properties: mode: description: Mode specifies whether to include or exclude @@ -175,12 +237,75 @@ spec: type: array type: object targetDataset: - description: TargetDataset is the Parseable dataset name for - pod/container metric data + description: TargetDataset is the Parseable dataset name shared + by every enabled built-in receiver type: string required: - targetDataset type: object + scrapeConfigs: + description: ScrapeConfigs is a list of Prometheus-style scrape + pipelines + items: + description: |- + ScrapeConfig defines a single Prometheus-style scrape pipeline. Pods are + discovered via Kubernetes service discovery and scraped at the given path+port. + Two pod-selection modes are supported: + - PodSelector (label match) — recommended; matches pods by label key/value + - port-only — keeps pods whose container exposes the named port (legacy) + properties: + headers: + additionalProperties: + type: string + description: Headers are additional HTTP headers for this + exporter. Overrides global headers with the same key. 
+ type: object + name: + description: Name uniquely identifies this scrape pipeline + type: string + namespaceSelector: + description: NamespaceSelector limits service discovery + to the matching namespaces + properties: + mode: + description: Mode specifies whether to include or exclude + the listed namespaces + enum: + - include + - exclude + type: string + namespaces: + description: Namespaces is the list of namespace names + items: + type: string + type: array + type: object + podSelector: + additionalProperties: + type: string + description: |- + PodSelector selects pods by label key/value pairs. When set, the operator emits + a Prometheus keep-relabel per label and skips the port-number filter. + type: object + port: + description: Port is the container port to scrape + format: int32 + type: integer + targetDataset: + description: TargetDataset is the Parseable dataset name + for this scrape pipeline's metric data + type: string + uri: + description: URI is the HTTP path to scrape on each discovered + pod (e.g. "/metrics") + type: string + required: + - name + - port + - targetDataset + - uri + type: object + type: array type: object paused: description: |- diff --git a/config/samples/azure-staging.yaml b/config/samples/azure-staging.yaml new file mode 100644 index 0000000..c2f750f --- /dev/null +++ b/config/samples/azure-staging.yaml @@ -0,0 +1,73 @@ +# ParseableConfig for the Azure staging cluster. +# +# Before applying: +# 1. Replace with the real ingest URL. +# 2. Create the parseable-creds secret in parseable-operator-system: +# kubectl create secret generic parseable-creds \ +# -n parseable-operator-system \ +# --from-literal=username= \ +# --from-literal=password= +# 3. 
Make sure the parseable-operator-system namespace exists, then: +# kubectl apply -f azure-staging.yaml +# +# Collects: +# - all Kubernetes pod logs (cluster-wide) → dataset azure-staging-pod-logs +# - parseable audit logs from the host → dataset azure-staging-parseable-audit +# - parseable server logs from the host → dataset azure-staging-parseable-server +apiVersion: observability.parseable.com/v1alpha1 +kind: ParseableConfig +metadata: + name: azure-staging + namespace: parseable-operator-system +spec: + target: + endpoint: + credentialsSecret: + name: parseable-creds + namespace: parseable-operator-system + + logs: + podLogs: + enabled: true + targetDataset: azure-staging-pod-logs + + files: + - name: parseable audit logs + hostPath: /var/log/parseable/audit_logs + targetDataset: azure-staging-parseable-audit + + - name: parseable server logs + hostPath: /var/log/parseable/server_logs + targetDataset: azure-staging-parseable-server + + events: + enabled: true + targetDataset: azure-staging-events + + metrics: + clusterMetrics: + targetDataset: azure-staging-cluster-metrics + k8sCluster: + enabled: true + nodeConditions: + - Ready + - DiskPressure + - MemoryPressure + - PIDPressure + - NetworkUnavailable + allocatableResources: + - cpu + - memory + - storage + kubelet: + enabled: true + kubeState: + enabled: true + + scrapeConfigs: + - name: parseable metrics + targetDataset: azure-staging-parseable-metrics + uri: /v1/metrics + port: 8000 + podSelector: + app: parseable-cluster diff --git a/config/samples/generated_collector_log.yaml b/config/samples/generated_collector_log.yaml index e40cfd0..1fe5feb 100644 --- a/config/samples/generated_collector_log.yaml +++ b/config/samples/generated_collector_log.yaml @@ -1,12 +1,25 @@ -# This is the OpenTelemetryCollector CR that PAI generates when logs are enabled. -# PAI translates spec.logs into this DaemonSet-mode collector. +# This is the OpenTelemetryCollector CR that PAI generates for the log DaemonSet. 
+# PAI builds three kinds of pipelines inside this collector: +# - one pod-logs pipeline (when spec.logs.podLogs.enabled) tailing /var/log/pods with CRI parsing +# - one file pipeline per spec.logs.files[] entry tailing the configured host-path recursively +# - one cluster-metrics pipeline (when spec.metrics.clusterMetrics.enabled) using kubeletstats # # Input (from ParseableConfig): # logs: -# stream: pai-logs -# namespaceSelector: -# mode: include -# namespaces: [demo] +# podLogs: +# enabled: true +# targetDataset: pai-logs +# namespaceSelector: +# mode: include +# namespaces: [demo] +# files: +# - name: app audit logs +# hostPath: /var/log/app/audit +# targetDataset: pai-app-audit +# metrics: +# clusterMetrics: +# enabled: true +# targetDataset: pai-cluster-metrics # apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector @@ -16,21 +29,38 @@ metadata: spec: mode: daemonset volumes: - - name: varlogpods + - name: pod-logs hostPath: path: /var/log/pods + - name: app-audit-logs + hostPath: + path: /var/log/app/audit volumeMounts: - - name: varlogpods + - name: pod-logs mountPath: /var/log/pods readOnly: true + - name: app-audit-logs + mountPath: /var/log/app/audit + readOnly: true config: receivers: - filelog: + filelog/pod-logs: include: - /var/log/pods/demo_*/*/*.log + include_file_path: true operators: - type: container id: container-parser + filelog/app-audit-logs: + include: + - /var/log/app/audit/*.log + - /var/log/app/audit/**/*.log + include_file_path: true + kubeletstats: + collection_interval: 30s + auth_type: serviceAccount + endpoint: https://${env:K8S_NODE_NAME}:10250 + insecure_skip_verify: true processors: k8sattributes: extract: @@ -41,15 +71,35 @@ spec: - k8s.node.name batch: {} exporters: - otlphttp: + otlphttp/logs_pod-logs: endpoint: http://:8010 headers: Authorization: "Basic " X-P-Log-Source: otel-logs X-P-Stream: pai-logs + otlphttp/logs_app-audit-logs: + endpoint: http://:8010 + headers: + Authorization: "Basic " + X-P-Log-Source: 
otel-logs + X-P-Stream: pai-app-audit + otlphttp/clustermetrics: + endpoint: http://:8010 + headers: + Authorization: "Basic " + X-P-Log-Source: otel-metrics + X-P-Stream: pai-cluster-metrics service: pipelines: - logs: - receivers: [filelog] + logs/pod-logs: + receivers: [filelog/pod-logs] processors: [k8sattributes, batch] - exporters: [otlphttp] + exporters: [otlphttp/logs_pod-logs] + logs/app-audit-logs: + receivers: [filelog/app-audit-logs] + processors: [batch] + exporters: [otlphttp/logs_app-audit-logs] + metrics/cluster: + receivers: [kubeletstats] + processors: [batch] + exporters: [otlphttp/clustermetrics] diff --git a/config/samples/generated_collector_metrics_events.yaml b/config/samples/generated_collector_metrics_events.yaml index fcd3327..21116a1 100644 --- a/config/samples/generated_collector_metrics_events.yaml +++ b/config/samples/generated_collector_metrics_events.yaml @@ -1,17 +1,31 @@ -# This is the OpenTelemetryCollector CR that PAI generates when metrics and/or events are enabled. -# PAI translates spec.metrics + spec.events into a single Deployment-mode collector with two pipelines. +# This is the OpenTelemetryCollector CR that PAI generates for the metrics+events Deployment. +# PAI translates spec.metrics + spec.events into one Deployment-mode collector with one pipeline +# per ScrapeConfigs[] entry, one pipeline for cluster object state (when ClusterMetrics is enabled), +# and one pipeline for Kubernetes events (when Events is enabled). +# +# Note: kubeletstats for pod+node resource metrics lives in the log DaemonSet instead, because each +# kubelet only exposes its own node. Both kubeletstats and k8s_cluster ship to the same +# ClusterMetrics.TargetDataset. 
# # Input (from ParseableConfig): # metrics: -# stream: pai-metrics -# namespaceSelector: -# mode: include -# namespaces: [demo] -# nodeMetrics: +# clusterMetrics: # enabled: true +# targetDataset: pai-cluster-metrics +# namespaceSelector: +# mode: include +# namespaces: [demo] +# scrapeConfigs: +# - name: app metrics +# uri: /metrics +# port: 8080 +# targetDataset: pai-app-metrics +# namespaceSelector: +# mode: include +# namespaces: [demo] # events: # enabled: true -# stream: pai-events +# targetDataset: pai-events # namespaceSelector: # mode: include # namespaces: [demo] @@ -25,42 +39,56 @@ spec: mode: deployment config: receivers: - # Cluster-level metrics (deployments, pods, nodes, etc.) + # Cluster-level object state (deployments, pods, nodes, etc.) k8s_cluster: collection_interval: 30s + namespaces: [demo] - # Node-level metrics (CPU, memory, disk, network) — only if nodeMetrics.enabled - kubeletstats: - collection_interval: 30s - auth_type: serviceAccount - insecure_skip_verify: true + # One prometheus receiver per ScrapeConfigs[] entry, with Kubernetes pod SD. 
+ prometheus/app-metrics: + config: + scrape_configs: + - job_name: app-metrics + scrape_interval: 30s + metrics_path: /metrics + kubernetes_sd_configs: + - role: pod + namespaces: + names: [demo] + relabel_configs: + - source_labels: [__meta_kubernetes_pod_container_port_number] + action: keep + regex: "8080" + - source_labels: [__meta_kubernetes_pod_ip] + action: replace + target_label: __address__ + replacement: "${1}:8080" - # Kubernetes events — only if events.enabled + # Kubernetes events k8sobjects: objects: - name: events mode: watch - namespaces: [demo] # from events.namespaceSelector (include mode) + namespaces: [demo] processors: - # Namespace filter for metrics (OTTL syntax) — only if metrics.namespaceSelector is set - filter/metrics_ns: - error_mode: ignore - metrics: - datapoint: - - 'resource.attributes["k8s.namespace.name"] == nil or not IsMatch(resource.attributes["k8s.namespace.name"], "^(demo)$")' batch: {} exporters: - # Metrics exporter — sends to pai-metrics stream - otlphttp/metrics: + otlphttp/clustermetrics: endpoint: http://:8010 headers: Authorization: "Basic " X-P-Log-Source: otel-metrics - X-P-Stream: pai-metrics + X-P-Stream: pai-cluster-metrics + + otlphttp/metrics_app-metrics: + endpoint: http://:8010 + headers: + Authorization: "Basic " + X-P-Log-Source: otel-metrics + X-P-Stream: pai-app-metrics - # Events exporter — sends to pai-events stream otlphttp/events: endpoint: http://:8010 headers: @@ -70,13 +98,16 @@ spec: service: pipelines: - # Metrics pipeline - metrics: - receivers: [k8s_cluster, kubeletstats] - processors: [filter/metrics_ns, batch] - exporters: [otlphttp/metrics] + metrics/cluster: + receivers: [k8s_cluster] + processors: [batch] + exporters: [otlphttp/clustermetrics] + + metrics/app-metrics: + receivers: [prometheus/app-metrics] + processors: [batch] + exporters: [otlphttp/metrics_app-metrics] - # Events pipeline (events are logs in OTel) logs: receivers: [k8sobjects] processors: [batch] diff --git 
a/config/samples/observability_v1alpha1_parseableconfig.yaml b/config/samples/observability_v1alpha1_parseableconfig.yaml index 1bc903b..2894c84 100644 --- a/config/samples/observability_v1alpha1_parseableconfig.yaml +++ b/config/samples/observability_v1alpha1_parseableconfig.yaml @@ -27,21 +27,36 @@ spec: detectionTimeout: "1m" logs: - targetDataset: pai-logs - namespaceSelector: - mode: include - namespaces: - - demo - - metrics: - podMetrics: - targetDataset: pai-pod-metrics + podLogs: + enabled: true + targetDataset: pai-logs namespaceSelector: mode: include namespaces: - demo - nodeMetrics: - targetDataset: pai-node-metrics + files: + - name: app audit logs + hostPath: /var/log/app/audit + targetDataset: pai-app-audit + + metrics: + clusterMetrics: + targetDataset: pai-cluster-metrics + k8sCluster: + enabled: true + nodeConditions: [Ready, DiskPressure, MemoryPressure, PIDPressure, NetworkUnavailable] + allocatableResources: [cpu, memory, storage] + kubelet: + enabled: true + kubeState: + enabled: true + scrapeConfigs: + - name: app metrics + uri: /metrics + port: 8080 + targetDataset: pai-app-metrics + podSelector: + app.kubernetes.io/name: my-app events: enabled: true diff --git a/helm/pai/Chart.yaml b/helm/pai/Chart.yaml index c08c625..baa1a3a 100644 --- a/helm/pai/Chart.yaml +++ b/helm/pai/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: pai description: Parseable Auto Instrumentation (PAI) operator for Kubernetes type: application -version: 0.2.0 -appVersion: "0.2.0" +version: 0.3.0 +appVersion: "0.3.0" keywords: - observability - opentelemetry diff --git a/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml b/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml index 98f62fc..b72685c 100644 --- a/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml +++ b/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml @@ -76,55 +76,58 @@ spec: - enabled type: object logs: - description: Logs defines logging configuration + 
description: Logs defines logging configuration (built-in pod logs + toggle + host-path tail pipelines) properties: - headers: - additionalProperties: - type: string - description: Headers are additional HTTP headers for the logs - exporter. Overrides global headers with the same key. - type: object - namespaceSelector: - description: NamespaceSelector defines which namespaces to collect - logs from - properties: - mode: - description: Mode specifies whether to include or exclude - the listed namespaces - enum: - - include - - exclude - type: string - namespaces: - description: Namespaces is the list of namespace names - items: + files: + description: Files is a list of host-path tail pipelines (e.g. + audit logs, server logs). + items: + description: |- + FileLogConfig defines a host-path tail pipeline. Every *.log file under + HostPath (recursive) is tailed without CRI/container parsing. + properties: + headers: + additionalProperties: + type: string + description: Headers are additional HTTP headers for this + exporter. Overrides global headers with the same key. + type: object + hostPath: + description: HostPath is the directory on the node to mount + and tail recursively type: string - type: array - type: object - targetDataset: - description: TargetDataset is the Parseable dataset name for log - data - type: string - required: - - targetDataset - type: object - metrics: - description: Metrics defines metrics configuration - properties: - nodeMetrics: - description: NodeMetrics controls collection of node-level metrics - via kubeletstats receiver + name: + description: Name uniquely identifies this file pipeline; + used to name the receiver, exporter, volume, and pipeline. 
+ type: string + targetDataset: + description: TargetDataset is the Parseable dataset name + for this pipeline's log data + type: string + required: + - hostPath + - name + - targetDataset + type: object + type: array + podLogs: + description: PodLogs enables collection of all Kubernetes pod + logs from /var/log/pods on each node. properties: + enabled: + description: Enabled controls whether Kubernetes pod logs + are collected + type: boolean headers: additionalProperties: type: string - description: Headers are additional HTTP headers for the node - metrics exporter. Overrides global headers with the same - key. + description: Headers are additional HTTP headers for the pod + logs exporter. Overrides global headers with the same key. type: object namespaceSelector: description: NamespaceSelector defines which namespaces to - collect node metrics from + collect pod logs from properties: mode: description: Mode specifies whether to include or exclude @@ -141,25 +144,84 @@ spec: type: object targetDataset: description: TargetDataset is the Parseable dataset name for - node metric data + pod log data (required when Enabled is true) type: string required: - - targetDataset + - enabled type: object - podMetrics: - description: PodMetrics controls collection of pod/container-level - metrics via kubeletstats and k8s_cluster receivers + type: object + metrics: + description: Metrics defines metrics configuration (cluster metrics + toggle + scrape configs) + properties: + clusterMetrics: + description: ClusterMetrics toggles built-in node/pod/cluster + metrics via kubeletstats + k8s_cluster receivers properties: headers: additionalProperties: type: string - description: Headers are additional HTTP headers for the pod + description: Headers are additional HTTP headers for the cluster metrics exporter. Overrides global headers with the same key. 
type: object + k8sCluster: + description: K8sCluster collects cluster object state (pod/deployment + status, node conditions) via the k8s_cluster receiver. + properties: + allocatableResources: + description: |- + AllocatableResources is the list of allocatable resources to report (e.g. cpu, memory, storage). + Defaults to no allocatable metrics when empty. + items: + type: string + type: array + enabled: + description: Enabled controls whether the k8s_cluster + receiver runs + type: boolean + nodeConditions: + description: |- + NodeConditions is the list of node conditions to report (e.g. Ready, DiskPressure, MemoryPressure). + Defaults to ["Ready"] when empty. + items: + type: string + type: array + required: + - enabled + type: object + kubeState: + description: KubeState scrapes the kube-state-metrics service + via Kubernetes service discovery. + properties: + enabled: + description: Enabled controls whether kube-state-metrics + scraping runs + type: boolean + namespaces: + description: |- + Namespaces restricts where the kube-state-metrics service is discovered. + Defaults to ["kube-system", "kube-state-metrics", "default"] when empty. + items: + type: string + type: array + required: + - enabled + type: object + kubelet: + description: Kubelet scrapes each node's kubelet /metrics + endpoint (Prometheus format, TLS + service-account bearer). + properties: + enabled: + description: Enabled controls whether kubelet /metrics + scraping runs + type: boolean + required: + - enabled + type: object namespaceSelector: - description: NamespaceSelector defines which namespaces to - collect pod metrics from + description: NamespaceSelector filters pod-scope metrics by + namespace. Node-scope metrics are not filtered. 
properties: mode: description: Mode specifies whether to include or exclude @@ -175,12 +237,75 @@ spec: type: array type: object targetDataset: - description: TargetDataset is the Parseable dataset name for - pod/container metric data + description: TargetDataset is the Parseable dataset name shared + by every enabled built-in receiver type: string required: - targetDataset type: object + scrapeConfigs: + description: ScrapeConfigs is a list of Prometheus-style scrape + pipelines + items: + description: |- + ScrapeConfig defines a single Prometheus-style scrape pipeline. Pods are + discovered via Kubernetes service discovery and scraped at the given path+port. + Two pod-selection modes are supported: + - PodSelector (label match) — recommended; matches pods by label key/value + - port-only — keeps pods whose container exposes the named port (legacy) + properties: + headers: + additionalProperties: + type: string + description: Headers are additional HTTP headers for this + exporter. Overrides global headers with the same key. + type: object + name: + description: Name uniquely identifies this scrape pipeline + type: string + namespaceSelector: + description: NamespaceSelector limits service discovery + to the matching namespaces + properties: + mode: + description: Mode specifies whether to include or exclude + the listed namespaces + enum: + - include + - exclude + type: string + namespaces: + description: Namespaces is the list of namespace names + items: + type: string + type: array + type: object + podSelector: + additionalProperties: + type: string + description: |- + PodSelector selects pods by label key/value pairs. When set, the operator emits + a Prometheus keep-relabel per label and skips the port-number filter. 
+ type: object + port: + description: Port is the container port to scrape + format: int32 + type: integer + targetDataset: + description: TargetDataset is the Parseable dataset name + for this scrape pipeline's metric data + type: string + uri: + description: URI is the HTTP path to scrape on each discovered + pod (e.g. "/metrics") + type: string + required: + - name + - port + - targetDataset + - uri + type: object + type: array type: object paused: description: |- diff --git a/internal/controller/parseableconfig_controller.go b/internal/controller/parseableconfig_controller.go index 5dc9726..0a5570f 100644 --- a/internal/controller/parseableconfig_controller.go +++ b/internal/controller/parseableconfig_controller.go @@ -425,7 +425,9 @@ func (r *ParseableConfigReconciler) ensureInstrumentation(ctx context.Context, c } // ensureLogCollector creates or updates a DaemonSet-mode OpenTelemetryCollector CR for log collection. -// If logs are not configured, it deletes any existing log collector. +// The DaemonSet hosts one filelog pipeline for Kubernetes pod logs (when Logs.PodLogs is enabled and +// has a TargetDataset) plus one filelog pipeline per valid Logs.Files[] entry. +// If neither is configured, it deletes any existing log collector. 
func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, config *observabilityv1alpha1.ParseableConfig) error { logger := log.FromContext(ctx) @@ -439,8 +441,22 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf existing.SetGroupVersionKind(gvk) err := r.Get(ctx, client.ObjectKey{Name: logCollectorName, Namespace: config.Namespace}, existing) - // If logs not configured, clean up any existing collector and return - if config.Spec.Logs == nil || config.Spec.Logs.TargetDataset == "" { + podLogsEnabled := config.Spec.Logs != nil && + config.Spec.Logs.PodLogs != nil && + config.Spec.Logs.PodLogs.Enabled && + config.Spec.Logs.PodLogs.TargetDataset != "" + + hasFiles := false + if config.Spec.Logs != nil { + for _, f := range config.Spec.Logs.Files { + if f.Name != "" && f.HostPath != "" && f.TargetDataset != "" { + hasFiles = true + break + } + } + } + + if !podLogsEnabled && !hasFiles { if err == nil { logger.Info("Logs not configured, deleting log collector") if delErr := r.Delete(ctx, existing); delErr != nil && !errors.IsNotFound(delErr) { @@ -455,25 +471,51 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf return cfgErr } + // Build hostPath volumes/mounts: /var/log/pods for podLogs, plus each Files entry. 
+ volumes := []interface{}{} + volumeMounts := []interface{}{} + seen := map[string]bool{} + addVolume := func(name, hostPath string) { + if hostPath == "" || seen[hostPath] { + return + } + seen[hostPath] = true + volumes = append(volumes, map[string]interface{}{ + "name": name, + "hostPath": map[string]interface{}{"path": hostPath}, + }) + volumeMounts = append(volumeMounts, map[string]interface{}{ + "name": name, + "mountPath": hostPath, + "readOnly": true, + }) + } + if podLogsEnabled { + addVolume("pod-logs", "/var/log/pods") + } + if config.Spec.Logs != nil { + for _, f := range config.Spec.Logs.Files { + volName := sanitizeName(f.Name) + if volName == "" { + volName = sanitizeName(f.HostPath) + } + addVolume(volName, f.HostPath) + } + } + spec := map[string]interface{}{ "mode": "daemonset", "config": collectorConfig, - "volumes": []interface{}{ - map[string]interface{}{ - "name": "varlogpods", - "hostPath": map[string]interface{}{ - "path": "/var/log/pods", - }, - }, - }, - "volumeMounts": []interface{}{ - map[string]interface{}{ - "name": "varlogpods", - "mountPath": "/var/log/pods", - "readOnly": true, - }, + // Tolerate every taint so the log DaemonSet can run on every node — required to + // collect cluster-wide pod logs and host-path logs that live on tainted nodepools. + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, }, } + if len(volumes) > 0 { + spec["volumes"] = volumes + spec["volumeMounts"] = volumeMounts + } if err == nil { // Update existing @@ -511,59 +553,22 @@ func (r *ParseableConfigReconciler) ensureLogCollector(ctx context.Context, conf return nil } -// buildLogCollectorConfig builds the OTel collector pipeline config YAML for log collection. -// It uses the filelog receiver with include/exclude patterns based on the namespace selector, -// k8sattributes processor for metadata enrichment, and otlphttp exporter for Parseable. 
+// buildLogCollectorConfig builds the OTel collector pipeline config for the log DaemonSet. +// Pod logs (when enabled) and each Logs.Files[] entry get their own filelog receiver + exporter pair +// and their own logs pipeline; no metrics receivers are configured here. func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, config *observabilityv1alpha1.ParseableConfig) (map[string]interface{}, error) { - logs := config.Spec.Logs - - // Build filelog include/exclude patterns from namespace selector - var includePatterns, excludePatterns []interface{} - - switch logs.NamespaceSelector.Mode { - case "include": - for _, ns := range logs.NamespaceSelector.Namespaces { - includePatterns = append(includePatterns, fmt.Sprintf("/var/log/pods/%s_*/*/*.log", ns)) - } - case "exclude": - includePatterns = []interface{}{"/var/log/pods/*/*/*.log"} - for _, ns := range logs.NamespaceSelector.Namespaces { - excludePatterns = append(excludePatterns, fmt.Sprintf("/var/log/pods/%s_*/*/*.log", ns)) - } - default: - // No selector — collect from all namespaces - includePatterns = []interface{}{"/var/log/pods/*/*/*.log"} - } - - filelogReceiver := map[string]interface{}{ - "include": includePatterns, - "include_file_path": true, - "operators": []interface{}{ - map[string]interface{}{ - "type": "container", - "id": "container-parser", - }, - }, - } - if len(excludePatterns) > 0 { - filelogReceiver["exclude"] = excludePatterns - } - - // Read credentials for the exporter secret := &corev1.Secret{} secretRef := config.Spec.Target.CredentialsSecret if err := r.Get(ctx, client.ObjectKey{Name: secretRef.Name, Namespace: secretRef.Namespace}, secret); err != nil { return nil, fmt.Errorf("failed to read credentials secret %s/%s: %w", secretRef.Namespace, secretRef.Name, err) } - username := string(secret.Data["username"]) password := string(secret.Data["password"]) basicAuth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", 
username, password))) endpoint := strings.TrimRight(config.Spec.Target.Endpoint, "/") + tenantID := config.Spec.Target.GlobalTenantID - receivers := map[string]interface{}{ - "filelog": filelogReceiver, - } + receivers := map[string]interface{}{} processors := map[string]interface{}{ "batch": map[string]interface{}{}, "k8sattributes": map[string]interface{}{ @@ -577,131 +582,94 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, }, }, } - tenantID := config.Spec.Target.GlobalTenantID - exporters := map[string]interface{}{ - "otlphttp/logs": map[string]interface{}{ - "endpoint": endpoint, - "headers": r.buildExporterHeaders(basicAuth, "otel-logs", logs.TargetDataset, tenantID, config.Spec.Target.Headers, logs.Headers), - }, - } - pipelines := map[string]interface{}{ - "logs": map[string]interface{}{ - "receivers": []interface{}{"filelog"}, - "processors": []interface{}{"k8sattributes", "batch"}, - "exporters": []interface{}{"otlphttp/logs"}, - }, - } + exporters := map[string]interface{}{} + pipelines := map[string]interface{}{} - // Add kubeletstats metrics pipelines — split into node metrics and pod metrics on separate streams. - if config.Spec.Metrics != nil { - kubeletstatsAdded := false - addKubeletstats := func() { - if !kubeletstatsAdded { - receivers["kubeletstats"] = map[string]interface{}{ - "collection_interval": "30s", - "auth_type": "serviceAccount", - "endpoint": "https://${env:K8S_NODE_NAME}:10250", - "insecure_skip_verify": true, - } - kubeletstatsAdded = true + // Pod logs — built-in pipeline tailing /var/log/pods with the CRI container parser. 
+ if config.Spec.Logs != nil && + config.Spec.Logs.PodLogs != nil && + config.Spec.Logs.PodLogs.Enabled && + config.Spec.Logs.PodLogs.TargetDataset != "" { + + pl := config.Spec.Logs.PodLogs + + var includePatterns, excludePatterns []interface{} + switch pl.NamespaceSelector.Mode { + case "include": + for _, ns := range pl.NamespaceSelector.Namespaces { + includePatterns = append(includePatterns, fmt.Sprintf("/var/log/pods/%s_*/*/*.log", ns)) } + case "exclude": + includePatterns = []interface{}{"/var/log/pods/*/*/*.log"} + for _, ns := range pl.NamespaceSelector.Namespaces { + excludePatterns = append(excludePatterns, fmt.Sprintf("/var/log/pods/%s_*/*/*.log", ns)) + } + default: + includePatterns = []interface{}{"/var/log/pods/*/*/*.log"} } - // Node metrics pipeline → separate stream - if config.Spec.Metrics.NodeMetrics != nil && config.Spec.Metrics.NodeMetrics.TargetDataset != "" { - addKubeletstats() - exporters["otlphttp/nodemetrics"] = map[string]interface{}{ - "endpoint": endpoint, - "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", config.Spec.Metrics.NodeMetrics.TargetDataset, tenantID, config.Spec.Target.Headers, config.Spec.Metrics.NodeMetrics.Headers), - } - // filter/node_only — keep only k8s.node.* metrics - processors["filter/node_only"] = map[string]interface{}{ - "error_mode": "ignore", - "metrics": map[string]interface{}{ - "metric": []interface{}{ - `not IsMatch(name, "^k8s\\.node\\.")`, - }, + filelogReceiver := map[string]interface{}{ + "include": includePatterns, + "include_file_path": true, + "operators": []interface{}{ + map[string]interface{}{ + "type": "container", + "id": "container-parser", }, - } - pipelines["metrics/node"] = map[string]interface{}{ - "receivers": []interface{}{"kubeletstats"}, - "processors": []interface{}{"filter/node_only", "batch"}, - "exporters": []interface{}{"otlphttp/nodemetrics"}, - } + }, + } + if len(excludePatterns) > 0 { + filelogReceiver["exclude"] = excludePatterns } - // Pod metrics pipeline → 
separate stream, namespace-filtered - if config.Spec.Metrics.PodMetrics != nil && config.Spec.Metrics.PodMetrics.TargetDataset != "" { - addKubeletstats() - exporters["otlphttp/podmetrics"] = map[string]interface{}{ - "endpoint": endpoint, - "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", config.Spec.Metrics.PodMetrics.TargetDataset, tenantID, config.Spec.Target.Headers, config.Spec.Metrics.PodMetrics.Headers), + receivers["filelog/pod-logs"] = filelogReceiver + exporters["otlphttp/logs_pod-logs"] = map[string]interface{}{ + "endpoint": endpoint, + "headers": r.buildExporterHeaders(basicAuth, "otel-logs", pl.TargetDataset, tenantID, config.Spec.Target.Headers, pl.Headers), + } + pipelines["logs/pod-logs"] = map[string]interface{}{ + "receivers": []interface{}{"filelog/pod-logs"}, + "processors": []interface{}{"k8sattributes", "batch"}, + "exporters": []interface{}{"otlphttp/logs_pod-logs"}, + } + } + + // File pipelines — one per Files[] entry. Tail every *.log under HostPath recursively, no parser. 
+ if config.Spec.Logs != nil { + for _, f := range config.Spec.Logs.Files { + id := sanitizeName(f.Name) + if id == "" || f.HostPath == "" || f.TargetDataset == "" { + continue } - // filter/pod_only — drop k8s.node.* metrics, keep pod/container metrics - processors["filter/pod_only"] = map[string]interface{}{ - "error_mode": "ignore", - "metrics": map[string]interface{}{ - "metric": []interface{}{ - `IsMatch(name, "^k8s\\.node\\.")`, - }, + base := strings.TrimRight(f.HostPath, "/") + + receivers["filelog/"+id] = map[string]interface{}{ + "include": []interface{}{ + fmt.Sprintf("%s/*", base), + fmt.Sprintf("%s/**/*", base), }, + "include_file_path": true, } - podProcessors := []interface{}{"filter/pod_only"} - - // filter/pod_ns — filter pod metrics to configured namespaces - podNs := config.Spec.Metrics.PodMetrics.NamespaceSelector - if len(podNs.Namespaces) > 0 { - switch podNs.Mode { - case "include": - // Drop metrics NOT in the allowed namespaces - parts := make([]string, 0, len(podNs.Namespaces)) - for _, ns := range podNs.Namespaces { - parts = append(parts, fmt.Sprintf(`resource.attributes["k8s.namespace.name"] != "%s"`, ns)) - } - processors["filter/pod_ns"] = map[string]interface{}{ - "error_mode": "ignore", - "metrics": map[string]interface{}{ - "metric": []interface{}{ - strings.Join(parts, " and "), - }, - }, - } - podProcessors = append(podProcessors, "filter/pod_ns") - case "exclude": - // Drop metrics IN the excluded namespaces - var conditions []interface{} - for _, ns := range podNs.Namespaces { - conditions = append(conditions, fmt.Sprintf(`resource.attributes["k8s.namespace.name"] == "%s"`, ns)) - } - processors["filter/pod_ns"] = map[string]interface{}{ - "error_mode": "ignore", - "metrics": map[string]interface{}{ - "metric": conditions, - }, - } - podProcessors = append(podProcessors, "filter/pod_ns") - } + exporters["otlphttp/logs_"+id] = map[string]interface{}{ + "endpoint": endpoint, + "headers": r.buildExporterHeaders(basicAuth, 
"otel-logs", f.TargetDataset, tenantID, config.Spec.Target.Headers, f.Headers), } - - podProcessors = append(podProcessors, "batch") - pipelines["metrics/pod"] = map[string]interface{}{ - "receivers": []interface{}{"kubeletstats"}, - "processors": podProcessors, - "exporters": []interface{}{"otlphttp/podmetrics"}, + pipelines["logs/"+id] = map[string]interface{}{ + "receivers": []interface{}{"filelog/" + id}, + "processors": []interface{}{"batch"}, + "exporters": []interface{}{"otlphttp/logs_" + id}, } } } - collectorConfig := map[string]interface{}{ + return map[string]interface{}{ "receivers": receivers, "processors": processors, "exporters": exporters, "service": map[string]interface{}{ "pipelines": pipelines, }, - } - - return collectorConfig, nil + }, nil } // ensureMetricsEventsCollector creates or updates a single Deployment-mode OpenTelemetryCollector CR @@ -720,7 +688,18 @@ func (r *ParseableConfigReconciler) ensureMetricsEventsCollector(ctx context.Con existing.SetGroupVersionKind(gvk) err := r.Get(ctx, client.ObjectKey{Name: metricsEventsCollectorName, Namespace: config.Namespace}, existing) - metricsEnabled := config.Spec.Metrics != nil && config.Spec.Metrics.PodMetrics != nil && config.Spec.Metrics.PodMetrics.TargetDataset != "" + metricsEnabled := false + if config.Spec.Metrics != nil { + if anyClusterMetricEnabled(config.Spec.Metrics.ClusterMetrics) { + metricsEnabled = true + } + for _, sc := range config.Spec.Metrics.ScrapeConfigs { + if sc.Name != "" && sc.TargetDataset != "" && sc.Port > 0 { + metricsEnabled = true + break + } + } + } eventsEnabled := config.Spec.Events != nil && config.Spec.Events.Enabled && config.Spec.Events.TargetDataset != "" if !metricsEnabled && !eventsEnabled { @@ -741,6 +720,11 @@ func (r *ParseableConfigReconciler) ensureMetricsEventsCollector(ctx context.Con spec := map[string]interface{}{ "mode": "deployment", "config": collectorConfig, + // Tolerate every taint so the metrics+events Deployment can land on any node — 
+ // k8sobjects/k8s_cluster only need API access, not specific nodepool placement. + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, } if err == nil { @@ -777,18 +761,17 @@ func (r *ParseableConfigReconciler) ensureMetricsEventsCollector(ctx context.Con return nil } -// buildMetricsEventsCollectorConfig builds a single OTel collector config with: -// - metrics pipeline: k8s_cluster → filter → batch → otlphttp/metrics -// - logs pipeline (events): k8sobjects → filter → batch → otlphttp/events -// -// Each pipeline has its own exporter with the correct stream/headers for Parseable. +// buildMetricsEventsCollectorConfig builds a single Deployment-mode collector config that hosts: +// - one pipeline using the k8s_cluster receiver (cluster-level object state) when ClusterMetrics is enabled; +// - one pipeline per ScrapeConfigs[] entry using the prometheus receiver with Kubernetes service discovery; +// - one pipeline using the k8sobjects receiver when Events is enabled. func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( ctx context.Context, config *observabilityv1alpha1.ParseableConfig, metricsEnabled, eventsEnabled bool, ) (map[string]interface{}, error) { + _ = metricsEnabled // gating is per-section below - // Read credentials (shared by both pipelines) secret := &corev1.Secret{} secretRef := config.Spec.Target.CredentialsSecret if err := r.Get(ctx, client.ObjectKey{Name: secretRef.Name, Namespace: secretRef.Namespace}, secret); err != nil { @@ -808,48 +791,195 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( exporters := map[string]interface{}{} pipelines := map[string]interface{}{} - // --- Metrics pipeline (k8s_cluster receiver → podMetrics stream) --- - if metricsEnabled { - podMetrics := config.Spec.Metrics.PodMetrics + // Cluster metrics — up to three receivers (k8s_cluster, kubelet /metrics, kube-state-metrics) + // feed a single shared exporter at ClusterMetrics.TargetDataset. 
+ if config.Spec.Metrics != nil && anyClusterMetricEnabled(config.Spec.Metrics.ClusterMetrics) { + cm := config.Spec.Metrics.ClusterMetrics - k8sClusterCfg := map[string]interface{}{ - "collection_interval": "30s", - } - // Use the receiver's native namespaces field to filter at source (include mode only) - if len(podMetrics.NamespaceSelector.Namespaces) > 0 && podMetrics.NamespaceSelector.Mode == "include" { - k8sClusterCfg["namespaces"] = toInterfaceSlice(podMetrics.NamespaceSelector.Namespaces) - } - receivers["k8s_cluster"] = k8sClusterCfg - - metricsProcessorList := []interface{}{} + var clusterReceivers []interface{} - // Filter processor using OTTL metric context — drops metrics not in allowed namespaces - if len(podMetrics.NamespaceSelector.Namespaces) > 0 && podMetrics.NamespaceSelector.Mode == "include" { - parts := make([]string, 0, len(podMetrics.NamespaceSelector.Namespaces)) - for _, ns := range podMetrics.NamespaceSelector.Namespaces { - parts = append(parts, fmt.Sprintf(`resource.attributes["k8s.namespace.name"] != "%s"`, ns)) + if cm.K8sCluster != nil && cm.K8sCluster.Enabled { + k8sClusterCfg := map[string]interface{}{ + "collection_interval": "30s", + "auth_type": "serviceAccount", + } + if len(cm.K8sCluster.NodeConditions) > 0 { + k8sClusterCfg["node_conditions_to_report"] = toInterfaceSlice(cm.K8sCluster.NodeConditions) + } + if len(cm.K8sCluster.AllocatableResources) > 0 { + k8sClusterCfg["allocatable_types_to_report"] = toInterfaceSlice(cm.K8sCluster.AllocatableResources) } - condition := strings.Join(parts, " and ") - processors["filter/metrics_ns"] = map[string]interface{}{ - "error_mode": "ignore", - "metrics": map[string]interface{}{ - "metric": []interface{}{condition}, + if len(cm.NamespaceSelector.Namespaces) > 0 && cm.NamespaceSelector.Mode == "include" { + k8sClusterCfg["namespaces"] = toInterfaceSlice(cm.NamespaceSelector.Namespaces) + } + receivers["k8s_cluster"] = k8sClusterCfg + clusterReceivers = append(clusterReceivers, 
"k8s_cluster") + } + + if cm.Kubelet != nil && cm.Kubelet.Enabled { + receivers["prometheus/kubelet"] = map[string]interface{}{ + "config": map[string]interface{}{ + "scrape_configs": []interface{}{ + map[string]interface{}{ + "job_name": "kubelet", + "scrape_interval": "30s", + "scheme": "https", + "bearer_token_file": "/var/run/secrets/kubernetes.io/serviceaccount/token", + "tls_config": map[string]interface{}{ + "ca_file": "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + "insecure_skip_verify": true, + }, + "kubernetes_sd_configs": []interface{}{ + map[string]interface{}{"role": "node"}, + }, + "relabel_configs": []interface{}{ + map[string]interface{}{ + "target_label": "__metrics_path__", + "replacement": "/metrics", + }, + map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_node_name"}, + "target_label": "node", + }, + }, + }, + }, }, } - metricsProcessorList = append(metricsProcessorList, "filter/metrics_ns") + clusterReceivers = append(clusterReceivers, "prometheus/kubelet") } - metricsProcessorList = append(metricsProcessorList, "batch") + if cm.KubeState != nil && cm.KubeState.Enabled { + ksNamespaces := cm.KubeState.Namespaces + if len(ksNamespaces) == 0 { + ksNamespaces = []string{"kube-system", "kube-state-metrics", "default"} + } + receivers["prometheus/kube-state-metrics"] = map[string]interface{}{ + "config": map[string]interface{}{ + "scrape_configs": []interface{}{ + map[string]interface{}{ + "job_name": "kube-state-metrics", + "scrape_interval": "30s", + "kubernetes_sd_configs": []interface{}{ + map[string]interface{}{ + "role": "service", + "namespaces": map[string]interface{}{ + "names": toInterfaceSlice(ksNamespaces), + }, + }, + }, + "relabel_configs": []interface{}{ + map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_service_name"}, + "regex": ".*kube-state-metrics.*", + "action": "keep", + }, + map[string]interface{}{ + "source_labels": 
[]interface{}{"__meta_kubernetes_service_port_name"}, + "regex": "metrics|http-metrics", + "action": "keep", + }, + }, + }, + }, + }, + } + clusterReceivers = append(clusterReceivers, "prometheus/kube-state-metrics") + } - exporters["otlphttp/metrics"] = map[string]interface{}{ + exporters["otlphttp/clustermetrics"] = map[string]interface{}{ "endpoint": endpoint, - "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", podMetrics.TargetDataset, tenantID, config.Spec.Target.Headers, podMetrics.Headers), + "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", cm.TargetDataset, tenantID, config.Spec.Target.Headers, cm.Headers), + } + pipelines["metrics/cluster"] = map[string]interface{}{ + "receivers": clusterReceivers, + "processors": []interface{}{"batch"}, + "exporters": []interface{}{"otlphttp/clustermetrics"}, } + } + + // Per-scrape-entry Prometheus pipelines with Kubernetes pod service discovery. + if config.Spec.Metrics != nil { + for _, sc := range config.Spec.Metrics.ScrapeConfigs { + id := sanitizeName(sc.Name) + if id == "" || sc.TargetDataset == "" || sc.Port <= 0 { + continue + } + metricsPath := sc.URI + if metricsPath == "" { + metricsPath = "/metrics" + } + if !strings.HasPrefix(metricsPath, "/") { + metricsPath = "/" + metricsPath + } + + sdConfig := map[string]interface{}{ + "role": "pod", + } + if len(sc.NamespaceSelector.Namespaces) > 0 && sc.NamespaceSelector.Mode == "include" { + sdConfig["namespaces"] = map[string]interface{}{ + "names": toInterfaceSlice(sc.NamespaceSelector.Namespaces), + } + } - pipelines["metrics"] = map[string]interface{}{ - "receivers": []interface{}{"k8s_cluster"}, - "processors": metricsProcessorList, - "exporters": []interface{}{"otlphttp/metrics"}, + var relabelConfigs []interface{} + if len(sc.PodSelector) > 0 { + // Label-based filter: one keep relabel per label key/value. 
+ for k, v := range sc.PodSelector { + relabelConfigs = append(relabelConfigs, map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_pod_label_" + sanitizePromLabel(k)}, + "action": "keep", + "regex": v, + }) + } + } else { + // Port-based filter: keep only pods whose container exposes the configured port. + relabelConfigs = append(relabelConfigs, map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_pod_container_port_number"}, + "action": "keep", + "regex": fmt.Sprintf("%d", sc.Port), + }) + } + relabelConfigs = append(relabelConfigs, map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_pod_ip"}, + "action": "replace", + "target_label": "__address__", + // $$1 → $1 after OTel confmap escape, then Prometheus expands to the captured pod IP. + "replacement": fmt.Sprintf("$$1:%d", sc.Port), + }) + if len(sc.NamespaceSelector.Namespaces) > 0 && sc.NamespaceSelector.Mode == "exclude" { + regex := strings.Join(sc.NamespaceSelector.Namespaces, "|") + relabelConfigs = append([]interface{}{ + map[string]interface{}{ + "source_labels": []interface{}{"__meta_kubernetes_namespace"}, + "action": "drop", + "regex": regex, + }, + }, relabelConfigs...) 
+ } + + receivers["prometheus/"+id] = map[string]interface{}{ + "config": map[string]interface{}{ + "scrape_configs": []interface{}{ + map[string]interface{}{ + "job_name": id, + "scrape_interval": "30s", + "metrics_path": metricsPath, + "kubernetes_sd_configs": []interface{}{sdConfig}, + "relabel_configs": relabelConfigs, + }, + }, + }, + } + exporters["otlphttp/metrics_"+id] = map[string]interface{}{ + "endpoint": endpoint, + "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", sc.TargetDataset, tenantID, config.Spec.Target.Headers, sc.Headers), + } + pipelines["metrics/"+id] = map[string]interface{}{ + "receivers": []interface{}{"prometheus/" + id}, + "processors": []interface{}{"batch"}, + "exporters": []interface{}{"otlphttp/metrics_" + id}, + } } } @@ -921,6 +1051,59 @@ func toInterfaceSlice(ss []string) []interface{} { return out } +// sanitizePromLabel converts a Kubernetes label key into the Prometheus relabel form +// (alphanumeric + underscore). Mirrors Prometheus' own label-name sanitization rules. +func sanitizePromLabel(s string) string { + var b strings.Builder + for _, r := range s { + switch { + case (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9'): + b.WriteRune(r) + default: + b.WriteRune('_') + } + } + return b.String() +} + +// anyClusterMetricEnabled reports whether at least one built-in cluster-metrics +// receiver is enabled with a target dataset to ship to. +func anyClusterMetricEnabled(cm *observabilityv1alpha1.ClusterMetricsConfig) bool { + if cm == nil || cm.TargetDataset == "" { + return false + } + if cm.K8sCluster != nil && cm.K8sCluster.Enabled { + return true + } + if cm.Kubelet != nil && cm.Kubelet.Enabled { + return true + } + if cm.KubeState != nil && cm.KubeState.Enabled { + return true + } + return false +} + +// sanitizeName returns a DNS-1123 compliant lowercase identifier derived from s. +// Used for volume names and OTel pipeline/receiver/exporter IDs. 
+func sanitizeName(s string) string { + var b strings.Builder + s = strings.ToLower(s) + for _, r := range s { + switch { + case (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9'): + b.WriteRune(r) + case r == '-' || r == '_' || r == ' ' || r == '/' || r == '.': + b.WriteRune('-') + } + } + out := strings.Trim(b.String(), "-") + for strings.Contains(out, "--") { + out = strings.ReplaceAll(out, "--", "-") + } + return out +} + // buildExporterHeaders returns a merged headers map for collector exporters. // Merge order: globalHeaders → signalHeaders → built-in headers (Authorization, X-P-Stream, X-P-Log-Source, X-P-Tenant). // Built-in headers always win. diff --git a/java-demo.yaml b/java-demo.yaml deleted file mode 100644 index af29b3c..0000000 --- a/java-demo.yaml +++ /dev/null @@ -1,31 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: java-demo -spec: - selector: - matchLabels: - app: java-demo - replicas: 2 - template: - metadata: - labels: - app: java-demo - spec: - containers: - - image: fstab/java-demo - name: java-demo - ports: - - containerPort: 8080 ---- -kind: Service -apiVersion: v1 -metadata: - name: java-demo -spec: - selector: - app: java-demo - ports: - - protocol: TCP - port: 80 - targetPort: 8080 From e000de1c624a14f3d9f7881f318f7744d11a4b9a Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 9 Jun 2026 09:28:56 +0530 Subject: [PATCH 2/4] Add target.encoding (json default, proto opt-in) for OTLP wire format --- api/v1alpha1/parseableconfig_types.go | 5 ++++ ...bility.parseable.com_parseableconfigs.yaml | 8 ++++++ ...bility.parseable.com_parseableconfigs.yaml | 8 ++++++ .../controller/parseableconfig_controller.go | 28 ++++++++++++++++++- 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/parseableconfig_types.go b/api/v1alpha1/parseableconfig_types.go index b5a77cb..9804901 100644 --- a/api/v1alpha1/parseableconfig_types.go +++ b/api/v1alpha1/parseableconfig_types.go @@ -52,6 +52,11 @@ type TargetConfig 
struct { // Headers are additional HTTP headers applied to all signal exporters. Signal-level headers override these. Headers map[string]string `json:"headers,omitempty"` + + // Encoding controls the OTLP HTTP wire format. "json" (default) is universally supported by Parseable; + // "proto" is more efficient but not every Parseable cluster supports it. + // +kubebuilder:validation:Enum=json;proto + Encoding string `json:"encoding,omitempty"` } // InstrumentationConfig defines auto-instrumentation settings diff --git a/config/crd/bases/observability.parseable.com_parseableconfigs.yaml b/config/crd/bases/observability.parseable.com_parseableconfigs.yaml index b72685c..5291d2a 100644 --- a/config/crd/bases/observability.parseable.com_parseableconfigs.yaml +++ b/config/crd/bases/observability.parseable.com_parseableconfigs.yaml @@ -329,6 +329,14 @@ spec: - name - namespace type: object + encoding: + description: |- + Encoding controls the OTLP HTTP wire format. "json" (default) is universally supported by Parseable; + "proto" is more efficient but not every Parseable cluster supports it. + enum: + - json + - proto + type: string endpoint: description: Endpoint is the Parseable API endpoint URL for ingestion type: string diff --git a/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml b/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml index b72685c..5291d2a 100644 --- a/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml +++ b/helm/pai/crds/observability.parseable.com_parseableconfigs.yaml @@ -329,6 +329,14 @@ spec: - name - namespace type: object + encoding: + description: |- + Encoding controls the OTLP HTTP wire format. "json" (default) is universally supported by Parseable; + "proto" is more efficient but not every Parseable cluster supports it. 
+ enum: + - json + - proto + type: string endpoint: description: Endpoint is the Parseable API endpoint URL for ingestion type: string diff --git a/internal/controller/parseableconfig_controller.go b/internal/controller/parseableconfig_controller.go index 0a5570f..a13dc48 100644 --- a/internal/controller/parseableconfig_controller.go +++ b/internal/controller/parseableconfig_controller.go @@ -344,7 +344,7 @@ func (r *ParseableConfigReconciler) buildInstrumentationSpec(ctx context.Context "env": []interface{}{ map[string]interface{}{ "name": "OTEL_EXPORTER_OTLP_PROTOCOL", - "value": "http/protobuf", + "value": resolveOtlpProtocol(config.Spec.Target.Encoding), }, map[string]interface{}{ "name": "OTEL_EXPORTER_OTLP_HEADERS", @@ -566,6 +566,7 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, password := string(secret.Data["password"]) basicAuth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))) endpoint := strings.TrimRight(config.Spec.Target.Endpoint, "/") + encoding := resolveOtlpEncoding(config.Spec.Target.Encoding) tenantID := config.Spec.Target.GlobalTenantID receivers := map[string]interface{}{} @@ -625,6 +626,7 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, receivers["filelog/pod-logs"] = filelogReceiver exporters["otlphttp/logs_pod-logs"] = map[string]interface{}{ "endpoint": endpoint, + "encoding": encoding, "headers": r.buildExporterHeaders(basicAuth, "otel-logs", pl.TargetDataset, tenantID, config.Spec.Target.Headers, pl.Headers), } pipelines["logs/pod-logs"] = map[string]interface{}{ @@ -652,6 +654,7 @@ func (r *ParseableConfigReconciler) buildLogCollectorConfig(ctx context.Context, } exporters["otlphttp/logs_"+id] = map[string]interface{}{ "endpoint": endpoint, + "encoding": encoding, "headers": r.buildExporterHeaders(basicAuth, "otel-logs", f.TargetDataset, tenantID, config.Spec.Target.Headers, f.Headers), } pipelines["logs/"+id] = 
map[string]interface{}{ @@ -782,6 +785,7 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( password := string(secret.Data["password"]) basicAuth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))) endpoint := strings.TrimRight(config.Spec.Target.Endpoint, "/") + encoding := resolveOtlpEncoding(config.Spec.Target.Encoding) tenantID := config.Spec.Target.GlobalTenantID receivers := map[string]interface{}{} @@ -889,6 +893,7 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( exporters["otlphttp/clustermetrics"] = map[string]interface{}{ "endpoint": endpoint, + "encoding": encoding, "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", cm.TargetDataset, tenantID, config.Spec.Target.Headers, cm.Headers), } pipelines["metrics/cluster"] = map[string]interface{}{ @@ -973,6 +978,7 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( } exporters["otlphttp/metrics_"+id] = map[string]interface{}{ "endpoint": endpoint, + "encoding": encoding, "headers": r.buildExporterHeaders(basicAuth, "otel-metrics", sc.TargetDataset, tenantID, config.Spec.Target.Headers, sc.Headers), } pipelines["metrics/"+id] = map[string]interface{}{ @@ -1020,6 +1026,7 @@ func (r *ParseableConfigReconciler) buildMetricsEventsCollectorConfig( exporters["otlphttp/events"] = map[string]interface{}{ "endpoint": endpoint, + "encoding": encoding, "headers": r.buildExporterHeaders(basicAuth, "otel-logs", events.TargetDataset, tenantID, config.Spec.Target.Headers, events.Headers), } @@ -1051,6 +1058,25 @@ func toInterfaceSlice(ss []string) []interface{} { return out } +// resolveOtlpEncoding returns the otlphttp exporter `encoding` value for the +// configured target encoding. Defaults to "json" (Parseable's universally +// supported wire format); "proto" is honored when explicitly set. 
+func resolveOtlpEncoding(target string) string { + if target == "proto" { + return "proto" + } + return "json" +} + +// resolveOtlpProtocol returns the OTEL_EXPORTER_OTLP_PROTOCOL env-var value +// for SDK instrumentation, matching the target encoding. +func resolveOtlpProtocol(target string) string { + if target == "proto" { + return "http/protobuf" + } + return "http/json" +} + // sanitizePromLabel converts a Kubernetes label key into the Prometheus relabel form // (alphanumeric + underscore). Mirrors Prometheus' own label-name sanitization rules. func sanitizePromLabel(s string) string { From 157d9c6e0f7f9ee3685321ecefa31c337389bd47 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Tue, 9 Jun 2026 09:29:43 +0530 Subject: [PATCH 3/4] Add explicit encoding: json to staging sample --- config/samples/azure-staging.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/samples/azure-staging.yaml b/config/samples/azure-staging.yaml index c2f750f..fac9bb0 100644 --- a/config/samples/azure-staging.yaml +++ b/config/samples/azure-staging.yaml @@ -22,6 +22,7 @@ metadata: spec: target: endpoint: + encoding: json credentialsSecret: name: parseable-creds namespace: parseable-operator-system From f8ebec60988244a85f012c82657539f722eb272b Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Wed, 10 Jun 2026 05:56:33 +0530 Subject: [PATCH 4/4] Grant nodes/metrics to collector RBAC for kubelet /metrics scrape --- internal/controller/parseableconfig_controller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/controller/parseableconfig_controller.go b/internal/controller/parseableconfig_controller.go index a13dc48..3b17844 100644 --- a/internal/controller/parseableconfig_controller.go +++ b/internal/controller/parseableconfig_controller.go @@ -231,7 +231,8 @@ func (r *ParseableConfigReconciler) ensureCollectorRBAC(ctx context.Context, nam Rules: []rbacv1.PolicyRule{ { APIGroups: []string{""}, - Resources: []string{"events", "namespaces", 
"namespaces/status", "nodes", "nodes/spec", "nodes/stats", "nodes/proxy", + Resources: []string{"events", "namespaces", "namespaces/status", + "nodes", "nodes/spec", "nodes/stats", "nodes/proxy", "nodes/metrics", "pods", "pods/status", "replicationcontrollers", "replicationcontrollers/status", "resourcequotas", "services"}, Verbs: []string{"get", "list", "watch"},