Skip to content

Commit 49eb4ad

Browse files
mnafeeswsehl
authored andcommitted
Cache CEL programs in order to avoid expensive heap allocations on each event match (#3667)
* use LRU cache for CEL programs * err check boundary * use hash expr
1 parent 8b41337 commit 49eb4ad

2 files changed

Lines changed: 29 additions & 9 deletions

File tree

pkg/repository/match.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"hash/fnv"
78
"slices"
89
"time"
910

@@ -932,18 +933,29 @@ func (m *sharedRepository) processCELExpressions(ctx context.Context, events []C
932933
expr = "true"
933934
}
934935

935-
ast, issues := m.env.Compile(expr)
936+
hasher := fnv.New64a()
937+
hasher.Write([]byte(expr))
938+
exprHash := hasher.Sum64()
936939

937-
if issues != nil {
938-
m.l.Error().Ctx(ctx).Msgf("failed to compile CEL expression: %s", issues.String())
939-
continue
940-
}
940+
program, ok := m.celProgramCache.Get(exprHash)
941941

942-
program, err := m.env.Program(ast)
942+
if !ok {
943+
ast, issues := m.env.Compile(expr)
943944

944-
if err != nil {
945-
m.l.Error().Ctx(ctx).Err(err).Msgf("failed to create CEL program: %s", expr)
946-
continue
945+
if issues != nil && issues.Err() != nil {
946+
m.l.Error().Ctx(ctx).Err(issues.Err()).Msgf("failed to compile CEL expression: %s", expr)
947+
continue
948+
}
949+
950+
compiled, err := m.env.Program(ast)
951+
952+
if err != nil {
953+
m.l.Error().Ctx(ctx).Err(err).Msgf("failed to create CEL program: %s", expr)
954+
continue
955+
}
956+
957+
m.celProgramCache.Add(exprHash, compiled)
958+
program = compiled
947959
}
948960

949961
programs[condition.ID] = program

pkg/repository/shared.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type sharedRepository struct {
4343

4444
celParser *cel.CELParser
4545
env *celgo.Env
46+
celProgramCache *lru.Cache[uint64, celgo.Program]
4647
taskLookupCache *lru.Cache[taskExternalIdTenantIdTuple, *sqlcv1.FlattenExternalIdsRow]
4748
payloadStore PayloadStoreRepository
4849
m TenantLimitRepository
@@ -86,6 +87,12 @@ func newSharedRepository(
8687
log.Fatalf("failed to create LRU cache: %v", err)
8788
}
8889

90+
celProgramCache, err := lru.New[uint64, celgo.Program](50000)
91+
92+
if err != nil {
93+
log.Fatalf("failed to create CEL program cache: %v", err)
94+
}
95+
8996
s := &sharedRepository{
9097
pool: pool,
9198
ddlPool: ddlPool,
@@ -101,6 +108,7 @@ func newSharedRepository(
101108
stepIdSlotRequestsCache: stepIdSlotRequestsCache,
102109
celParser: celParser,
103110
env: env,
111+
celProgramCache: celProgramCache,
104112
taskLookupCache: lookupCache,
105113
payloadStore: payloadStore,
106114
}

0 commit comments

Comments
 (0)