Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions errors/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ const (

// Forbidden action error
Forbidden ErrCode = 5

// Internal error (e.g., transient storage failure)
Internal ErrCode = 6
)
6 changes: 6 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,9 @@ func IsUnauthorized(err error) bool {
func IsForbidden(err error) bool {
return GetErrCode(err) == Forbidden
}

// IsInternal returns true if err
// is due to an internal/transient failure
func IsInternal(err error) bool {
return GetErrCode(err) == Internal
}
5 changes: 5 additions & 0 deletions errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ func Test_ErrorValidations(t *testing.T) {
if !IsNotFound(err) {
t.Errorf("expected error type Not Found")
}

err = Wrap(Internal, "internal failure")
if !IsInternal(err) {
t.Errorf("expected error type Internal")
}
}
169 changes: 154 additions & 15 deletions table/cached_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ type CachedTableConfig struct {
// instead of eagerly loading all entries at initialization.
// Default: false (eager loading)
ReadThrough bool

// Filter specifies an optional filter for the initial eager load (FindMany)
// and for ReconcilerGetAllKeys. When set, only entries matching this filter
// are loaded into the cache and enumerated by the reconciler.
// The filter is passed directly to StoreCollection.FindMany.
// Default: nil (all entries)
Filter any

// WatchPipeline specifies an optional aggregation pipeline for the change
// stream (Watch). When set, only change events matching the pipeline are
// delivered to the cache callback.
// The pipeline is passed directly to StoreCollection.Watch and should be
// a mongo.Pipeline ([]bson.D) with $match stages or similar.
// Default: nil (all change events)
WatchPipeline any
}

// CachedTableOption is a functional option for configuring CachedTable.
Expand All @@ -36,6 +51,77 @@ func WithReadThrough() CachedTableOption {
}
}

// WithFilter sets a filter for the initial eager load and ReconcilerGetAllKeys.
// Only entries matching this filter will be loaded into the cache at initialization
// and returned when the reconciler enumerates all keys.
//
// This is useful when multiple CachedTable instances share the same underlying
// MongoDB collection but each instance should only manage a subset of documents
// (e.g., filtering by a type discriminator field).
//
// The filter is passed directly to StoreCollection.FindMany and should be a
// valid MongoDB filter document (e.g., bson.M{"key.type": "slack"}).
//
// Example usage:
//
// import "go.mongodb.org/mongo-driver/v2/bson"
//
// err := table.InitializeWithConfig(col,
// WithFilter(bson.M{"key.type": "slack"}))
func WithFilter(filter any) CachedTableOption {
return func(cfg *CachedTableConfig) {
cfg.Filter = filter
}
}

// WithWatchPipeline sets an aggregation pipeline for the change stream.
// Only change events matching the pipeline will be delivered to the cache
// callback, preventing the cache from receiving and processing irrelevant
// change events.
//
// This is useful when multiple CachedTable instances share the same underlying
// MongoDB collection but each instance should only react to changes for its
// own subset of documents.
//
// The pipeline is passed directly to StoreCollection.Watch and should be a
// mongo.Pipeline ([]bson.D) with appropriate $match stages.
//
// WARNING: MongoDB delete events omit the fullDocument field. If your pipeline
// filters on fullDocument fields (e.g., fullDocument.key.type), delete events
// will be silently dropped by the change stream and will NOT reach the cache
// callback. The cache relies on the reconciler to eventually evict stale entries,
// which means there will be a window of stale data until the next reconciliation
// sweep. For immediate delete propagation, use one of these approaches:
//
// - Use fullDocumentBeforeChange (requires MongoDB 6.0+ with pre-images enabled
// on the collection) to match deletes by the document's prior state
// - Include documentKey-based matching if the filter field is part of the _id
// - Accept the reconciler-based eviction delay for your use case
//
// Example with delete handling (MongoDB 6.0+ with pre-images):
//
// pipeline := mongo.Pipeline{
// {{Key: "$match", Value: bson.M{"$or": bson.A{
// bson.M{"operationType": bson.M{"$in": bson.A{"insert", "replace", "update"}},
// "fullDocument.key.type": "slack"},
// bson.M{"operationType": "delete",
// "fullDocumentBeforeChange.key.type": "slack"},
// }}}},
// }
// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline))
//
// Simple example (deletes handled via reconciler):
//
// pipeline := mongo.Pipeline{
// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}},
// }
// err := table.InitializeWithConfig(col, WithWatchPipeline(pipeline))
func WithWatchPipeline(pipeline any) CachedTableOption {
return func(cfg *CachedTableConfig) {
cfg.WatchPipeline = pipeline
}
}

