It might be possible that there will be new emission(s) in the middle of replaying the previous emission(s) when subscribing to a (serialized) replay subject.
It seems like the replay loop only takes up to the last element of the queue at the start of the loop. New emission(s) that are added to the queue while replaying are not being retrieved which results to those emission(s) to be missed.
auto get_observable() const
{
return create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
const auto locked = state.lock();
for (auto&& value : locked->get_actual_values())
observer.on_next(std::move(value.value));
locked->on_subscribe(std::forward<TObs>(observer));
});
}
Attached is a simple project that can replicated the issue (at least with my setup).
rpp_replay_test.zip
It might be possible that there will be new emission(s) in the middle of replaying the previous emission(s) when subscribing to a (serialized) replay subject.
It seems like the replay loop only takes up to the last element of the queue at the start of the loop. New emission(s) that are added to the queue while replaying are not being retrieved which results to those emission(s) to be missed.
Attached is a simple project that can replicated the issue (at least with my setup).
rpp_replay_test.zip