From 805060bef7ee47605be9758facd5b907bf917732 Mon Sep 17 00:00:00 2001 From: dev-arya23 Date: Sun, 14 Jun 2026 11:51:08 +0530 Subject: [PATCH 1/2] fix(table): preserve error codes from db layer in Find methods Table.Find, FindMany, FindManyWithOpts and the equivalent CachedTable methods unconditionally wrapped every error from the db layer as NotFound, destroying the error code that interpretMongoError had already set correctly. This caused callers using coreerrors.IsNotFound(err) to treat transient storage failures (connection resets, timeouts, auth failures) as genuine "document does not exist" results, leading to silent data loss in reconciliation paths. Fix: if the error from the db layer already carries a recognized error code (set by interpretMongoError), pass it through unchanged. Only wrap unrecognized errors as Internal. Add Internal error code (6) and IsInternal helper to the errors package. Fixes agentic-core/core#48 --- errors/const.go | 3 + errors/errors.go | 6 ++ table/cached_generic.go | 163 ++++++++++++++++++++++++++++++++++++---- table/generic.go | 28 ++++++- 4 files changed, 181 insertions(+), 19 deletions(-) diff --git a/errors/const.go b/errors/const.go index eced5d6..da8906a 100644 --- a/errors/const.go +++ b/errors/const.go @@ -25,4 +25,7 @@ const ( // Forbidden action error Forbidden ErrCode = 5 + + // Internal error (e.g., transient storage failure) + Internal ErrCode = 6 ) diff --git a/errors/errors.go b/errors/errors.go index fa8d2d4..27c879c 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -86,3 +86,9 @@ func IsUnauthorized(err error) bool { func IsForbidden(err error) bool { return GetErrCode(err) == Forbidden } + +// IsInternal returns true if err +// is due to an internal/transient failure +func IsInternal(err error) bool { + return GetErrCode(err) == Internal +} diff --git a/table/cached_generic.go b/table/cached_generic.go index 522b845..cbc23ec 100644 --- a/table/cached_generic.go +++ b/table/cached_generic.go @@ -22,6 +22,21 @@ type CachedTableConfig struct { // instead of eagerly loading all entries at initialization. // Default: false (eager loading) ReadThrough bool + + // Filter specifies an optional filter for the initial eager load (FindMany) + // and for ReconcilerGetAllKeys. When set, only entries matching this filter + // are loaded into the cache and enumerated by the reconciler. + // The filter is passed directly to StoreCollection.FindMany. + // Default: nil (all entries) + Filter any + + // WatchPipeline specifies an optional aggregation pipeline for the change + // stream (Watch). When set, only change events matching the pipeline are + // delivered to the cache callback. + // The pipeline is passed directly to StoreCollection.Watch and should be + // a mongo.Pipeline ([]bson.D) with $match stages or similar. + // Default: nil (all change events) + WatchPipeline any } // CachedTableOption is a functional option for configuring CachedTable. @@ -36,6 +51,77 @@ func WithReadThrough() CachedTableOption { } } +// WithFilter sets a filter for the initial eager load and ReconcilerGetAllKeys. +// Only entries matching this filter will be loaded into the cache at initialization +// and returned when the reconciler enumerates all keys. +// +// This is useful when multiple CachedTable instances share the same underlying +// MongoDB collection but each instance should only manage a subset of documents +// (e.g., filtering by a type discriminator field). +// +// The filter is passed directly to StoreCollection.FindMany and should be a +// valid MongoDB filter document (e.g., bson.M{"key.type": "slack"}). +// +// Example usage: +// +// import "go.mongodb.org/mongo-driver/v2/bson" +// +// err := table.InitializeWithConfig(col, +// WithFilter(bson.M{"key.type": "slack"})) +func WithFilter(filter any) CachedTableOption { + return func(cfg *CachedTableConfig) { + cfg.Filter = filter + } +} + +// WithWatchPipeline sets an aggregation pipeline for the change stream. +// Only change events matching the pipeline will be delivered to the cache +// callback, preventing the cache from receiving and processing irrelevant +// change events. +// +// This is useful when multiple CachedTable instances share the same underlying +// MongoDB collection but each instance should only react to changes for its +// own subset of documents. +// +// The pipeline is passed directly to StoreCollection.Watch and should be a +// mongo.Pipeline ([]bson.D) with appropriate $match stages. +// +// WARNING: MongoDB delete events omit the fullDocument field. If your pipeline +// filters on fullDocument fields (e.g., fullDocument.key.type), delete events +// will be silently dropped by the change stream and will NOT reach the cache +// callback. The cache relies on the reconciler to eventually evict stale entries, +// which means there will be a window of stale data until the next reconciliation +// sweep. For immediate delete propagation, use one of these approaches: +// +// - Use fullDocumentBeforeChange (requires MongoDB 6.0+ with pre-images enabled +// on the collection) to match deletes by the document's prior state +// - Include documentKey-based matching if the filter field is part of the _id +// - Accept the reconciler-based eviction delay for your use case +// +// Example with delete handling (MongoDB 6.0+ with pre-images): +// +// pipeline := mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"$or": bson.A{ +// bson.M{"operationType": bson.M{"$in": bson.A{"insert", "replace", "update"}}, +// "fullDocument.key.type": "slack"}, +// bson.M{"operationType": "delete", +// "fullDocumentBeforeChange.key.type": "slack"}, +// }}}}, +// } +// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline)) +// +// Simple example (deletes handled via reconciler): +// +// pipeline := mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}}, +// } +// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline)) +func WithWatchPipeline(pipeline any) CachedTableOption { + return func(cfg *CachedTableConfig) { + cfg.WatchPipeline = pipeline + } +} + // CachedTable is a generic table type providing common functions and types to specific // structures each table is built using. This table also ensure keeping an inmemory // cache information to enable better responsiveness for critical path data fetch, where @@ -48,10 +134,12 @@ func WithReadThrough() CachedTableOption { // E: Entry type (must NOT be a pointer type) type CachedTable[K comparable, E any] struct { reconciler.ManagerImpl - cacheMu sync.RWMutex - cache map[K]*E - col db.StoreCollection - readThrough bool + cacheMu sync.RWMutex + cache map[K]*E + col db.StoreCollection + readThrough bool + filter any // optional filter for FindMany (eager load + reconciler) + watchPipeline any // optional pipeline for Watch (change stream) } // Initialize sets up the Table with the provided db.StoreCollection using default configuration. @@ -76,6 +164,19 @@ func (t *CachedTable[K, E]) Initialize(col db.StoreCollection) error { // // Read-through caching // err := table.InitializeWithConfig(col, WithReadThrough()) // +// // Filtered eager loading (only load entries matching filter) +// // NOTE: For scoped isolation (e.g., type-discriminated caches sharing a +// // collection), both WithFilter and WithWatchPipeline should typically be +// // set together to keep the cache scope consistent. WithFilter scopes the +// // initial load and reconciler, while WithWatchPipeline scopes the change +// // stream. Using only one may cause cache drift — see each option's +// // documentation for details on partial configuration behavior. +// err := table.InitializeWithConfig(col, +// WithFilter(bson.M{"key.type": "slack"}), +// WithWatchPipeline(mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}}, +// })) +// // Returns an error if the table is already initialized, the entry or key type is a pointer, // or if the collection setup fails. func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ...CachedTableOption) error { @@ -85,12 +186,16 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. // Apply configuration options config := &CachedTableConfig{ - ReadThrough: false, // Default to eager loading + ReadThrough: false, // Default to eager loading + Filter: nil, // Default to all entries + WatchPipeline: nil, // Default to all change events } for _, opt := range opts { opt(config) } t.readThrough = config.ReadThrough + t.filter = config.Filter + t.watchPipeline = config.WatchPipeline if t.cache == nil { t.cache = map[K]*E{} @@ -111,8 +216,19 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. return err } - // Register callback for collection changes - err = col.Watch(context.Background(), nil, t.callback) + // Preflight validation: if a filter is configured, validate it by running + // a lightweight Count before proceeding. This catches invalid filters at + // init time with a proper error return instead of deferring to the panic path + // in eager load or ReconcilerGetAllKeys. + // Count validates the filter server-side without fetching or decoding documents. + if t.filter != nil { + if _, err := col.Count(context.Background(), t.filter); err != nil { + return errors.Wrapf(errors.InvalidArgument, "WithFilter: filter validation failed: %s", err) + } + } + + // Register callback for collection changes, using watch pipeline if configured + err = col.Watch(context.Background(), t.watchPipeline, t.callback) if err != nil { return err } @@ -128,9 +244,9 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. // Only eagerly load entries if read-through is disabled if !t.readThrough { list := []keyOnly[K]{} - err = t.col.FindMany(context.Background(), nil, &list) + err = t.col.FindMany(context.Background(), t.filter, &list) if err != nil { - log.Panicf("got error while fetching all keys %s", err) + return errors.Wrapf(errors.Unknown, "failed to eager-load keys: %s", err) } for _, k := range list { entry, err := t.DBFind(context.Background(), &k.Key) @@ -182,11 +298,12 @@ func (t *CachedTable[K, E]) callback(op string, wKey any) { } // ReconcilerGetAllKeys returns all keys in the table. +// If a filter was configured via WithFilter, only keys matching the filter are returned. // Used by the reconciler to enumerate all managed entries. func (t *CachedTable[K, E]) ReconcilerGetAllKeys() []any { list := []keyOnly[K]{} keys := []any{} - err := t.col.FindMany(context.Background(), nil, &list) + err := t.col.FindMany(context.Background(), t.filter, &list) if err != nil { log.Panicf("got error while fetching all keys %s", err) } @@ -263,8 +380,11 @@ func (t *CachedTable[K, E]) Find(ctx context.Context, key *K) (*E, error) { return dbEntry, nil } -// DBFind retrieves an entry by key from the Database +// DBFind retrieves an entry by key from the Database. // Returns the entry and error if not found or if the table is not initialized. +// Preserves error codes from the db layer: a genuine missing document returns +// NotFound, while transient storage failures (timeouts, connection errors) +// return Internal so callers can distinguish and retry appropriately. func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) { var data E if t.col == nil { @@ -272,13 +392,18 @@ func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) { } err := t.col.FindOne(ctx, key, &data) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err) } - return &data, err + return &data, nil } // DBFindMany retrieves multiple entries matching the provided filter from database. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) { if t.col == nil { return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") @@ -287,7 +412,10 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset)) err := t.col.FindMany(ctx, filter, &data, opts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil @@ -296,6 +424,8 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, // DBFindManyWithOpts retrieves multiple entries matching the provided filter from database with optional parameters. // Supports pagination (limit, offset) and sorting through functional options. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. // // Example usage: // @@ -330,7 +460,10 @@ func (t *CachedTable[K, E]) DBFindManyWithOpts(ctx context.Context, filter any, var data []*E err := t.col.FindMany(ctx, filter, &data, mongoOpts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil diff --git a/table/generic.go b/table/generic.go index 932034e..0452de5 100644 --- a/table/generic.go +++ b/table/generic.go @@ -293,6 +293,9 @@ func (t *Table[K, E]) Update(ctx context.Context, key *K, entry *E) error { // Find retrieves an entry by key. // Returns the entry and error if not found or if the table is not initialized. +// Preserves error codes from the db layer: a genuine missing document returns +// NotFound, while transient storage failures (timeouts, connection errors) +// return Internal so callers can distinguish and retry appropriately. func (t *Table[K, E]) Find(ctx context.Context, key *K) (*E, error) { var data E if t.col == nil { @@ -300,13 +303,22 @@ func (t *Table[K, E]) Find(ctx context.Context, key *K) (*E, error) { } err := t.col.FindOne(ctx, key, &data) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err) + // If the db layer already assigned a recognized error code + // (e.g., NotFound from interpretMongoError for ErrNoDocuments), + // pass it through unchanged. Otherwise wrap as Internal so + // callers can distinguish transient failures from true misses. + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err) } - return &data, err + return &data, nil } // FindMany retrieves multiple entries matching the provided filter. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) { if t.col == nil { return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") @@ -315,7 +327,10 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset)) err := t.col.FindMany(ctx, filter, &data, opts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil @@ -324,6 +339,8 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in // FindManyWithOpts retrieves multiple entries matching the provided filter with optional parameters. // Supports pagination (limit, offset) and sorting through functional options. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. // // Example usage: // @@ -358,7 +375,10 @@ func (t *Table[K, E]) FindManyWithOpts(ctx context.Context, filter any, opts ... var data []*E err := t.col.FindMany(ctx, filter, &data, mongoOpts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil From 2708eb8d09bff492af3c0f666ff5ce1525bd02ec Mon Sep 17 00:00:00 2001 From: dev-arya23 Date: Sun, 14 Jun 2026 12:00:33 +0530 Subject: [PATCH 2/2] fix: preserve error codes in init path and add IsInternal test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address CodeRabbit review feedback: 1. Count preflight (InitializeWithConfig): wrapping all Count errors as InvalidArgument misclassified transient DB failures. Now uses the same GetErrCode pattern as Find methods — pass through typed errors, wrap unrecognized errors as Internal. 2. Eager-load (InitializeWithConfig): wrapping as Unknown was inconsistent with Find methods which wrap as Internal. Fixed to match. 3. Add test coverage for the new IsInternal predicate in errors_test.go, following the existing pattern for IsNotFound/IsAlreadyExists. --- errors/errors_test.go | 5 +++++ table/cached_generic.go | 10 ++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/errors/errors_test.go b/errors/errors_test.go index b3fe9e9..c4af55e 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -28,4 +28,9 @@ func Test_ErrorValidations(t *testing.T) { if !IsNotFound(err) { t.Errorf("expected error type Not Found") } + + err = Wrap(Internal, "internal failure") + if !IsInternal(err) { + t.Errorf("expected error type Internal") + } } diff --git a/table/cached_generic.go b/table/cached_generic.go index cbc23ec..d506e85 100644 --- a/table/cached_generic.go +++ b/table/cached_generic.go @@ -223,7 +223,10 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. // Count validates the filter server-side without fetching or decoding documents. if t.filter != nil { if _, err := col.Count(context.Background(), t.filter); err != nil { - return errors.Wrapf(errors.InvalidArgument, "WithFilter: filter validation failed: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return err + } + return errors.Wrapf(errors.Internal, "WithFilter preflight count failed: %s", err) } } @@ -246,7 +249,10 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. list := []keyOnly[K]{} err = t.col.FindMany(context.Background(), t.filter, &list) if err != nil { - return errors.Wrapf(errors.Unknown, "failed to eager-load keys: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return err + } + return errors.Wrapf(errors.Internal, "failed to eager-load keys: %s", err) } for _, k := range list { entry, err := t.DBFind(context.Background(), &k.Key)