Skip to content

Commit a0afa4a

Browse files
committed
Fix issue with resourcedetection processor failing with no detectors set.
Minor refactoring of some collector code. Add/adjust tests for different otel detector configs.
1 parent af59fbb commit a0afa4a

11 files changed

Lines changed: 857 additions & 93 deletions

File tree

internal/collector/patroni.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@ func EnablePatroniLogging(ctx context.Context,
1717
inCluster *v1beta1.PostgresCluster,
1818
outConfig *Config,
1919
) {
20-
var spec *v1beta1.InstrumentationLogsSpec
21-
if inCluster != nil && inCluster.Spec.Instrumentation != nil {
22-
spec = inCluster.Spec.Instrumentation.Logs
23-
}
24-
2520
if OpenTelemetryLogsEnabled(ctx, inCluster) {
21+
spec := inCluster.Spec.Instrumentation
2622
directory := naming.PatroniPGDataLogPath
2723

2824
// Keep track of what log records and files have been processed.
@@ -117,21 +113,31 @@ func EnablePatroniLogging(ctx context.Context,
117113
// If there are exporters to be added to the logs pipelines defined in
118114
// the spec, add them to the pipeline. Otherwise, add the DebugExporter.
119115
exporters := []ComponentID{DebugExporter}
120-
if spec != nil && spec.Exporters != nil {
121-
exporters = slices.Clone(spec.Exporters)
116+
if spec.Logs != nil && spec.Logs.Exporters != nil {
117+
exporters = slices.Clone(spec.Logs.Exporters)
118+
}
119+
120+
patroniProcessors := []ComponentID{
121+
"resource/patroni",
122+
"transform/patroni_logs",
123+
}
124+
125+
// We can only add the ResourceDetectionProcessor if there are detectors set,
126+
// otherwise it will fail. This is due to a change in the following upstream commit:
127+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298
128+
if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 {
129+
patroniProcessors = append(patroniProcessors, ResourceDetectionProcessor)
122130
}
123131

132+
// Order of processors matters, so we add the batching and compacting processors after
133+
// potentially adding the resourcedetection processor
134+
patroniProcessors = append(patroniProcessors, LogsBatchProcessor, CompactingProcessor)
135+
124136
outConfig.Pipelines["logs/patroni"] = Pipeline{
125137
Extensions: []ComponentID{"file_storage/patroni_logs"},
126138
Receivers: []ComponentID{"filelog/patroni_jsonlog"},
127-
Processors: []ComponentID{
128-
"resource/patroni",
129-
"transform/patroni_logs",
130-
ResourceDetectionProcessor,
131-
LogsBatchProcessor,
132-
CompactingProcessor,
133-
},
134-
Exporters: exporters,
139+
Processors: patroniProcessors,
140+
Exporters: exporters,
135141
}
136142
}
137143
}

internal/collector/patroni_test.go

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,14 @@ service:
109109
processors:
110110
- resource/patroni
111111
- transform/patroni_logs
112-
- resourcedetection
113112
- batch/logs
114113
- groupbyattrs/compact
115114
receivers:
116115
- filelog/patroni_jsonlog
117116
`)
118117
})
119118

120-
t.Run("InstrumentationSpecDefined", func(t *testing.T) {
119+
t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) {
121120
gate := feature.NewGate()
122121
assert.NilError(t, gate.SetFromMap(map[string]bool{
123122
feature.OpenTelemetryLogs: true,
@@ -202,6 +201,114 @@ receivers:
202201
to: body.original
203202
type: move
204203
storage: file_storage/patroni_logs
204+
service:
205+
extensions:
206+
- file_storage/patroni_logs
207+
pipelines:
208+
logs/patroni:
209+
exporters:
210+
- googlecloud
211+
processors:
212+
- resource/patroni
213+
- transform/patroni_logs
214+
- batch/logs
215+
- groupbyattrs/compact
216+
receivers:
217+
- filelog/patroni_jsonlog
218+
`)
219+
})
220+
221+
t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) {
222+
gate := feature.NewGate()
223+
assert.NilError(t, gate.SetFromMap(map[string]bool{
224+
feature.OpenTelemetryLogs: true,
225+
}))
226+
ctx := feature.NewContext(context.Background(), gate)
227+
228+
cluster := new(v1beta1.PostgresCluster)
229+
cluster.Spec.Instrumentation = testInstrumentationSpec()
230+
cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{
231+
{
232+
Name: "gcp",
233+
},
234+
}
235+
config := NewConfig(cluster.Spec.Instrumentation)
236+
237+
EnablePatroniLogging(ctx, cluster, config)
238+
239+
result, err := config.ToYAML()
240+
assert.NilError(t, err)
241+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
242+
# Your changes will not be saved.
243+
exporters:
244+
debug:
245+
verbosity: detailed
246+
googlecloud:
247+
log:
248+
default_log_name: opentelemetry.io/collector-exported-log
249+
project: google-project-name
250+
extensions:
251+
file_storage/patroni_logs:
252+
create_directory: true
253+
directory: /pgdata/patroni/log/receiver
254+
fsync: true
255+
processors:
256+
batch/1s:
257+
timeout: 1s
258+
batch/200ms:
259+
timeout: 200ms
260+
batch/logs:
261+
send_batch_size: 8192
262+
timeout: 200ms
263+
groupbyattrs/compact: {}
264+
resource/patroni:
265+
attributes:
266+
- action: insert
267+
key: k8s.container.name
268+
value: database
269+
- action: insert
270+
key: k8s.namespace.name
271+
value: ${env:K8S_POD_NAMESPACE}
272+
- action: insert
273+
key: k8s.pod.name
274+
value: ${env:K8S_POD_NAME}
275+
- action: insert
276+
key: process.executable.name
277+
value: patroni
278+
resourcedetection:
279+
detectors:
280+
- gcp
281+
override: false
282+
timeout: 30s
283+
transform/patroni_logs:
284+
log_statements:
285+
- statements:
286+
- set(instrumentation_scope.name, "patroni")
287+
- set(log.cache, ParseJSON(log.body["original"]))
288+
- set(log.severity_text, log.cache["levelname"])
289+
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text ==
290+
"DEBUG"
291+
- set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text ==
292+
"INFO"
293+
- set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text ==
294+
"WARNING"
295+
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text ==
296+
"ERROR"
297+
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text ==
298+
"CRITICAL"
299+
- set(log.time, Time(log.cache["asctime"], "%F %T,%L")) where IsString(log.cache["asctime"])
300+
- set(log.attributes["log.record.original"], log.body["original"])
301+
- set(log.body, log.cache["message"])
302+
receivers:
303+
filelog/patroni_jsonlog:
304+
include:
305+
- /pgdata/patroni/log/*.log
306+
- /pgdata/patroni/log/*.log.1
307+
operators:
308+
- from: body
309+
to: body.original
310+
type: move
311+
storage: file_storage/patroni_logs
205312
service:
206313
extensions:
207314
- file_storage/patroni_logs

internal/collector/pgadmin.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -100,30 +100,34 @@ func EnablePgAdminLogging(ctx context.Context, spec *v1beta1.InstrumentationSpec
100100
exporters = slices.Clone(spec.Logs.Exporters)
101101
}
102102

103+
pgadminProcessors := []ComponentID{
104+
"resource/pgadmin",
105+
"transform/pgadmin_log",
106+
}
107+
108+
// We can only add the ResourceDetectionProcessor if there are detectors set,
109+
// otherwise it will fail. This is due to a change in the following upstream commmit:
110+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298
111+
if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 {
112+
pgadminProcessors = append(pgadminProcessors, ResourceDetectionProcessor)
113+
}
114+
115+
// Order of processors matters, so we add the batching and compacting processors after
116+
// potentially adding the resourcedetection processor
117+
pgadminProcessors = append(pgadminProcessors, LogsBatchProcessor, CompactingProcessor)
118+
103119
otelConfig.Pipelines["logs/pgadmin"] = Pipeline{
104120
Extensions: []ComponentID{"file_storage/pgadmin_data_logs"},
105121
Receivers: []ComponentID{"filelog/pgadmin"},
106-
Processors: []ComponentID{
107-
"resource/pgadmin",
108-
"transform/pgadmin_log",
109-
ResourceDetectionProcessor,
110-
LogsBatchProcessor,
111-
CompactingProcessor,
112-
},
113-
Exporters: exporters,
122+
Processors: pgadminProcessors,
123+
Exporters: exporters,
114124
}
115125

116126
otelConfig.Pipelines["logs/gunicorn"] = Pipeline{
117127
Extensions: []ComponentID{"file_storage/pgadmin_data_logs"},
118128
Receivers: []ComponentID{"filelog/gunicorn"},
119-
Processors: []ComponentID{
120-
"resource/pgadmin",
121-
"transform/pgadmin_log",
122-
ResourceDetectionProcessor,
123-
LogsBatchProcessor,
124-
CompactingProcessor,
125-
},
126-
Exporters: exporters,
129+
Processors: pgadminProcessors,
130+
Exporters: exporters,
127131
}
128132

129133
otelYAML, err := otelConfig.ToYAML()

internal/collector/pgadmin_test.go

Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ collector.yaml: |
114114
processors:
115115
- resource/pgadmin
116116
- transform/pgadmin_log
117-
- resourcedetection
118117
- batch/logs
119118
- groupbyattrs/compact
120119
receivers:
@@ -125,15 +124,14 @@ collector.yaml: |
125124
processors:
126125
- resource/pgadmin
127126
- transform/pgadmin_log
128-
- resourcedetection
129127
- batch/logs
130128
- groupbyattrs/compact
131129
receivers:
132130
- filelog/pgadmin
133131
`))
134132
})
135133

136-
t.Run("InstrumentationSpecDefined", func(t *testing.T) {
134+
t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) {
137135
gate := feature.NewGate()
138136
assert.NilError(t, gate.SetFromMap(map[string]bool{
139137
feature.OpenTelemetryLogs: true,
@@ -232,6 +230,134 @@ collector.yaml: |
232230
include:
233231
- /var/lib/pgadmin/logs/pgadmin.log
234232
storage: file_storage/pgadmin_data_logs
233+
service:
234+
extensions:
235+
- file_storage/pgadmin_data_logs
236+
pipelines:
237+
logs/gunicorn:
238+
exporters:
239+
- googlecloud
240+
processors:
241+
- resource/pgadmin
242+
- transform/pgadmin_log
243+
- batch/logs
244+
- groupbyattrs/compact
245+
receivers:
246+
- filelog/gunicorn
247+
logs/pgadmin:
248+
exporters:
249+
- googlecloud
250+
processors:
251+
- resource/pgadmin
252+
- transform/pgadmin_log
253+
- batch/logs
254+
- groupbyattrs/compact
255+
receivers:
256+
- filelog/pgadmin
257+
`))
258+
})
259+
260+
t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) {
261+
gate := feature.NewGate()
262+
assert.NilError(t, gate.SetFromMap(map[string]bool{
263+
feature.OpenTelemetryLogs: true,
264+
}))
265+
266+
ctx := feature.NewContext(context.Background(), gate)
267+
268+
var spec v1beta1.InstrumentationSpec
269+
require.UnmarshalInto(t, &spec, `{
270+
config: {
271+
detectors: [{name: gcp}],
272+
exporters: {
273+
googlecloud: {
274+
log: { default_log_name: opentelemetry.io/collector-exported-log },
275+
project: google-project-name,
276+
},
277+
},
278+
},
279+
logs: { exporters: [googlecloud] },
280+
}`)
281+
282+
configmap := new(corev1.ConfigMap)
283+
initialize.Map(&configmap.Data)
284+
err := collector.EnablePgAdminLogging(ctx, &spec, configmap)
285+
assert.NilError(t, err)
286+
287+
assert.Assert(t, cmp.MarshalMatches(configmap.Data, `
288+
collector.yaml: |
289+
# Generated by postgres-operator. DO NOT EDIT.
290+
# Your changes will not be saved.
291+
exporters:
292+
debug:
293+
verbosity: detailed
294+
googlecloud:
295+
log:
296+
default_log_name: opentelemetry.io/collector-exported-log
297+
project: google-project-name
298+
extensions:
299+
file_storage/pgadmin_data_logs:
300+
create_directory: false
301+
directory: /var/lib/pgadmin/logs/receiver
302+
fsync: true
303+
processors:
304+
batch/1s:
305+
timeout: 1s
306+
batch/200ms:
307+
timeout: 200ms
308+
batch/logs:
309+
send_batch_size: 8192
310+
timeout: 200ms
311+
groupbyattrs/compact: {}
312+
resource/pgadmin:
313+
attributes:
314+
- action: insert
315+
key: k8s.container.name
316+
value: pgadmin
317+
- action: insert
318+
key: k8s.namespace.name
319+
value: ${env:K8S_POD_NAMESPACE}
320+
- action: insert
321+
key: k8s.pod.name
322+
value: ${env:K8S_POD_NAME}
323+
- action: insert
324+
key: process.executable.name
325+
value: pgadmin
326+
resourcedetection:
327+
detectors:
328+
- gcp
329+
override: false
330+
timeout: 30s
331+
transform/pgadmin_log:
332+
log_statements:
333+
- statements:
334+
- set(log.attributes["log.record.original"], log.body)
335+
- set(log.cache, ParseJSON(log.body))
336+
- merge_maps(log.attributes, ExtractPatterns(log.cache["message"], "(?P<webrequest>[A-Z]{3}.*?[\\d]{3})"),
337+
"insert")
338+
- set(log.body, log.cache["message"])
339+
- set(instrumentation_scope.name, log.cache["name"])
340+
- set(log.severity_text, log.cache["level"])
341+
- set(log.time_unix_nano, Int(log.cache["time"]*1000000000))
342+
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text ==
343+
"DEBUG"
344+
- set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text ==
345+
"INFO"
346+
- set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text ==
347+
"WARNING"
348+
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text ==
349+
"ERROR"
350+
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text ==
351+
"CRITICAL"
352+
receivers:
353+
filelog/gunicorn:
354+
include:
355+
- /var/lib/pgadmin/logs/gunicorn.log
356+
storage: file_storage/pgadmin_data_logs
357+
filelog/pgadmin:
358+
include:
359+
- /var/lib/pgadmin/logs/pgadmin.log
360+
storage: file_storage/pgadmin_data_logs
235361
service:
236362
extensions:
237363
- file_storage/pgadmin_data_logs

0 commit comments

Comments
 (0)