Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/workflow/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide
row.addAttr("timerName", *t.TimerCreated.Name)
}
row.addAttr("fireAt", t.TimerCreated.FireAt.AsTime().Format(time.RFC3339))
if o := timerOriginInfo(t.TimerCreated); o != nil {
row.addAttr("origin", o.kind)
if o.attrKey != "" {
row.addAttr(o.attrKey, o.attrVal)
}
}
case *protos.HistoryEvent_TimerFired:
row.addAttr("timerId", fmt.Sprintf("%d", t.TimerFired.TimerId))
row.addAttr("fireAt", t.TimerFired.FireAt.AsTime().Format(time.RFC3339))
Expand Down Expand Up @@ -364,6 +370,9 @@ func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string {
if in := t.TimerCreated.RerunParentInstanceInfo; in != nil {
det += fmt.Sprintf(",rerunParent=%s", in.InstanceID)
}
if o := timerOriginString(t.TimerCreated); o != "" {
det += ",origin=" + o
}
return ptr.Of(det)
case *protos.HistoryEvent_EventRaised:
return ptr.Of(fmt.Sprintf("event=%s", t.EventRaised.Name))
Expand Down Expand Up @@ -432,6 +441,38 @@ func flatTags(tags map[string]string, max int) string {
return s
}

type timerOrigin struct {
kind string // e.g. "createTimer", "externalEvent"
attrKey string // extra attr key, empty if none
attrVal string // extra attr value
}

func timerOriginInfo(tc *protos.TimerCreatedEvent) *timerOrigin {
switch x := tc.GetOrigin().(type) {
case *protos.TimerCreatedEvent_CreateTimer:
return &timerOrigin{kind: "createTimer"}
case *protos.TimerCreatedEvent_ExternalEvent:
return &timerOrigin{kind: "externalEvent", attrKey: "eventName", attrVal: x.ExternalEvent.GetName()}
case *protos.TimerCreatedEvent_ActivityRetry:
return &timerOrigin{kind: "activityRetry", attrKey: "taskExecId", attrVal: x.ActivityRetry.GetTaskExecutionId()}
case *protos.TimerCreatedEvent_ChildWorkflowRetry:
return &timerOrigin{kind: "childWorkflowRetry", attrKey: "instanceId", attrVal: x.ChildWorkflowRetry.GetInstanceId()}
default:
return nil
}
}

func timerOriginString(tc *protos.TimerCreatedEvent) string {
o := timerOriginInfo(tc)
if o == nil {
return ""
}
if o.attrVal != "" {
return o.kind + "(" + o.attrVal + ")"
}
return o.kind
}

func trim(ww *wrapperspb.StringValue, limit int) string {
if ww == nil {
return ""
Expand Down
180 changes: 180 additions & 0 deletions pkg/workflow/history_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/dapr/durabletask-go/api/protos"
)

func TestTimerOriginString(t *testing.T) {
tests := []struct {
name string
event *protos.TimerCreatedEvent
expect string
}{
{
name: "nil origin",
event: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
},
expect: "",
},
{
name: "createTimer",
event: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_CreateTimer{
CreateTimer: &protos.TimerOriginCreateTimer{},
},
},
expect: "createTimer",
},
{
name: "externalEvent",
event: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_ExternalEvent{
ExternalEvent: &protos.TimerOriginExternalEvent{
Name: "myEvent",
},
},
},
expect: "externalEvent(myEvent)",
},
{
name: "activityRetry",
event: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_ActivityRetry{
ActivityRetry: &protos.TimerOriginActivityRetry{
TaskExecutionId: "exec-123",
},
},
},
expect: "activityRetry(exec-123)",
},
{
name: "childWorkflowRetry",
event: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_ChildWorkflowRetry{
ChildWorkflowRetry: &protos.TimerOriginChildWorkflowRetry{
InstanceId: "wf-456",
},
},
},
expect: "childWorkflowRetry(wf-456)",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := timerOriginString(tc.event)
assert.Equal(t, tc.expect, got)
})
}
}