// CachedTable is a generic table type providing common functions and types to specific
// structures each table is built using. This table also ensure keeping an inmemory
// cache information to enable better responsiveness for critical path data fetch, where
Expand All @@ -48,10 +134,12 @@ func WithReadThrough() CachedTableOption {
// E: Entry type (must NOT be a pointer type)
type CachedTable[K comparable, E any] struct {
reconciler.ManagerImpl
cacheMu sync.RWMutex
cache map[K]*E
col db.StoreCollection
readThrough bool
cacheMu sync.RWMutex
cache map[K]*E
col db.StoreCollection
readThrough bool
filter any // optional filter for FindMany (eager load + reconciler)
watchPipeline any // optional pipeline for Watch (change stream)
}

// Initialize sets up the Table with the provided db.StoreCollection using default configuration.
Expand All @@ -76,6 +164,19 @@ func (t *CachedTable[K, E]) Initialize(col db.StoreCollection) error {
// // Read-through caching
// err := table.InitializeWithConfig(col, WithReadThrough())
//
// // Filtered eager loading (only load entries matching filter)
// // NOTE: For scoped isolation (e.g., type-discriminated caches sharing a
// // collection), both WithFilter and WithWatchPipeline should typically be
// // set together to keep the cache scope consistent. WithFilter scopes the
// // initial load and reconciler, while WithWatchPipeline scopes the change
// // stream. Using only one may cause cache drift — see each option's
// // documentation for details on partial configuration behavior.
// err := table.InitializeWithConfig(col,
// WithFilter(bson.M{"key.type": "slack"}),
// WithWatchPipeline(mongo.Pipeline{
// {{Key: "$match", Value: bson.M{"fullDocument.key.type": "slack"}}},
// }))
//
// Returns an error if the table is already initialized, the entry or key type is a pointer,
// or if the collection setup fails.
func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ...CachedTableOption) error {
Expand All @@ -85,12 +186,16 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ..

// Apply configuration options
config := &CachedTableConfig{
ReadThrough: false, // Default to eager loading
ReadThrough: false, // Default to eager loading
Filter: nil, // Default to all entries
WatchPipeline: nil, // Default to all change events
}
for _, opt := range opts {
opt(config)
}
t.readThrough = config.ReadThrough
t.filter = config.Filter
t.watchPipeline = config.WatchPipeline

if t.cache == nil {
t.cache = map[K]*E{}
Expand All @@ -111,8 +216,22 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ..
return err
}

// Register callback for collection changes
err = col.Watch(context.Background(), nil, t.callback)
// Preflight validation: if a filter is configured, validate it by running
// a lightweight Count before proceeding. This catches invalid filters at
// init time with a proper error return instead of deferring to the panic path
// in eager load or ReconcilerGetAllKeys.
// Count validates the filter server-side without fetching or decoding documents.
if t.filter != nil {
if _, err := col.Count(context.Background(), t.filter); err != nil {
if errors.GetErrCode(err) != errors.Unknown {
return err
}
return errors.Wrapf(errors.Internal, "WithFilter preflight count failed: %s", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// Register callback for collection changes, using watch pipeline if configured
err = col.Watch(context.Background(), t.watchPipeline, t.callback)
if err != nil {
return err
}
Expand All @@ -128,9 +247,12 @@ func (t *CachedTable[K, E]) InitializeWithConfig(col db.StoreCollection, opts ..
// Only eagerly load entries if read-through is disabled
if !t.readThrough {
list := []keyOnly[K]{}
err = t.col.FindMany(context.Background(), nil, &list)
err = t.col.FindMany(context.Background(), t.filter, &list)
if err != nil {
log.Panicf("got error while fetching all keys %s", err)
if errors.GetErrCode(err) != errors.Unknown {
return err
}
return errors.Wrapf(errors.Internal, "failed to eager-load keys: %s", err)
}
for _, k := range list {
entry, err := t.DBFind(context.Background(), &k.Key)
Expand Down Expand Up @@ -182,11 +304,12 @@ func (t *CachedTable[K, E]) callback(op string, wKey any) {
}

// ReconcilerGetAllKeys returns all keys in the table.
// If a filter was configured via WithFilter, only keys matching the filter are returned.
// Used by the reconciler to enumerate all managed entries.
func (t *CachedTable[K, E]) ReconcilerGetAllKeys() []any {
list := []keyOnly[K]{}
keys := []any{}
err := t.col.FindMany(context.Background(), nil, &list)
err := t.col.FindMany(context.Background(), t.filter, &list)
if err != nil {
log.Panicf("got error while fetching all keys %s", err)
}
Expand Down Expand Up @@ -263,22 +386,30 @@ func (t *CachedTable[K, E]) Find(ctx context.Context, key *K) (*E, error) {
return dbEntry, nil
}

// DBFind retrieves an entry by key from the Database
// DBFind retrieves an entry by key from the Database.
// Returns the entry and error if not found or if the table is not initialized.
// Preserves error codes from the db layer: a genuine missing document returns
// NotFound, while transient storage failures (timeouts, connection errors)
// return Internal so callers can distinguish and retry appropriately.
func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) {
var data E
if t.col == nil {
return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized")
}
err := t.col.FindOne(ctx, key, &data)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err)
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err)
}
return &data, err
return &data, nil
}

// DBFindMany retrieves multiple entries matching the provided filter from database.
// Returns a slice of entries and error if none found or if the table is not initialized.
// Preserves error codes from the db layer so callers can distinguish transient
// storage failures from genuine empty results.
func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) {
if t.col == nil {
return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized")
Expand All @@ -287,7 +418,10 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset,
opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset))
err := t.col.FindMany(ctx, filter, &data, opts)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err)
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err)
}

return data, nil
Expand All @@ -296,6 +430,8 @@ func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any, offset,
// DBFindManyWithOpts retrieves multiple entries matching the provided filter from database with optional parameters.
// Supports pagination (limit, offset) and sorting through functional options.
// Returns a slice of entries and error if none found or if the table is not initialized.
// Preserves error codes from the db layer so callers can distinguish transient
// storage failures from genuine empty results.
//
// Example usage:
//
Expand Down Expand Up @@ -330,7 +466,10 @@ func (t *CachedTable[K, E]) DBFindManyWithOpts(ctx context.Context, filter any,
var data []*E
err := t.col.FindMany(ctx, filter, &data, mongoOpts)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err)
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err)
}

