diff --git a/src/DynamicData/List/ObservableListEx.Adapt.cs b/src/DynamicData/List/ObservableListEx.Adapt.cs
new file mode 100644
index 00000000..a8b18073
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Adapt.cs
@@ -0,0 +1,60 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for Adapt.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Injects a side effect into a changeset stream via an .
+ /// The adaptor's Adapt method is invoked for each changeset before it is forwarded downstream unchanged.
+ ///
+ /// The type of items in the list.
+ /// The source to observe and adapt.
+ /// The adaptor whose Adapt method is invoked for each changeset.
+ /// A list changeset stream identical to the source, with the adaptor side effect applied.
+ /// or is .
+ ///
+ ///
+ /// This is the primary extension point for custom UI binding adaptors (e.g.,
+ /// delegates to this operator). If the adaptor throws, the exception propagates downstream as OnError.
+ ///
+ ///
+ ///
+ public static IObservable> Adapt(this IObservable> source, IChangeSetAdaptor adaptor)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var locker = InternalEx.NewLock();
+ return source.Synchronize(locker).Select(
+ changes =>
+ {
+ adaptor.Adapt(changes);
+ return changes;
+ }).SubscribeSafe(observer);
+ });
+ }
+}
diff --git a/src/DynamicData/List/ObservableListEx.AutoRefresh.cs b/src/DynamicData/List/ObservableListEx.AutoRefresh.cs
new file mode 100644
index 00000000..828289ff
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.AutoRefresh.cs
@@ -0,0 +1,139 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for AutoRefresh.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Monitors all properties on each item (via ) and emits Refresh
+ /// changes when any property changes, causing downstream operators to re-evaluate.
+ ///
+ /// The type of items, which must implement .
+ /// The source to monitor for property-driven refresh signals.
+ /// An optional buffer duration to batch multiple refresh signals into a single changeset.
+ /// An optional throttle applied to each item's property change notifications.
+ /// The scheduler for throttle and buffer timing. Defaults to .
+ /// A list changeset stream with additional Refresh changes injected when properties change.
+ /// is .
+ ///
+ ///
+ /// Wraps using WhenAnyPropertyChanged() as the re-evaluator.
+ /// Pair with or
+ /// to get reactive re-evaluation on property changes.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeSubscribes to PropertyChanged on each new item. The original change is forwarded.
+ /// - ReplaceUnsubscribes from the old item, subscribes to the new. The original change is forwarded.
+ /// - Remove/RemoveRange/ClearUnsubscribes from removed items. The original change is forwarded.
+ /// - Moved/RefreshForwarded unchanged.
+ /// - Property changesA Refresh change is emitted for the item whose property changed.
+ ///
+ /// Worth noting: Each item generates a subscription. For large lists with frequent property changes, use and to reduce churn.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> AutoRefresh(this IObservable> source, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.AutoRefreshOnObservable(
+ t =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenAnyPropertyChanged();
+ }
+
+ return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+
+ ///
+ /// Monitors a single property (selected by ) on each item via
+ /// and emits Refresh changes when that property changes, causing downstream operators to re-evaluate. More efficient than
+ /// the all-properties overload when only one property (of type ) affects downstream behavior.
+ ///
+ ///
+ public static IObservable> AutoRefresh(this IObservable> source, Expression> propertyAccessor, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ propertyAccessor.ThrowArgumentNullExceptionIfNull(nameof(propertyAccessor));
+
+ return source.AutoRefreshOnObservable(
+ t =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenPropertyChanged(propertyAccessor, false);
+ }
+
+ return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+
+ ///
+ /// Monitors each item with a custom observable and emits Refresh changes whenever that observable fires,
+ /// causing downstream operators (Filter, Sort, Group) to re-evaluate.
+ ///
+ /// The type of items in the list.
+ /// The type emitted by the re-evaluator observable (value is ignored).
+ /// The source to monitor for observable-driven refresh signals.
+ /// A factory that, given an item, returns an observable whose emissions trigger a Refresh for that item.
+ /// An optional buffer duration to batch refresh signals into a single changeset.
+ /// The for buffering.
+ /// A list changeset stream with additional Refresh changes injected when per-item observables fire.
+ /// or is .
+ ///
+ ///
+ /// This is the general-purpose refresh mechanism.
+ /// is a convenience wrapper that uses WhenAnyPropertyChanged() as the re-evaluator.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeSubscribes to the re-evaluator observable for each new item. The original change is forwarded.
+ /// - ReplaceUnsubscribes from the old item's observable, subscribes to the new. The original change is forwarded.
+ /// - Remove/RemoveRange/ClearUnsubscribes from removed items. The original change is forwarded.
+ /// - Moved/RefreshForwarded unchanged.
+ /// - Re-evaluator firesThe item's current index is looked up and a Refresh change is emitted.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> AutoRefreshOnObservable(this IObservable> source, Func> reevaluator, TimeSpan? changeSetBuffer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ reevaluator.ThrowArgumentNullExceptionIfNull(nameof(reevaluator));
+
+ return new AutoRefresh(source, reevaluator, changeSetBuffer, scheduler).Run();
+ }
+}
diff --git a/src/DynamicData/List/ObservableListEx.Bind.cs b/src/DynamicData/List/ObservableListEx.Bind.cs
new file mode 100644
index 00000000..e5c9a031
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Bind.cs
@@ -0,0 +1,151 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for Bind.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Applies changeset mutations to a target for UI data binding.
+ ///
+ /// The type of items in the list.
+ /// The source to bind to a collection.
+ /// The target collection to keep in sync.
+ /// When a changeset exceeds this many changes, the collection is reset instead of applying individual changes.
+ /// A continuation of the source changeset stream (allows further chaining).
+ /// or is .
+ ///
+ ///
+ /// Delegates to with an internal collection adaptor.
+ /// Each changeset is applied to the target collection on the calling thread. For UI binding, ensure the source is
+ /// observed on the UI thread (e.g., via ObserveOn).
+ ///
+ ///
+ /// EventBehavior
+ /// - AddItem inserted at the specified index in the target collection.
+ /// - AddRangeItems inserted as a range. If the count exceeds , the collection is cleared and repopulated.
+ /// - ReplaceItem at the specified index is replaced.
+ /// - RemoveItem at the specified index is removed.
+ /// - RemoveRange/ClearItems removed from the collection.
+ /// - MovedItem is moved between positions in the collection.
+ /// - RefreshDepends on the adaptor implementation.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Bind(this IObservable> source, IObservableCollection targetCollection, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ targetCollection.ThrowArgumentNullExceptionIfNull(nameof(targetCollection));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold };
+
+ return source.Bind(targetCollection, options);
+ }
+
+ ///
+ /// Binds the source changeset stream to , with fine-grained control over reset threshold and other behaviors.
+ ///
+ ///
+ public static IObservable> Bind(this IObservable> source, IObservableCollection targetCollection, BindingOptions options)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ targetCollection.ThrowArgumentNullExceptionIfNull(nameof(targetCollection));
+
+ var adaptor = new ObservableCollectionAdaptor(targetCollection, options);
+ return source.Adapt(adaptor);
+ }
+
+ ///
+ /// Constructs a and binds the changeset stream to it.
+ /// Use this overload when you need a read-only view (typically for UI binding) without managing the backing collection yourself.
+ /// The created collection is returned via the output parameter.
+ ///
+ ///
+ ///
+ ///
+ /// The created collection is backed by an internal ObservableCollectionExtended<T>. Callers receive only the read-only wrapper.
+ ///
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold };
+
+ return source.Bind(out readOnlyObservableCollection, options);
+ }
+
+ ///
+ /// Constructs a and binds the changeset stream to it,
+ /// with fine-grained control over reset threshold and other behaviors.
+ /// The created collection is returned via the output parameter.
+ ///
+ ///
+ ///
+ ///
+ /// The created collection is backed by an internal ObservableCollectionExtended<T>. Callers receive only the read-only wrapper.
+ ///
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, BindingOptions options)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var target = new ObservableCollectionExtended();
+ var result = new ReadOnlyObservableCollection(target);
+ var adaptor = new ObservableCollectionAdaptor(target, options);
+ readOnlyObservableCollection = result;
+ return source.Adapt(adaptor);
+ }
+
+#if SUPPORTS_BINDINGLIST
+
+ ///
+ /// Binds the source changeset stream to a WinForms , keeping in sync.
+ ///
+ ///
+ public static IObservable> Bind<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] T>(this IObservable> source, BindingList bindingList, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ bindingList.ThrowArgumentNullExceptionIfNull(nameof(bindingList));
+
+ return source.Adapt(new BindingListAdaptor(bindingList, resetThreshold));
+ }
+
+#endif
+}
diff --git a/src/DynamicData/List/ObservableListEx.ChangeStream.cs b/src/DynamicData/List/ObservableListEx.ChangeStream.cs
new file mode 100644
index 00000000..830d3687
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.ChangeStream.cs
@@ -0,0 +1,209 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for changeset stream lifecycle helpers and buffering.
+///
+public static partial class ObservableListEx
+{
+ ///
+ ///
+ ///
+ /// This overload starts unpaused and has no timeout.
+ ///
+ public static IObservable> BufferIf(this IObservable> source, IObservable pauseIfTrueSelector, IScheduler? scheduler = null)
+ where T : notnull => BufferIf(source, pauseIfTrueSelector, false, scheduler);
+
+ ///
+ ///
+ ///
+ /// This overload allows setting the initial pause state but has no timeout.
+ ///
+ public static IObservable> BufferIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState, IScheduler? scheduler = null)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ pauseIfTrueSelector.ThrowArgumentNullExceptionIfNull(nameof(pauseIfTrueSelector));
+
+ return BufferIf(source, pauseIfTrueSelector, initialPauseState, null, scheduler);
+ }
+
+ ///
+ ///
+ ///
+ /// This overload starts unpaused and accepts a timeout but not an explicit initial pause state.
+ ///
+ public static IObservable> BufferIf(this IObservable> source, IObservable pauseIfTrueSelector, TimeSpan? timeOut, IScheduler? scheduler = null)
+ where T : notnull => BufferIf(source, pauseIfTrueSelector, false, timeOut, scheduler);
+
+ ///
+ /// Buffers changeset notifications while a pause signal is active, then flushes all buffered changes when resumed.
+ ///
+ /// The type of items in the list.
+ /// The source to conditionally buffer.
+ /// An of that controls buffering: pauses (buffers), resumes (flushes).
+ /// The initial pause state. When , buffering starts immediately.
+ /// An optional maximum duration to keep the buffer open. After this time, the buffer is flushed regardless of pause state.
+ /// The for timeout scheduling.
+ /// A list changeset stream that buffers during pause and emits combined changesets on resume.
+ /// or is .
+ ///
+ ///
+ /// All changeset events are buffered at the changeset level (not individual changes) while paused.
+ /// On resume, all buffered changesets are emitted as a single combined changeset. If the buffer is empty on resume,
+ /// no emission occurs.
+ ///
+ ///
+ /// EventBehavior
+ /// - Any (while paused)Accumulated in an internal buffer. Not emitted downstream.
+ /// - Any (while active)Passed through immediately.
+ /// - Pause selector emits falseAll buffered changesets are flushed downstream as one combined changeset.
+ /// - Timeout firesAutomatically resumes and flushes the buffer.
+ /// - OnErrorForwarded immediately (not buffered).
+ /// - OnCompletedForwarded immediately.
+ ///
+ /// Worth noting: Each pause/resume cycle re-arms the timeout. Rapid toggling can create many small buffer windows.
+ ///
+ public static IObservable> BufferIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState, TimeSpan? timeOut, IScheduler? scheduler = null)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ pauseIfTrueSelector.ThrowArgumentNullExceptionIfNull(nameof(pauseIfTrueSelector));
+
+ return new BufferIf(source, pauseIfTrueSelector, initialPauseState, timeOut, scheduler).Run();
+ }
+
+ ///
+ /// Buffers changesets during an initial time window, then emits a single combined changeset and passes through subsequent changes.
+ ///
+ /// The type of items in the list.
+ /// The source to buffer during the initial loading period.
+ /// The time period (measured from first emission) during which changes are buffered.
+ /// The for timing the buffer window.
+ /// A list changeset stream where the initial burst is combined into one changeset.
+ ///
+ ///
+ /// For a configured duration after the first emission, all changesets are buffered and combined into a single emission.
+ /// After this initial window, subsequent changesets pass through immediately.
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> BufferInitial(this IObservable> source, TimeSpan initialBuffer, IScheduler? scheduler = null)
+ where TObject : notnull => source.DeferUntilLoaded().Publish(
+ shared =>
+ {
+ var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);
+
+ return initial.Concat(shared);
+ });
+
+ ///
+ /// Defers downstream delivery until the source emits its first changeset, then forwards all subsequent changesets.
+ ///
+ /// The type of the object.
+ /// The source to defer until the first changeset arrives.
+ /// A list changeset stream that begins emitting only after the source has produced its first changeset.
+ /// is .
+ ///
+ ///
+ /// Subscribes to the source immediately but buffers internally until the first changeset arrives, at which point it emits
+ /// the initial data and all subsequent changesets. This is useful when downstream consumers should not receive an empty initial state.
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DeferUntilLoaded(source).Run();
+ }
+
+ ///
+ ///
+ ///
+ /// Convenience overload that calls source.Connect().DeferUntilLoaded().
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservableList source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Connect().DeferUntilLoaded();
+ }
+
+ ///
+ /// Suppresses empty changesets from the stream. Only changesets with at least one change are forwarded.
+ ///
+ /// The type of the item.
+ /// The source to suppress empty changesets.
+ /// A list changeset stream with empty changesets filtered out.
+ /// is .
+ ///
+ ///
+ public static IObservable> NotEmpty(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Where(s => s.Count != 0);
+ }
+
+ ///
+ /// Skips the initial changeset (the snapshot emitted on subscription) and forwards all subsequent changesets.
+ /// Internally defers until loaded, then skips the first emission.
+ ///
+ /// The type of the object.
+ /// The source to skip the initial changeset.
+ /// A list changeset stream that omits the initial snapshot.
+ /// is .
+ ///
+ ///
+ /// Warning: This operator assumes the initial changeset is empty. If the source emits a non-empty
+ /// initial snapshot, those items are silently dropped while downstream consumers remain unaware of them.
+ /// Any later Refresh, Replace, Remove, or Moved change targeting one of those
+ /// dropped items will throw because the downstream collection has no record of them. Only use this against
+ /// a source you know starts empty (for example, a that has not yet been populated).
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> SkipInitial(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.DeferUntilLoaded().Skip(1);
+ }
+
+ ///
+ /// Prepends an empty changeset to the source stream. Useful for initializing downstream consumers that expect an initial emission.
+ ///
+ /// The type of item.
+ /// The source to prepend an empty changeset to.
+ /// A list changeset stream that begins with an empty changeset.
+ ///
+ ///
+ ///
+ public static IObservable> StartWithEmpty(this IObservable> source)
+ where T : notnull => source.StartWith(ChangeSet.Empty);
+}
diff --git a/src/DynamicData/List/ObservableListEx.Combinators.cs b/src/DynamicData/List/ObservableListEx.Combinators.cs
new file mode 100644
index 00000000..77298688
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Combinators.cs
@@ -0,0 +1,355 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for set-style combinators (And, Or, Xor, Except).
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Applies a logical AND (intersection) between multiple list changeset streams.
+ /// Only items present in ALL sources appear in the result.
+ ///
+ /// The type of items in the lists.
+ /// The first source to intersect.
+ /// The additional changeset streams to intersect with.
+ /// A list changeset stream containing items that exist in every source.
+ /// is .
+ ///
+ ///
+ /// Uses reference counting per item across all sources. An item appears downstream only when
+ /// its reference count is non-zero in ALL sources. Item identity is determined by the default equality comparer.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeThe item's reference count is incremented in its source tracker. If the item is now present in all sources, an Add is emitted.
+ /// - ReplaceThe old item's reference count is decremented and the new item's is incremented. Depending on whether each is present in ALL sources, this emits an Add, Remove, Replace, or nothing.
+ /// - Remove/RemoveRange/ClearThe item's reference count is decremented. If it was in the result and is no longer in all sources, a Remove is emitted.
+ /// - RefreshForwarded as Refresh if the item is currently in the result.
+ /// - MovedIgnored (set operations are position-independent).
+ ///
+ /// Worth noting: Item identity uses object equality, not position. Duplicate items in a single source are reference-counted independently.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> And(this IObservable> source, params IObservable>[] others)
+ where T : notnull
+ {
+ others.ThrowArgumentNullExceptionIfNull(nameof(others));
+
+ return source.Combine(CombineOperator.And, others);
+ }
+
+ ///
+ /// A of changeset streams to intersect.
+ ///
+ ///
+ /// This overload accepts a pre-built collection of sources instead of a params array.
+ ///
+ public static IObservable> And(this ICollection>> sources)
+ where T : notnull => sources.Combine(CombineOperator.And);
+
+ ///
+ /// An of changeset streams. Sources can be added or removed dynamically.
+ ///
+ ///
+ /// This overload supports dynamic source management: adding or removing changeset streams from the observable list triggers re-evaluation.
+ ///
+ public static IObservable> And(this IObservableList>> sources)
+ where T : notnull => sources.Combine(CombineOperator.And);
+
+ ///
+ /// An of . Each inner list's changes are connected automatically.
+ ///
+ ///
+ /// This overload accepts instances directly, calling Connect() internally.
+ ///
+ public static IObservable> And(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.And);
+
+ ///
+ /// An of . Each inner list's changes are connected automatically.
+ ///
+ ///
+ /// This overload accepts instances directly, calling Connect() internally.
+ ///
+ public static IObservable> And(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.And);
+
+ private static IObservable> Combine(this ICollection>> sources, CombineOperator type)
+ where T : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return new Combiner(sources, type).Run();
+ }
+
+ private static IObservable> Combine(this IObservable> source, CombineOperator type, params IObservable>[] others)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ others.ThrowArgumentNullExceptionIfNull(nameof(others));
+
+ if (others.Length == 0)
+ {
+ throw new ArgumentException("Must be at least one item to combine with", nameof(others));
+ }
+
+ var items = source.EnumerateOne().Union(others).ToList();
+ return new Combiner(items, type).Run();
+ }
+
+ private static IObservable> Combine(this IObservableList> sources, CombineOperator type)
+ where T : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList();
+ var subscriber = changesSetList.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(changesSetList, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList> sources, CombineOperator type)
+ where T : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList();
+ var subscriber = changesSetList.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(changesSetList, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList>> sources, CombineOperator type)
+ where T : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return new DynamicCombiner(sources, type).Run();
+ }
+
+ ///
+ /// Applies a logical set-difference (Except) between the source and other streams.
+ /// Items present in the first source but not in any of the are included in the result.
+ ///
+ /// The type of the item.
+ /// The primary from which other streams are subtracted.
+ /// The other changeset streams to exclude from the result.
+ /// A list changeset stream containing items from that are not in any of .
+ /// is .
+ ///
+ ///
+ /// Item identity is determined by the default equality comparer for . Across all sources, items are tracked
+ /// by reference-counted equality (not by index position).
+ /// The first source has a special role: only items from it can appear in the result, and only if they do not exist in any other source.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRange (first source)If the item does not exist in any other source, an Add is emitted.
+ /// - Add/AddRange (other source)If the item was in the result (from first source), a Remove is emitted.
+ /// - Remove/RemoveRange/Clear (first source)If the item was in the result, a Remove is emitted.
+ /// - Remove/RemoveRange/Clear (other source)If the item exists in the first source and no longer in any other, an Add is emitted.
+ /// - ReplaceTreated as a Remove of the old item plus an Add of the new item, with set logic re-evaluated.
+ /// - MovedIgnored by the set logic (no positional semantics).
+ /// - RefreshForwarded if the item is currently in the result set.
+ ///
+ /// Worth noting: Unlike , the first source is asymmetric: only its items can appear in the result.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Except(this IObservable> source, params IObservable>[] others)
+ where T : notnull
+ {
+ others.ThrowArgumentNullExceptionIfNull(nameof(others));
+
+ return source.Combine(CombineOperator.Except, others);
+ }
+
+ ///
+ ///
+ ///
+ /// Static overload accepting a pre-built collection of sources. The first item in the collection is the primary source.
+ ///
+ public static IObservable> Except(this ICollection>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Except);
+
+ ///
+ ///
+ ///
+ /// Dynamic overload: sources can be added or removed from the at runtime. The first source in the list acts as the primary.
+ ///
+ public static IObservable> Except(this IObservableList>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Except);
+
+ ///
+ ///
+ ///
+ /// Dynamic overload accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Except(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Except);
+
+ ///
+ ///
+ ///
+ /// Dynamic overload accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Except(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Except);
+
+ ///
+ ///
+ /// Applies a logical OR (union) between a pre-built collection of list changeset sources. Items present in any source are included.
+ ///
+ ///
+ public static IObservable> Or(this ICollection>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Or);
+
+ ///
+ /// Applies a logical OR (union) between the source and other list changeset streams.
+ /// Items present in any of the sources are included in the result, using reference-counted equality.
+ ///
+ /// The type of the item.
+ /// The primary source to union.
+ /// The other changeset streams to combine with.
+ /// A list changeset stream containing items that exist in at least one source.
+ /// is .
+ ///
+ ///
+ /// Item identity is determined by the default equality comparer for . Uses reference-counted equality: an item is included when it first appears in any source and removed when it no longer exists in any source.
+ /// Moved changes are ignored by the set logic.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRange (any source)If the item is new to the result, an Add is emitted. Otherwise the reference count is incremented.
+ /// - Remove/RemoveRange/Clear (any source)Reference count decremented. If count reaches zero, a Remove is emitted.
+ /// - ReplaceOld item reference count decremented, new item reference count incremented. Add/Remove emitted as needed.
+ /// - RefreshForwarded if the item is in the result set.
+ /// - MovedIgnored.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Or(this IObservable> source, params IObservable>[] others)
+ where T : notnull
+ {
+ others.ThrowArgumentNullExceptionIfNull(nameof(others));
+
+ return source.Combine(CombineOperator.Or, others);
+ }
+
+ ///
+ ///
+ /// Dynamic OR: sources can be added or removed from the at runtime.
+ ///
+ public static IObservable> Or(this IObservableList>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Or);
+
+ ///
+ ///
+ /// Dynamic OR accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Or(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Or);
+
+ ///
+ ///
+ /// Dynamic OR accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Or(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Or);
+
+ ///
+ /// Applies a logical XOR (symmetric difference) between the source and other streams.
+ /// Items present in exactly one source are included in the result.
+ ///
+ /// The type of the item.
+ /// The primary source to exclusively combine.
+ /// The other changeset streams to combine with.
+ /// A list changeset stream containing items that exist in exactly one source.
+ /// is .
+ ///
+ ///
+ /// Item identity is determined by the default equality comparer for . Uses reference-counted equality: an item is included when it exists in exactly one source.
+ /// If it appears in a second source, it is removed from the result. If it then leaves one source,
+ /// it re-enters the result. Moved changes are ignored.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeReference count updated. If the item is now in exactly one source, an Add is emitted. If now in two or more, a Remove is emitted.
+ /// - Remove/RemoveRange/ClearReference count decremented. If now in exactly one source, an Add is emitted. If now in zero, a Remove is emitted.
+ /// - ReplaceOld item reference count decremented, new item incremented, with Xor logic applied.
+ /// - RefreshForwarded if item is in the result set.
+ /// - MovedIgnored.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Xor(this IObservable> source, params IObservable>[] others)
+ where T : notnull
+ {
+ others.ThrowArgumentNullExceptionIfNull(nameof(others));
+
+ return source.Combine(CombineOperator.Xor, others);
+ }
+
+ ///
+ ///
+ /// Applies a logical XOR between a pre-built collection of list changeset sources.
+ ///
+ public static IObservable> Xor(this ICollection>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Xor);
+
+ ///
+ ///
+ /// Dynamic XOR: sources can be added or removed from the at runtime.
+ ///
+ public static IObservable> Xor(this IObservableList>> sources)
+ where T : notnull => sources.Combine(CombineOperator.Xor);
+
+ ///
+ ///
+ /// Dynamic XOR accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Xor(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Xor);
+
+ ///
+ ///
+ /// Dynamic XOR accepting of . Each inner list's Connect() is used as a source.
+ ///
+ public static IObservable> Xor(this IObservableList> sources)
+ where T : notnull => sources.Combine(CombineOperator.Xor);
+}
diff --git a/src/DynamicData/List/ObservableListEx.Conversions.cs b/src/DynamicData/List/ObservableListEx.Conversions.cs
new file mode 100644
index 00000000..2376273d
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Conversions.cs
@@ -0,0 +1,295 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for type and shape conversions.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Adds a key to each item in a list changeset, converting it to a cache changeset that supports all keyed DynamicData operators.
+ ///
+ /// The type of items in the list.
+ /// The type of the key.
+ /// The source to add keys to, converting to a cache changeset.
+ /// A function to extract a unique key from each item.
+ /// A cache changeset stream with keyed items.
+ /// or is .
+ ///
+ ///
+ /// All index information is dropped during conversion because cache changesets are unordered by default.
+ /// Use this when you need to transition from list-based pipelines to cache-based operators (Filter by key, Join, Group, etc.).
+ ///
+ ///
+ ///
+ public static IObservable> AddKey(this IObservable> source, Func keySelector)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ keySelector.ThrowArgumentNullExceptionIfNull(nameof(keySelector));
+
+ return source.Select(changes => new ChangeSet(new AddKeyEnumerator(changes, keySelector)));
+ }
+
+ ///
+ /// Wraps a as a read-only , hiding mutation methods.
+ ///
+ /// The type of items in the list.
+ /// The mutable source list to wrap.
+ /// A read-only observable list that mirrors the source.
+ /// is .
+ public static IObservableList AsObservableList(this ISourceList source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new AnonymousObservableList(source);
+ }
+
+ ///
+ /// Materializes a changeset stream into a read-only .
+ /// The list is kept in sync with the source stream for the lifetime of the subscription.
+ ///
+ /// The type of items in the list.
+ /// The source to materialize into a read-only list.
+ /// A read-only observable list reflecting the current state of the stream.
+ /// is .
+ ///
+ ///
+ /// This is the primary way to multicast a changeset pipeline. Materializing once into an ,
+ /// then calling Connect() on the result for each downstream consumer, ensures the upstream operators are evaluated only once
+ /// regardless of how many subscribers consume the result.
+ ///
+ ///
+ ///
+ public static IObservableList AsObservableList(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new AnonymousObservableList(source);
+ }
+
+ ///
+ /// Casts each item in the changeset from object to using a direct cast.
+ ///
+ /// The target type to cast to.
+ /// The source of object items.
+ /// A list changeset stream of cast items.
+ /// is .
+ ///
+ ///
+ public static IObservable> Cast(this IObservable> source)
+ where TDestination : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Select(changes => changes.Transform(t => (TDestination)t));
+ }
+
+ ///
+ /// Transforms each item in the changeset using a conversion function.
+ ///
+ /// The source item type.
+ /// The destination item type.
+ /// The source to cast.
+ /// A function to convert each item from to .
+ /// A list changeset stream of converted items.
+ /// or is .
+ /// Use this overload when type inference requires explicit specification of both source and destination types. Alternatively, call first, then the single-type-parameter overload.
+ ///
+ ///
+ public static IObservable> Cast(this IObservable> source, Func conversionFactory)
+ where TSource : notnull
+ where TDestination : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ conversionFactory.ThrowArgumentNullExceptionIfNull(nameof(conversionFactory));
+
+ return source.Select(changes => changes.Transform(conversionFactory));
+ }
+
+ ///
+ /// Casts each item in the changeset to object. Typically used before to work around type inference limitations.
+ ///
+ /// The source item type (must be a reference type).
+ /// The source to cast to object.
+ /// A list changeset stream of object items.
+ ///
+ public static IObservable> CastToObject(this IObservable> source)
+ where T : class => source.Select(changes => changes.Transform(t => (object)t));
+
+ ///
+ /// Applies each changeset to the target list as a side effect, keeping it synchronized with the source.
+ ///
+ /// The type of items in the list.
+ /// The source to clone.
+ /// The target list to clone changes into.
+ /// A continuation of the source changeset stream.
+ /// is .
+ ///
+ /// Lower-level than . Uses .Clone() to apply all changeset operations directly.
+ ///
+ ///
+ ///
+ public static IObservable> Clone(this IObservable> source, IList target)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Do(target.Clone);
+ }
+
+ ///
+ /// Convert the object using the specified conversion function.
+ /// This is a lighter equivalent of Transform and is designed to be used with non-disposable objects.
+ ///
+ /// The type of items in the list.
+ /// The type of the destination items.
+ /// The source to convert.
+ /// The conversion factory.
+ /// An observable which emits the change set.
+ [Obsolete("Prefer Cast as it is does the same thing but is semantically correct")]
+ public static IObservable> Convert(this IObservable> source, Func conversionFactory)
+ where TObject : notnull
+ where TDestination : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ conversionFactory.ThrowArgumentNullExceptionIfNull(nameof(conversionFactory));
+
+ return source.Select(changes => changes.Transform(conversionFactory));
+ }
+
+ ///
+ /// Flattens buffered changesets (e.g. from ) back into single changesets.
+ /// Empty buffers are dropped.
+ ///
+ /// The type of the item.
+ /// The of buffered changeset lists.
+ /// A list changeset stream with all buffered changes concatenated into single changesets.
+ ///
+ /// Use this after applying Observable.Buffer() to a changeset stream to re-merge the batched changesets into a single stream.
+ ///
+ ///
+ ///
+ public static IObservable> FlattenBufferResult(this IObservable>> source)
+ where T : notnull => source.Where(x => x.Count != 0).Select(updates => new ChangeSet(updates.SelectMany(u => u)));
+
+ ///
+ /// Invokes once for every in each changeset. Range changes
+ /// (AddRange, RemoveRange, Clear) are delivered as a single ; they are not flattened into per-item changes.
+ /// The changeset is forwarded downstream unchanged.
+ ///
+ /// The type of items in the list.
+ /// The source to observe each change in.
+ /// The action invoked for each .
+ /// A continuation of the source changeset stream.
+ /// or is .
+ ///
+ /// This is a side-effect operator. It does not modify the changeset. If you need each individual item from range operations flattened out, use instead.
+ ///
+ /// EventBehavior
+ /// - Add/Replace/Remove/Moved/RefreshCallback invoked with the (single-item change). Changeset forwarded.
+ /// - AddRange/RemoveRange/ClearCallback invoked once with the containing the range (accessible via Range property). Changeset forwarded.
+ /// - OnErrorIf the callback throws, the exception propagates as OnError.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> ForEachChange(this IObservable> source, Action> action)
+ where TObject : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ action.ThrowArgumentNullExceptionIfNull(nameof(action));
+
+ return source.Do(changes => changes.ForEach(action));
+ }
+
+ ///
+ /// Invokes for every individual in each changeset.
+ /// Range changes are flattened into individual item changes first, so the callback only receives Add, Replace, Remove, and Refresh.
+ ///
+ /// The type of items in the list.
+ /// The source to observe each item-level change in.
+ /// The action invoked for each individual item change.
+ /// A continuation of the source changeset stream.
+ /// or is .
+ ///
+ ///
+ /// Unlike , this operator flattens
+ /// AddRange, RemoveRange, and Clear into individual entries before invoking the callback.
+ ///
+ ///
+ ///
+ public static IObservable> ForEachItemChange(this IObservable> source, Action> action)
+ where TObject : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ action.ThrowArgumentNullExceptionIfNull(nameof(action));
+
+ return source.Do(changes => changes.Flatten().ForEach(action));
+ }
+
+ ///
+ /// Reference-counted materialization of the source changeset stream into an .
+ /// The shared list is created on the first subscriber and disposed when the last subscriber unsubscribes.
+ ///
+ /// The type of the item.
+ /// The source to share via reference counting.
+ /// A list changeset stream backed by a shared, reference-counted .
+ /// is .
+ ///
+ /// Equivalent to Publish().RefCount() for changeset streams. The underlying list is created lazily on first subscription.
+ ///
+ ///
+ public static IObservable> RefCount(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new RefCount(source).Run();
+ }
+
+ ///
+ /// Strips index information from all changes in the stream.
+ ///
+ /// The type of the object.
+ /// The source to strip index information.
+ /// A list changeset stream with all index values removed from changes.
+ /// is .
+ ///
+ /// Removes index positions from every change in each changeset. This is useful when downstream operators do not require or support index-based operations.
+ ///
+ ///
+ public static IObservable> RemoveIndex(this IObservable> source)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Select(changes => new ChangeSet(changes.YieldWithoutIndex()));
+ }
+}
diff --git a/src/DynamicData/List/ObservableListEx.Expiration.cs b/src/DynamicData/List/ObservableListEx.Expiration.cs
new file mode 100644
index 00000000..4407c9c6
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Expiration.cs
@@ -0,0 +1,125 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for ExpireAfter, LimitSizeTo, and Top.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Automatically removes items from the list after the duration returned by .
+ /// Returns an observable of the items that were expired and removed.
+ ///
+ /// The type of the item.
+ /// The source list to apply time-based expiration to.
+ /// A function returning the time-to-live for each item. Return for items that should never expire.
+ /// An optional polling interval to batch expiry checks. If omitted, a separate timer is created for each unique expiry time.
+ /// The scheduler for scheduling expiry timers. Defaults to .
+ /// An observable that emits collections of items each time expired items are removed from the source list.
+ ///
+ ///
+ /// This operator acts directly on an , not on a changeset stream. It monitors items as they are added,
+ /// schedules their removal, and physically removes them from the source list when their time expires.
+ ///
+ ///
+ /// When is specified, all items due for removal are batched into a single removal at each polling tick,
+ /// which can improve performance when many items expire around the same time.
+ ///
+ /// Worth noting: The returned observable emits the expired items (not changesets). Subscribe to this observable to trigger the expiry mechanism; if not subscribed, no items will be removed.
+ ///
+ ///
+ ///
+ public static IObservable> ExpireAfter(
+ this ISourceList source,
+ Func timeSelector,
+ TimeSpan? pollingInterval = null,
+ IScheduler? scheduler = null)
+ where T : notnull
+ => List.Internal.ExpireAfter.Create(
+ source: source,
+ timeSelector: timeSelector,
+ pollingInterval: pollingInterval,
+ scheduler: scheduler);
+
+ ///
+ /// Limits the source list to a maximum number of items using FIFO eviction.
+ /// When the list exceeds , the oldest items are removed.
+ /// Returns an observable of the items that were removed.
+ ///
+ /// The type of the item.
+ /// The source list to apply size limits to.
+ /// The maximum number of items allowed. Must be greater than zero.
+ /// The scheduler for scheduling size checks. Defaults to .
+ /// An observable that emits collections of items each time excess items are removed from the source list.
+ /// is .
+ /// is zero or negative.
+ ///
+ ///
+ /// This operator acts directly on an . It subscribes to the source's changes,
+ /// tracks insertion order using an internal Transform, and removes the oldest items when the size limit is exceeded.
+ ///
+ /// Worth noting: The returned observable emits the removed items (not changesets). Subscribe to this observable to activate the size-limiting mechanism. Removal is performed synchronously under a lock shared with the change tracking.
+ ///
+ ///
+ ///
+ public static IObservable> LimitSizeTo(this ISourceList source, int sizeLimit, IScheduler? scheduler = null)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (sizeLimit <= 0)
+ {
+ throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit));
+ }
+
+ var locker = InternalEx.NewLock();
+ var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker);
+
+ return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
+ }
+
+ ///
+ /// Takes the first items from the source list. Implemented as Virtualise with a fixed window starting at index 0.
+ ///
+ /// The type of the item.
+ /// The source to take the top items.
+ /// The maximum number of items to include. Must be greater than zero.
+ /// A virtual changeset stream containing at most items from the beginning of the source.
+ /// is .
+ /// is zero or negative.
+ ///
+ /// The source should ideally be sorted before applying Top, since list order determines which items appear.
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Top(this IObservable> source, int numberOfItems)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (numberOfItems <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(numberOfItems), "Number of items should be greater than zero");
+ }
+
+ return source.Virtualise(Observable.Return(new VirtualRequest(0, numberOfItems)));
+ }
+}
diff --git a/src/DynamicData/List/ObservableListEx.Filter.cs b/src/DynamicData/List/ObservableListEx.Filter.cs
new file mode 100644
index 00000000..db4c9c4a
--- /dev/null
+++ b/src/DynamicData/List/ObservableListEx.Filter.cs
@@ -0,0 +1,351 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Binding;
+using DynamicData.Cache.Internal;
+using DynamicData.List.Internal;
+using DynamicData.List.Linq;
+
+// ReSharper disable once CheckNamespace
+namespace DynamicData;
+
+///
+/// ObservableList extensions for filtering and change-reason gating.
+///
+public static partial class ObservableListEx
+{
+ ///
+ /// Extracts distinct values from source items using , with reference counting to track when values enter and leave the result set.
+ ///
+ /// The type of items in the source list.
+ /// The type of distinct values produced.
+ /// The source to extract distinct values.
+ /// A function that extracts the value to track from each source item.
+ /// A list changeset stream of distinct values.
+ /// or is .
+ ///
+ ///
+ /// Maintains an internal reference count per distinct value. A value is included when its count first exceeds zero
+ /// and removed when its count drops back to zero.
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeValue extracted. If first occurrence, an Add is emitted. Otherwise the reference count is incremented silently.
+ /// - ReplaceOld value's reference count decremented (removed if zero), new value's count incremented (added if first). If the value did not change, no emission.
+ /// - Remove/RemoveRangeReference count decremented. If the count reaches zero, a Remove is emitted for that distinct value.
+ /// - RefreshValue is re-extracted. If changed, old value decremented and new value incremented (same as Replace logic).
+ /// - ClearAll reference counts cleared. Remove emitted for every tracked distinct value.
+ ///
+ ///
+ ///
+ public static IObservable> DistinctValues(this IObservable> source, Func valueSelector)
+ where TObject : notnull
+ where TValue : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ valueSelector.ThrowArgumentNullExceptionIfNull(nameof(valueSelector));
+
+ return new Distinct(source, valueSelector).Run();
+ }
+
+ ///
+ /// Filters items from the source list changeset stream using a static predicate.
+ /// Only items satisfying are included downstream.
+ ///
+ /// The type of items in the list.
+ /// The source to filter.
+ /// A predicate that determines which items are included. Items returning appear downstream; items returning are excluded.
+ /// A list changeset stream containing only items that satisfy .
+ /// Thrown when or is .
+ ///
+ ///
+ /// Use this overload when you need only a single predicate function for the lifetime of the subscription;
+ /// unlike the dynamic-predicate and state-driven overloads, the predicate function itself never changes.
+ /// Note that this does not mean an item's inclusion is fixed: Refresh events can re-evaluate each item against the predicate
+ /// and promote a previously-excluded item to included (or vice versa).
+ /// Item ordering is preserved.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddThe predicate is evaluated. If the item passes, an Add is emitted at the calculated downstream index. Otherwise dropped.
+ /// - AddRangeEach item in the range is evaluated. Matching items are emitted as an AddRange.
+ /// - ReplaceThe predicate is re-evaluated. Four outcomes: both pass produces Replace; new passes but old didn't produces Add; old passed but new doesn't produces Remove; neither passes is dropped.
+ /// - RemoveIf the item was included downstream, a Remove is emitted. Otherwise dropped.
+ /// - RemoveRangeIncluded items in the range are emitted as individual Remove changes.
+ /// - RefreshThe predicate is re-evaluated. If the item now passes but previously did not, an Add is emitted. If it previously passed but no longer does, a Remove is emitted. If still passes, the Refresh is forwarded. If still fails, dropped.
+ /// - ClearAll downstream items are cleared.
+ ///
+ /// Worth noting: Refresh events trigger re-evaluation, which can promote or demote items (turning a Refresh into an Add or Remove). Pair with for property-change-driven filtering.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Filter(
+ this IObservable> source,
+ Func predicate)
+ where T : notnull
+ => List.Internal.Filter.Static.Create(
+ source: source,
+ predicate: predicate,
+ suppressEmptyChangesets: true);
+
+ ///
+ /// Filters items using a dynamically changing predicate.
+ /// When emits a new function, all items are re-evaluated.
+ ///
+ /// The type of the item.
+ /// The source to filter.
+ /// An that emits new predicate functions. Each emission triggers a full re-evaluation of all items.
+ /// The that controls re-filtering behavior when the predicate changes.
+ /// A list changeset stream containing only items that satisfy the most recent predicate.
+ ///
+ ///
+ /// Each time emits, every item is re-evaluated against the new predicate.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddThe current predicate is evaluated. If the item passes, an Add is emitted. Otherwise dropped.
+ /// - AddRangeEach item is evaluated. Matching items are emitted as AddRange.
+ /// - ReplaceRe-evaluated. Same four-outcome logic as the static overload (Replace, Add, Remove, or dropped).
+ /// - RemoveIf the item was downstream, a Remove is emitted. Otherwise dropped.
+ /// - RefreshRe-evaluated. If inclusion status changed, an Add or Remove is emitted. If unchanged, Refresh forwarded or dropped.
+ /// - ClearAll downstream items are cleared.
+ /// - Predicate changedAll items are re-evaluated against the new predicate. The output is shaped by .
+ /// - OnCompletedIndependent completion of does not terminate the filter.
+ ///
+ /// Worth noting: No items are included until emits its first function.
+ ///
+ /// or is .
+ ///
+ ///
+ public static IObservable> Filter(this IObservable> source, IObservable> predicate, ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff)
+ where T : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate));
+
+ return new List.Internal.Filter.Dynamic(source, predicate, filterPolicy).Run();
+ }
+
+ ///
+ /// Filters items using a predicate that receives external state. When emits a new state value,
+ /// all items are re-evaluated against using the updated state.
+ ///
+ /// The type of the item.
+ /// The type of state value required by .
+ /// The source to filter.
+ /// An stream of state values to be passed to .
+ /// A static predicate receiving the current state and an item, returning to include or to exclude. The function itself does not change; only the state value passed to it changes.
+ /// The that controls re-filtering behavior when the state changes.
+ /// When (default), empty changesets are suppressed. Set to to publish empty changesets (useful for monitoring loading status).
+ /// A list changeset stream containing only items satisfying with the current state.
+ /// , , or is .
+ ///
+ ///
+ /// The predicate cannot be invoked until the first state value is received. Until then, all items are treated as excluded.
+ /// Each subsequent state emission triggers a full re-evaluation of all items according to .
+ ///
+ ///
+ /// EventBehavior
+ /// - Add/AddRangeEvaluated using current state. Matching items emitted as Add/AddRange.
+ /// - ReplaceRe-evaluated. Same four-outcome logic as the static filter (Replace, Add, Remove, or dropped).
+ /// - Remove/RemoveRangeIf the item was downstream, a Remove is emitted.
+ /// - RefreshRe-evaluated against current state. Inclusion status may change.
+ /// - ClearAll downstream items are cleared.
+ /// - State changedAll items are re-evaluated with the new state value. The output is shaped by .
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> Filter(
+ this IObservable> source,
+ IObservable predicateState,
+ Func predicate,
+ ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff,
+ bool suppressEmptyChangeSets = true)
+ where T : notnull
+ => List.Internal.Filter.WithPredicateState.Create(
+ source: source,
+ predicateState: predicateState,
+ predicate: predicate,
+ filterPolicy: filterPolicy,
+ suppressEmptyChangeSets: suppressEmptyChangeSets);
+
+ ///
+ /// Filters each item using a per-item of that dynamically controls inclusion.
+ /// When an item's observable emits the item enters the result; when it emits the item is removed.
+ ///
+ /// The type of items in the list.
+ /// The source to filter by property value.
+ /// A function that returns an observable of for each item, controlling its inclusion.
+ /// An optional throttle duration applied to each per-item observable to reduce re-evaluation frequency.
+ /// The used when throttling. Defaults to the system default scheduler.
+ /// A list changeset stream containing only items whose per-item observable most recently emitted .
+ /// or is .
+ ///
+ ///
+ /// Each item in the source gets its own subscription to the observable returned by .
+ /// The item's inclusion is determined by the most recent boolean value emitted by that observable.
+ ///
+ ///
+ /// Event (source)Behavior
+ /// - Add/AddRangeSubscribes to the per-item observable. Item is included when it first emits .
+ /// - ReplaceOld subscription disposed, new subscription created for the replacement item.
+ /// - Remove/RemoveRange/ClearSubscription disposed. If the item was downstream, a Remove is emitted.
+ /// - RefreshForwarded if the item is currently included.
+ ///
+ ///
+ /// Event (per-item observable)Behavior
+ /// - Emits If not already included, an Add is emitted downstream.
+ /// - Emits If currently included, a Remove is emitted downstream.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static IObservable> FilterOnObservable(this IObservable> source, Func> objectFilterObservable, TimeSpan? propertyChangedThrottle = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new FilterOnObservable(source, objectFilterObservable, propertyChangedThrottle, scheduler).Run();
+ }
+
+ ///
+ /// Filters items based on a property value, automatically re-evaluating when the specified property changes on any item.
+ ///
+ /// The type of the object. Must implement