diff --git a/errors/const.go b/errors/const.go index eced5d6..da8906a 100644 --- a/errors/const.go +++ b/errors/const.go @@ -25,4 +25,7 @@ const ( // Forbidden action error Forbidden ErrCode = 5 + + // Internal error (e.g., transient storage failure) + Internal ErrCode = 6 ) diff --git a/errors/errors.go b/errors/errors.go index fa8d2d4..27c879c 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -86,3 +86,9 @@ func IsUnauthorized(err error) bool { func IsForbidden(err error) bool { return GetErrCode(err) == Forbidden } + +// IsInternal returns true if err +// is due to an internal/transient failure +func IsInternal(err error) bool { + return GetErrCode(err) == Internal +} diff --git a/errors/errors_test.go b/errors/errors_test.go index b3fe9e9..c4af55e 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -28,4 +28,9 @@ func Test_ErrorValidations(t *testing.T) { if !IsNotFound(err) { t.Errorf("expected error type Not Found") } + + err = Wrap(Internal, "internal failure") + if !IsInternal(err) { + t.Errorf("expected error type Internal") + } } diff --git a/table/cached_generic.go b/table/cached_generic.go index 522b845..d506e85 100644 --- a/table/cached_generic.go +++ b/table/cached_generic.go @@ -22,6 +22,21 @@ type CachedTableConfig struct { // instead of eagerly loading all entries at initialization. // Default: false (eager loading) ReadThrough bool + + // Filter specifies an optional filter for the initial eager load (FindMany) + // and for ReconcilerGetAllKeys. When set, only entries matching this filter + // are loaded into the cache and enumerated by the reconciler. + // The filter is passed directly to StoreCollection.FindMany. + // Default: nil (all entries) + Filter any + + // WatchPipeline specifies an optional aggregation pipeline for the change + // stream (Watch). When set, only change events matching the pipeline are + // delivered to the cache callback. + // The pipeline is passed directly to StoreCollection.Watch and should be + // a mongo.Pipeline ([]bson.D) with $match stages or similar. + // Default: nil (all change events) + WatchPipeline any } // CachedTableOption is a functional option for configuring CachedTable. @@ -36,6 +51,77 @@ func WithReadThrough() CachedTableOption { } } +// WithFilter sets a filter for the initial eager load and ReconcilerGetAllKeys. +// Only entries matching this filter will be loaded into the cache at initialization +// and returned when the reconciler enumerates all keys. +// +// This is useful when multiple CachedTable instances share the same underlying +// MongoDB collection but each instance should only manage a subset of documents +// (e.g., filtering by a type discriminator field). +// +// The filter is passed directly to StoreCollection.FindMany and should be a +// valid MongoDB filter document (e.g., bson.M{"key.type": "slack"}). +// +// Example usage: +// +// import "go.mongodb.org/mongo-driver/v2/bson" +// +// err := table.InitializeWithConfig(col, +// WithFilter(bson.M{"key.type": "slack"})) +func WithFilter(filter any) CachedTableOption { + return func(cfg *CachedTableConfig) { + cfg.Filter = filter + } +} + +// WithWatchPipeline sets an aggregation pipeline for the change stream. +// Only change events matching the pipeline will be delivered to the cache +// callback, preventing the cache from receiving and processing irrelevant +// change events. +// +// This is useful when multiple CachedTable instances share the same underlying +// MongoDB collection but each instance should only react to changes for its +// own subset of documents. +// +// The pipeline is passed directly to StoreCollection.Watch and should be a +// mongo.Pipeline ([]bson.D) with appropriate $match stages. +// +// WARNING: MongoDB delete events omit the fullDocument field. If your pipeline +// filters on fullDocument fields (e.g., fullDocument.key.type), delete events +// will be silently dropped by the change stream and will NOT reach the cache +// callback. The cache relies on the reconciler to eventually evict stale entries, +// which means there will be a window of stale data until the next reconciliation +// sweep. For immediate delete propagation, use one of these approaches: +// +// - Use fullDocumentBeforeChange (requires MongoDB 6.0+ with pre-images enabled +// on the collection) to match deletes by the document's prior state +// - Include documentKey-based matching if the filter field is part of the _id +// - Accept the reconciler-based eviction delay for your use case +// +// Example with delete handling (MongoDB 6.0+ with pre-images): +// +// pipeline := mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"$or": bson.A{ +// bson.M{"operationType": bson.M{"$in": bson.A{"insert", "replace", "update"}}, +// "fullDocument.key.type": "slack"}, +// bson.M{"operationType": "delete", +// "fullDocumentBeforeChange.key.type": "slack"}, +// }}}}, +// } +// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline)) +// +// Simple example (deletes handled via reconciler): +// +// pipeline := mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}}, +// } +// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline)) +func WithWatchPipeline(pipeline any) CachedTableOption { + return func(cfg *CachedTableConfig) { + cfg.WatchPipeline = pipeline + } +} + // CachedTable is a generic table type providing common functions and types to specific // structures each table is built using. This table also ensure keeping an inmemory // cache information to enable better responsiveness for critical path data fetch, where @@ -48,10 +134,12 @@ func WithReadThrough() CachedTableOption { // E: Entry type (must NOT be a pointer type) type CachedTable[K comparable, E any] struct { reconciler.ManagerImpl - cacheMu sync.RWMutex - cache map[K]*E - col db.StoreCollection - readThrough bool + cacheMu sync.RWMutex + cache map[K]*E + col db.StoreCollection + readThrough bool + filter any // optional filter for FindMany (eager load + reconciler) + watchPipeline any // optional pipeline for Watch (change stream) } // Initialize sets up the Table with the provided db.StoreCollection using default configuration. @@ -76,6 +164,19 @@ func (t *CachedTable[K, E]) Initialize(col db.StoreCollection) error { // // Read-through caching // err := table.InitializeWithConfig(col, WithReadThrough()) // +// // Filtered eager loading (only load entries matching filter) +// // NOTE: For scoped isolation (e.g., type-discriminated caches sharing a +// // collection), both WithFilter and WithWatchPipeline should typically be +// // set together to keep the cache scope consistent. WithFilter scopes the +// // initial load and reconciler, while WithWatchPipeline scopes the change +// // stream. Using only one may cause cache drift — see each option's +// // documentation for details on partial configuration behavior. +// err := table.InitializeWithConfig(col, +// WithFilter(bson.M{"key.type": "slack"}), +// WithWatchPipeline(mongo.Pipeline{ +// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}}, +// })) +// // Returns an error if the table is already initialized, the entry or key type is a pointer, // or if the collection setup fails. func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ...CachedTableOption) error { @@ -85,12 +186,16 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. // Apply configuration options config := &CachedTableConfig{ - ReadThrough: false, // Default to eager loading + ReadThrough: false, // Default to eager loading + Filter: nil, // Default to all entries + WatchPipeline: nil, // Default to all change events } for _, opt := range opts { opt(config) } t.readThrough = config.ReadThrough + t.filter = config.Filter + t.watchPipeline = config.WatchPipeline if t.cache == nil { t.cache = map[K]*E{} @@ -111,8 +216,22 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. return err } - // Register callback for collection changes - err = col.Watch(context.Background(), nil, t.callback) + // Preflight validation: if a filter is configured, validate it by running + // a lightweight Count before proceeding. This catches invalid filters at + // init time with a proper error return instead of deferring to the panic path + // in eager load or ReconcilerGetAllKeys. + // Count validates the filter server-side without fetching or decoding documents. + if t.filter != nil { + if _, err := col.Count(context.Background(), t.filter); err != nil { + if errors.GetErrCode(err) != errors.Unknown { + return err + } + return errors.Wrapf(errors.Internal, "WithFilter preflight count failed: %s", err) + } + } + + // Register callback for collection changes, using watch pipeline if configured + err = col.Watch(context.Background(), t.watchPipeline, t.callback) if err != nil { return err } @@ -128,9 +247,12 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts .. // Only eagerly load entries if read-through is disabled if !t.readThrough { list := []keyOnly[K]{} - err = t.col.FindMany(context.Background(), nil, &list) + err = t.col.FindMany(context.Background(), t.filter, &list) if err != nil { - log.Panicf("got error while fetching all keys %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return err + } + return errors.Wrapf(errors.Internal, "failed to eager-load keys: %s", err) } for _, k := range list { entry, err := t.DBFind(context.Background(), &k.Key) @@ -182,11 +304,12 @@ func (t *CachedTable[K, E]) callback(op string, wKey any) { } // ReconcilerGetAllKeys returns all keys in the table. +// If a filter was configured via WithFilter, only keys matching the filter are returned. // Used by the reconciler to enumerate all managed entries. func (t *CachedTable[K, E]) ReconcilerGetAllKeys() []any { list := []keyOnly[K]{} keys := []any{} - err := t.col.FindMany(context.Background(), nil, &list) + err := t.col.FindMany(context.Background(), t.filter, &list) if err != nil { log.Panicf("got error while fetching all keys %s", err) } @@ -263,8 +386,11 @@ func (t *CachedTable[K, E]) Find(ctx context.Context, key *K) (*E, error) { return dbEntry, nil } -// DBFind retrieves an entry by key from the Database +// DBFind retrieves an entry by key from the Database. // Returns the entry and error if not found or if the table is not initialized. +// Preserves error codes from the db layer: a genuine missing document returns +// NotFound, while transient storage failures (timeouts, connection errors) +// return Internal so callers can distinguish and retry appropriately. func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) { var data E if t.col == nil { @@ -272,13 +398,18 @@ func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) { } err := t.col.FindOne(ctx, key, &data) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err) } - return &data, err + return &data, nil } // DBFindMany retrieves multiple entries matching the provided filter from database. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) { if t.col == nil { return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") @@ -287,7 +418,10 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset)) err := t.col.FindMany(ctx, filter, &data, opts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil @@ -296,6 +430,8 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, // DBFindManyWithOpts retrieves multiple entries matching the provided filter from database with optional parameters. // Supports pagination (limit, offset) and sorting through functional options. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. // // Example usage: // @@ -330,7 +466,10 @@ func (t *CachedTable[K, E]) DBFindManyWithOpts(ctx context.Context, filter any, var data []*E err := t.col.FindMany(ctx, filter, &data, mongoOpts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil diff --git a/table/generic.go b/table/generic.go index 932034e..0452de5 100644 --- a/table/generic.go +++ b/table/generic.go @@ -293,6 +293,9 @@ func (t *Table[K, E]) Update(ctx context.Context, key *K, entry *E) error { // Find retrieves an entry by key. // Returns the entry and error if not found or if the table is not initialized. +// Preserves error codes from the db layer: a genuine missing document returns +// NotFound, while transient storage failures (timeouts, connection errors) +// return Internal so callers can distinguish and retry appropriately. func (t *Table[K, E]) Find(ctx context.Context, key *K) (*E, error) { var data E if t.col == nil { @@ -300,13 +303,22 @@ func (t *Table[K, E]) Find(ctx context.Context, key *K) (*E, error) { } err := t.col.FindOne(ctx, key, &data) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err) + // If the db layer already assigned a recognized error code + // (e.g., NotFound from interpretMongoError for ErrNoDocuments), + // pass it through unchanged. Otherwise wrap as Internal so + // callers can distinguish transient failures from true misses. + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err) } - return &data, err + return &data, nil } // FindMany retrieves multiple entries matching the provided filter. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) { if t.col == nil { return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") @@ -315,7 +327,10 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset)) err := t.col.FindMany(ctx, filter, &data, opts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil @@ -324,6 +339,8 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in // FindManyWithOpts retrieves multiple entries matching the provided filter with optional parameters. // Supports pagination (limit, offset) and sorting through functional options. // Returns a slice of entries and error if none found or if the table is not initialized. +// Preserves error codes from the db layer so callers can distinguish transient +// storage failures from genuine empty results. // // Example usage: // @@ -358,7 +375,10 @@ func (t *Table[K, E]) FindManyWithOpts(ctx context.Context, filter any, opts ... var data []*E err := t.col.FindMany(ctx, filter, &data, mongoOpts) if err != nil { - return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + if errors.GetErrCode(err) != errors.Unknown { + return nil, err + } + return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err) } return data, nil