func TestDeriveDetails_TimerCreated(t *testing.T) {
first := &protos.HistoryEvent{
Timestamp: timestamppb.Now(),
}

tests := []struct {
name string
event *protos.HistoryEvent
contains []string
excludes []string
}{
{
name: "timer with no origin",
event: &protos.HistoryEvent{
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
},
},
},
contains: []string{"fireAt="},
excludes: []string{"origin="},
},
{
name: "timer with createTimer origin",
event: &protos.HistoryEvent{
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_CreateTimer{
CreateTimer: &protos.TimerOriginCreateTimer{},
},
},
},
},
contains: []string{"fireAt=", "origin=createTimer"},
},
{
name: "timer with activityRetry origin",
event: &protos.HistoryEvent{
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
Origin: &protos.TimerCreatedEvent_ActivityRetry{
ActivityRetry: &protos.TimerOriginActivityRetry{
TaskExecutionId: "exec-abc",
},
},
},
},
},
contains: []string{"fireAt=", "origin=activityRetry(exec-abc)"},
},
{
name: "timer with rerunParent and origin",
event: &protos.HistoryEvent{
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: timestamppb.Now(),
RerunParentInstanceInfo: &protos.RerunParentInstanceInfo{
InstanceID: "parent-123",
},
Origin: &protos.TimerCreatedEvent_ExternalEvent{
ExternalEvent: &protos.TimerOriginExternalEvent{
Name: "approval",
},
},
},
},
},
contains: []string{"fireAt=", "rerunParent=parent-123", "origin=externalEvent(approval)"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
details := deriveDetails(first, tc.event)
assert.NotNil(t, details)
for _, s := range tc.contains {
assert.Contains(t, *details, s)
}
for _, s := range tc.excludes {
assert.NotContains(t, *details, s)
}
})
}
}
45 changes: 45 additions & 0 deletions tests/apps/workflow/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ func register(ctx context.Context) {
RecursiveChildWorkflow,
FanOutWorkflow,
DataWorkflow,
ActivityRetryWorkflow,
ChildWorkflowRetryWorkflow,
FailingChildWorkflow,
}
activities := []workflow.Activity{
ANoOP,
SimpleActivity,
LongRunningActivity,
DataProcessingActivity,
FailingActivity,
}

for _, w := range workflows {
Expand Down Expand Up @@ -368,3 +372,44 @@ func FanOutWorkflow(ctx *workflow.WorkflowContext) (any, error) {
"results": results,
}, nil
}

// FailingActivity always returns an error so that retry policies create
// timer events with origin=activityRetry.
func FailingActivity(ctx workflow.ActivityContext) (any, error) {
return nil, fmt.Errorf("intentional failure")
}

// ActivityRetryWorkflow calls FailingActivity with a retry policy.
// The activity always fails, producing TimerCreated events with
// origin=activityRetry. The workflow itself will eventually fail
// after max attempts, but the history will contain the retry timers.
func ActivityRetryWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var result any
err := ctx.CallActivity(FailingActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: time.Second,
BackoffCoefficient: 1,
MaxRetryInterval: time.Second,
})).Await(&result)
return result, err
}

// FailingChildWorkflow always returns an error so that retry policies create
// timer events with origin=childWorkflowRetry.
func FailingChildWorkflow(ctx *workflow.WorkflowContext) (any, error) {
return nil, fmt.Errorf("intentional child failure")
}

// ChildWorkflowRetryWorkflow calls FailingChildWorkflow with a retry policy.
// The child always fails, producing TimerCreated events with
// origin=childWorkflowRetry.
func ChildWorkflowRetryWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var result any
err := ctx.CallChildWorkflow(FailingChildWorkflow, workflow.WithChildWorkflowRetryPolicy(&workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: time.Second,
BackoffCoefficient: 1,
MaxRetryInterval: time.Second,
})).Await(&result)
return result, err
}
38 changes: 24 additions & 14 deletions tests/apps/workflow/go.mod
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
module workflow

go 1.24.7
go 1.26.0

require (
github.com/dapr/durabletask-go v0.10.0
github.com/dapr/durabletask-go v0.11.4-0.20260413145313-c4b7279b6a8e
github.com/dapr/go-sdk v1.13.0
github.com/dapr/kit v0.16.1
github.com/dapr/kit v0.17.1-0.20260402173438-be272d92042b
)

require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.16.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260316180232-0b37fe3546d5 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.61.9 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.8.2 // indirect
modernc.org/sqlite v1.34.5 // indirect
)
Loading
Loading