Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions lib/utils/signal_wrapper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class _MultiSignalWrapper implements SignalWrapper {
});

final Iterable<ProcessSignal> _signals;
final Completer<ProcessSignal> _completer;
final Completer<Null> _completer;

@override
final OnDisposeCallback? onDispose;
Expand All @@ -63,15 +63,13 @@ class _MultiSignalWrapper implements SignalWrapper {
final subscriptions = [
for (final signal in _signals)
signal.watch().listen((signal) {
if (!_completer.isCompleted) _completer.complete(null);
if (onSignal?.call(signal) case final Future f) futures.add(f);
if (!_completer.isCompleted) _completer.complete(signal);
}),
];

try {
final result = await Future.any([_completer.future, fn()]);
if (result is T) return result;
return null;
return await Future.any([_completer.future, fn()]);
} catch (_) {
rethrow;
} finally {
Expand Down Expand Up @@ -106,8 +104,8 @@ class _SingleSignalWrapper implements SignalWrapper {
Future<T?> call<T>(Future<T> Function() fn) async {
final futures = <Future>[];
final subscription = _signal.watch().listen((signal) {
if (onSignal?.call(signal) case final Future future) futures.add(future);
if (!_completer.isCompleted) _completer.complete(null);
if (onSignal?.call(signal) case final Future future) futures.add(future);
});

try {
Expand Down
20 changes: 8 additions & 12 deletions lib/utils/speed_monitor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@ final class _SpeedSample with LinkedListEntry<_SpeedSample> {
}

/// A utility class for monitoring processing speed over a sliding time window.
///
/// It tracks the number of units processed over a specified [windowDuration]
/// (defaulting to 1 second) and calculates the current speed.
class SpeedMonitor implements Disposable {
static const double _zero = 0;

/// Creates a [SpeedMonitor] with a given [windowDuration].
/// Creates a [SpeedMonitor]
factory SpeedMonitor() => ._(samples: .new());

const SpeedMonitor._({required this._samples})
: windowDuration = const .new(seconds: 1);
const SpeedMonitor._({
required this._samples,
this._windowDuration = const .new(seconds: 1),
});

/// The duration of the sliding window used for speed calculation.
final Duration windowDuration;
final Duration _windowDuration;

/// A linked list of samples currently within the time window.
final LinkedList<_SpeedSample> _samples;
Expand All @@ -34,9 +33,6 @@ class SpeedMonitor implements Disposable {
}

/// Adds a sample of the [totalProcessedUnits] and returns the current speed.
///
/// The speed is calculated as units per [windowDuration].
/// It automatically removes samples that are older than the [windowDuration].
double add(int totalProcessedUnits) {
final _SpeedSample last = .new(totalProcessedUnits);

Expand All @@ -48,13 +44,13 @@ class SpeedMonitor implements Disposable {

int baseDeltaTime = last.time - first.time;

while (baseDeltaTime > windowDuration.inMicroseconds) {
while (baseDeltaTime > _windowDuration.inMicroseconds) {
first.unlink();
first = _samples.first;
baseDeltaTime = last.time - first.time;
}

final deltaTime = baseDeltaTime / windowDuration.inMicroseconds;
final deltaTime = baseDeltaTime / _windowDuration.inMicroseconds;

final deltaUnits = last.units - first.units;

Expand Down
16 changes: 8 additions & 8 deletions modules/local_store/lib/src/json_local_store.dart
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
part of "local_store.dart";

class _JSONLocalStore implements LocalStore {
static final _jsonBase64Decoder = utf8.decoder
static final Converter<List<int>, Object?> _jsonBase64Decoder = utf8.decoder
.fuse(base64.decoder)
.fuse(utf8.decoder)
.fuse(json.decoder);

static final _jsonBase64Encoder = json.encoder
static final Converter<Object?, List<int>> _jsonBase64Encoder = json.encoder
.fuse(utf8.encoder)
.fuse(base64.encoder);
.fuse(base64.encoder)
.fuse(utf8.encoder);

_JSONLocalStore(this.path);
_JSONLocalStore(this.path) : _cache = .new(), _file = .new(path);

final String path;
late final File _file = .new(path);

final _cache = <String, dynamic>{};
final Map<String, dynamic> _cache;
final File _file;

Completer<void>? _loadCompleter;

Expand All @@ -30,13 +31,12 @@ class _JSONLocalStore implements LocalStore {
} catch (_) {
} finally {
completer.complete();
if (completer != _loadCompleter) _loadCompleter?.complete();
}
}

Future<void> _save() async {
if (!await _file.exists()) await _file.create(recursive: true);
await _file.writeAsString(_jsonBase64Encoder.convert(_cache), flush: true);
await _file.writeAsBytes(_jsonBase64Encoder.convert(_cache), flush: true);
}

@override
Expand Down
13 changes: 13 additions & 0 deletions modules/nested_map/lib/src/extension.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extension MapExtension<K extends String> on Map<K, dynamic> {
dynamic _nestedGet(List<K> keys) {
return switch (this[keys.removeAt(0)]) {
final Map<K, dynamic> map => map._nestedGet(keys),
final List list => list._nestedGet(keys),
final value => value,
};
}
Expand All @@ -32,3 +33,15 @@ extension MapExtension<K extends String> on Map<K, dynamic> {
_nestedSet(keys, lastKey, value);
}
}

extension ListExtension on List {
dynamic _nestedGet(List<String> keys) {
if (int.tryParse(keys.removeAt(0)) case final index?) {
return switch (this[index]) {
final List list => list._nestedGet(keys),
final Map<String, dynamic> map => map._nestedGet(keys),
final value => value,
};
}
}
}
Loading