return data, nil
Expand Down
28 changes: 24 additions & 4 deletions table/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,32 @@ func (t *Table[K, E]) Update(ctx context.Context, key *K, entry *E) error {

// Find retrieves an entry by key.
// Returns the entry and error if not found or if the table is not initialized.
// Preserves error codes from the db layer: a genuine missing document returns
// NotFound, while transient storage failures (timeouts, connection errors)
// return Internal so callers can distinguish and retry appropriately.
func (t *Table[K, E]) Find(ctx context.Context, key *K) (*E, error) {
var data E
if t.col == nil {
return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized")
}
err := t.col.FindOne(ctx, key, &data)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err)
// If the db layer already assigned a recognized error code
// (e.g., NotFound from interpretMongoError for ErrNoDocuments),
// pass it through unchanged. Otherwise wrap as Internal so
// callers can distinguish transient failures from true misses.
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entry with key %v: %s", key, err)
}
return &data, err
return &data, nil
}

// FindMany retrieves multiple entries matching the provided filter.
// Returns a slice of entries and error if none found or if the table is not initialized.
// Preserves error codes from the db layer so callers can distinguish transient
// storage failures from genuine empty results.
func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit int32) ([]*E, error) {
if t.col == nil {
return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized")
Expand All @@ -315,7 +327,10 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in
opts := options.Find().SetLimit(int64(limit)).SetSkip(int64(offset))
err := t.col.FindMany(ctx, filter, &data, opts)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err)
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err)
}

return data, nil
Expand All @@ -324,6 +339,8 @@ func (t *Table[K, E]) FindMany(ctx context.Context, filter any, offset, limit in
// FindManyWithOpts retrieves multiple entries matching the provided filter with optional parameters.
// Supports pagination (limit, offset) and sorting through functional options.
// Returns a slice of entries and error if none found or if the table is not initialized.
// Preserves error codes from the db layer so callers can distinguish transient
// storage failures from genuine empty results.
//
// Example usage:
//
Expand Down Expand Up @@ -358,7 +375,10 @@ func (t *Table[K, E]) FindManyWithOpts(ctx context.Context, filter any, opts ...
var data []*E
err := t.col.FindMany(ctx, filter, &data, mongoOpts)
if err != nil {
return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err)
if errors.GetErrCode(err) != errors.Unknown {
return nil, err
}
return nil, errors.Wrapf(errors.Internal, "failed to find entries: %s", err)
}

return data, nil
Expand Down