Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/adk/pkg/a2a/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
ctx, invocationSpan := telemetry.StartInvocationSpan(ctx)
defer invocationSpan.End()

telemetry.SetMessageMetadataAttributes(ctx, reqCtx.Message.Metadata)

// 3. Initialize skills session path.
if e.skillsDirectory != "" && sessionID != "" {
if _, err := skills.InitializeSessionPath(sessionID, e.skillsDirectory); err != nil {
Expand Down
27 changes: 27 additions & 0 deletions go/adk/pkg/telemetry/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package telemetry
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"

Expand Down Expand Up @@ -87,6 +88,32 @@ func stringAttributes(attrs map[string]string) []attribute.KeyValue {
return out
}

// SetMessageMetadataAttributes sets scalar values from an A2A message's metadata as span attributes.
func SetMessageMetadataAttributes(ctx context.Context, metadata map[string]any) {
if len(metadata) == 0 {
return
}
var attrs []attribute.KeyValue
for k, v := range metadata {
key := "a2a.message.metadata." + k
switch val := v.(type) {
case string:
if val != "" {
attrs = append(attrs, attribute.String(key, val))
}
case bool:
attrs = append(attrs, attribute.Bool(key, val))
case float64:
attrs = append(attrs, attribute.String(key, fmt.Sprintf("%g", val)))
case int:
attrs = append(attrs, attribute.Int(key, val))
case int64:
attrs = append(attrs, attribute.Int64(key, val))
}
}
setSpanAttributes(ctx, attrs...)
}

func setSpanAttributes(ctx context.Context, attrs ...attribute.KeyValue) {
span := trace.SpanFromContext(ctx)
if !span.IsRecording() || len(attrs) == 0 {
Expand Down
71 changes: 71 additions & 0 deletions go/adk/pkg/telemetry/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package telemetry

import (
"context"
"strings"
"testing"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -170,6 +171,76 @@ func TestSetLLMAttributes_EmitsEmptyPayloadWhenContentCaptureDisabled(t *testing
}
}

func TestSetMessageMetadataAttributes(t *testing.T) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })

tracer := tp.Tracer("test")
ctx, span := tracer.Start(context.Background(), "test-span")

SetMessageMetadataAttributes(ctx, map[string]any{
"approver_email": "admin@example.com",
"attempt_count": float64(3),
"dry_run": true,
"nested": map[string]any{"should": "be skipped"},
"list_val": []string{"also", "skipped"},
"empty_str": "",
})
span.End()

spans := exporter.GetSpans()
if len(spans) == 0 {
t.Fatal("no spans recorded")
}
attrs := make(map[string]attribute.Value)
for _, a := range spans[0].Attributes {
attrs[string(a.Key)] = a.Value
}

if got := attrs["a2a.message.metadata.approver_email"].AsString(); got != "admin@example.com" {
t.Errorf("approver_email: got %q, want %q", got, "admin@example.com")
}
if got := attrs["a2a.message.metadata.attempt_count"].AsString(); got != "3" {
t.Errorf("attempt_count: got %q, want %q", got, "3")
}
if got := attrs["a2a.message.metadata.dry_run"].AsBool(); !got {
t.Errorf("dry_run: got %v, want true", got)
}
if _, exists := attrs["a2a.message.metadata.nested"]; exists {
t.Error("nested map should not be set as span attribute")
}
if _, exists := attrs["a2a.message.metadata.list_val"]; exists {
t.Error("list value should not be set as span attribute")
}
if _, exists := attrs["a2a.message.metadata.empty_str"]; exists {
t.Error("empty string should not be set as span attribute")
}
}

func TestSetMessageMetadataAttributes_NilAndEmpty(t *testing.T) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })

tracer := tp.Tracer("test")
ctx, span := tracer.Start(context.Background(), "test-span")

SetMessageMetadataAttributes(ctx, nil)
SetMessageMetadataAttributes(ctx, map[string]any{})
span.End()

spans := exporter.GetSpans()
if len(spans) == 0 {
t.Fatal("no spans recorded")
}
for _, a := range spans[0].Attributes {
if strings.HasPrefix(string(a.Key), "a2a.message.metadata.") {
t.Errorf("no metadata attributes expected, got %q", a.Key)
}
}
}

func spanAttributesByName(t *testing.T, spans tracetest.SpanStubs, name string) map[string]attribute.Value {
t.Helper()

Expand Down
Loading