Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 52 additions & 51 deletions aether/ae_actions/time_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,52 +76,56 @@ auto TimeSyncRequest::EnsureConnected() {
}

auto TimeSyncRequest::SyncRequest() {
return ex::create<ex::set_value_t(Success), ex::set_error_t(Failed),
ex::set_error_t(Retry)>([&](auto& ctx) noexcept {
auto client_ptr = client_.Lock();
if (!client_ptr) {
return ex::set_error(std::move(ctx.receiver), Failed{});
}

// send get_time_utc request to main server
// get_time_utc return server utc time point in microseconds
CloudVisit::Visit(
[&](CloudServerConnection* sc) {
assert((sc != nullptr) && "Server connection is null!");

auto* cc = sc->client_connection();
assert(((cc != nullptr) &&
(cc->stream_info().link_state == LinkState::kLinked)) &&
"Client connection is not linked!");

auto& write_action =
cc->LoginApiCall(SubApi<LoginApi>{[&](auto& api) {
AE_TELED_DEBUG("Make time sync request");
response_sub_ = api->get_time_utc().Subscribe(
[&, request_time{Now()}](auto const& p) {
if (!p) {
return ex::set_error(std::move(ctx.receiver), Retry{});
}
HandleResponse(
std::chrono::milliseconds{
static_cast<std::int64_t>(p.value())},
request_time, Now());
// time synced
return ex::set_value(std::move(ctx.receiver), Success{});
});
}});
write_action_sub_ =
write_action.status_event().Subscribe([&](auto status) {
if (status == WriteAction::Status::kFail) {
AE_TELED_ERROR("Time sync write error, retry");
return ex::set_error(std::move(ctx.receiver), Retry{});
}
});
},
client_ptr->cloud_connection(), RequestPolicy::MainServer{});

// use raw time to avoid sync jumps
request_time_ = Now();
return ex::let_value([&](Success) noexcept {
return ex::create<ex::set_value_t(Success), ex::set_error_t(Failed),
ex::set_error_t(Retry)>([&](auto& ctx) noexcept {
auto client_ptr = client_.Lock();
if (!client_ptr) {
return ex::set_error(std::move(ctx.receiver), Failed{});
}

// send get_time_utc request to main server
// get_time_utc return server utc time point in microseconds
CloudVisit::Visit(
[&](CloudServerConnection* sc) {
assert((sc != nullptr) && "Server connection is null!");

auto* cc = sc->client_connection();
assert(((cc != nullptr) &&
(cc->stream_info().link_state == LinkState::kLinked)) &&
"Client connection is not linked!");

auto& write_action =
cc->LoginApiCall(SubApi<LoginApi>{[&](auto& api) {
AE_TELED_DEBUG("Make time sync request");
response_sub_ = api->get_time_utc().Subscribe(
[&, request_time{Now()}](auto const& p) {
if (!p) {
return ex::set_error(std::move(ctx.receiver),
Retry{});
}
HandleResponse(
std::chrono::milliseconds{
static_cast<std::int64_t>(p.value())},
request_time, Now());
// time synced
return ex::set_value(std::move(ctx.receiver),
Success{});
});
}});
write_action_sub_ =
write_action.status_event().Subscribe([&](auto status) {
if (status == WriteAction::Status::kFail) {
AE_TELED_ERROR("Time sync write error, retry");
return ex::set_error(std::move(ctx.receiver), Retry{});
}
});
},
client_ptr->cloud_connection(), RequestPolicy::MainServer{});

// use raw time to avoid sync jumps
request_time_ = Now();
});
});
}

Expand All @@ -131,10 +135,7 @@ TimeSyncRequest::TimeSyncRequest(AeContext const& ae_context,
auto s =
ex::for_range(Range{1, kMaxTries},
[&](auto) {
return EnsureConnected() |
ex::let_value([&](Success) noexcept {
return SyncRequest();
}) |
return EnsureConnected() | SyncRequest() |
ex::with_timeout(ae_context_, kRequestTimeout) |
ex::let_error(Override{
[](Retry) noexcept {
Expand Down Expand Up @@ -184,7 +185,7 @@ void TimeSyncRequest::HandleResponse(std::chrono::milliseconds server_epoch,
// update diff time
SyncClock::SyncTimeDiff +=
std::chrono::duration_cast<decltype(SyncClock::SyncTimeDiff)>(diff_time);
AE_TELED_DEBUG("Current time {:%Y-%m-%d %H:%M:%S}", Now());
AE_TELED_DEBUG("Current time {:%Y-%m-%d %H:%M:%S}", SyncClock::now());
}

} // namespace time_sync_internal
Expand Down
4 changes: 0 additions & 4 deletions aether/aether.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ Aether::Aether(ObjProp prop)

Aether::~Aether() { AE_TELE_DEBUG(AetherDestroyed); }

void Aether::Update(TimePoint current_time) {
update_time = task_scheduler->Update(current_time);
}

AeCtx Aether::ToAeContext() const {
static constexpr AeCtxTable ae_table{
[](void* obj) -> Aether& { return *static_cast<Aether*>(obj); },
Expand Down
2 changes: 0 additions & 2 deletions aether/aether.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class Aether : public Obj {
tele_statistics, poller, dns_resolver, adapter_registry, uap);
}

void Update(TimePoint current_time) override;

// AeContext protocol
AeCtx ToAeContext() const;

Expand Down
4 changes: 2 additions & 2 deletions aether/aether_app.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "aether/obj/domain.h"
#include "aether/types/small_function.h"

#include "aether/events/events.h" // IWYU pragma: keep
#include "aether/events/events.h" // IWYU pragma: keep
#include "aether/actions/action.h" // IWYU pragma: keep

#include "aether/cloud.h"
Expand Down Expand Up @@ -190,7 +190,7 @@ class AetherApp {
* \brief Run one iteration of application update loop.
*/
TimePoint Update(TimePoint current_time) {
return domain_->Update(current_time);
return aether_->task_scheduler->Update(current_time);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions aether/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
# define AE_LWIP_SOCKET_TYPES LWIP_CB_SOCKETS
#endif

#ifndef AE_FREERTOS_POLLER_STACK_SIZE
# define AE_FREERTOS_POLLER_STACK_SIZE 4096
#endif

/**
* Is WiFi modules supported.
* Also choose one of the supported WiFi modules implementations.
Expand Down
4 changes: 4 additions & 0 deletions aether/connection_manager/client_cloud_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "aether/work_cloud_api/server_descriptor.h"
#include "aether/cloud_connections/cloud_server_connections.h"

#include "aether/tele/tele.h"

namespace ae {
namespace client_cloud_manager_internal {
GetCloudFromCache::GetCloudFromCache(AeContext const& ae_context,
Expand Down Expand Up @@ -62,6 +64,8 @@ GetCloudFromAether::result_event() noexcept {
}

void GetCloudFromAether::RequestCloud() {
AE_TELED_DEBUG("RequestCloud");

get_client_cloud_action_.emplace(
ae_context_, client_uid_, *cloud_connection_,
RequestPolicy::Replica{cloud_connection_->count_connections()});
Expand Down
22 changes: 0 additions & 22 deletions aether/obj/domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,6 @@ Domain::Domain(TimePoint p, IDomainStorage& storage)
storage_(&storage),
registry_{&Registry::GetRegistry()} {}

TimePoint Domain::Update(TimePoint current_time) {
update_time_ = current_time;
auto next_time = current_time + std::chrono::hours(365);
for (auto& [_, ptr_view] : id_objects_) {
auto ptr = ptr_view.Lock();
if (!ptr) {
continue;
}
// TODO: do not call update for someone who is not want it
ptr->Update(current_time);
if (ptr->update_time > current_time) {
next_time = std::min(next_time, ptr->update_time);
} else if (ptr->update_time < current_time) {
#ifdef DEBUG
AE_TELE_ERROR(ObjectDomainUpdatePastTime,
"Update returned next time point in the past");
#endif // DEBUG_
}
}
return next_time;
}

Ptr<Obj> Domain::ConstructObj(Factory const& factory, ObjId obj_id) {
Ptr<Obj> o = factory.create();
AddObject(obj_id, o);
Expand Down
2 changes: 0 additions & 2 deletions aether/obj/domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ class Domain {
public:
Domain(TimePoint p, IDomainStorage& storage);

TimePoint Update(TimePoint current_time);

// Search for the object by obj_id.
Ptr<Obj> Find(ObjId obj_id) const;

Expand Down
7 changes: 0 additions & 7 deletions aether/obj/obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ Obj::~Obj() {

uint32_t Obj::GetClassId() const { return kClassId; }

void Obj::Update(TimePoint current_time) {
if (update_time < current_time) {
// almost never
update_time = current_time + std::chrono::hours(365 * 24);
}
}

namespace reflect {
std::size_t GetObjIndexImpl(Obj const* obj, std::uint32_t class_id) {
auto res = crc32::from_buffer(
Expand Down
14 changes: 5 additions & 9 deletions aether/obj/obj.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "aether/config.h"

#include "aether/crc.h"
#include "aether/clock.h"
#include "aether/obj/obj_id.h"
#include "aether/obj/obj_ptr.h"
#include "aether/obj/domain.h"
Expand Down Expand Up @@ -62,14 +61,11 @@ class Obj {
virtual ~Obj();

virtual std::uint32_t GetClassId() const;
virtual void Update(TimePoint current_time);

// only update time should be reflected
AE_REFLECT_MEMBERS(update_time);
AE_REFLECT();

Domain* domain{};
ObjId obj_id;
TimePoint update_time;
};

namespace reflect {
Expand All @@ -89,25 +85,25 @@ struct ObjectIndex<T, std::enable_if_t<std::is_base_of_v<Obj, T>>> {
static constexpr std::uint32_t kClassId = CLASS_ID; \
static constexpr std::uint32_t kBaseClassId = BASE_CLASS_ID; \
static constexpr std::uint32_t kVersion = VERSION; \
using CurrentVersion = ae::Version<kVersion>; \
using CurrentVersion = ::ae::Version<kVersion>; \
static constexpr CurrentVersion kCurrentVersion{};

/**
* \brief Use it inside each derived class to register it with the object system
*/
#define AE_OBJECT(DERIVED, BASE, VERSION) \
protected: \
friend class ae::Registrar<DERIVED>; \
friend class ::ae::Registrar<DERIVED>; \
friend ae::Ptr<DERIVED> ae::MakePtr<DERIVED>(); \
\
public: \
_AE_OBJECT_FIELDS(crc32::from_literal(#DERIVED).value, BASE::kClassId, \
VERSION) \
inline static auto registrar_ = \
ae::Registrar<DERIVED>(kClassId, kBaseClassId); \
::ae::Registrar<DERIVED>(kClassId, kBaseClassId); \
\
using Base = BASE; \
using ptr = ae::ObjPtr<DERIVED>; \
using ptr = ::ae::ObjPtr<DERIVED>; \
\
Base& base_{*this}; \
\
Expand Down
Loading
Loading