diff --git a/aether/ae_actions/time_sync.cpp b/aether/ae_actions/time_sync.cpp index c4db6e3d..7a8f0728 100644 --- a/aether/ae_actions/time_sync.cpp +++ b/aether/ae_actions/time_sync.cpp @@ -76,52 +76,56 @@ auto TimeSyncRequest::EnsureConnected() { } auto TimeSyncRequest::SyncRequest() { - return ex::create([&](auto& ctx) noexcept { - auto client_ptr = client_.Lock(); - if (!client_ptr) { - return ex::set_error(std::move(ctx.receiver), Failed{}); - } - - // send get_time_utc request to main server - // get_time_utc return server utc time point in microseconds - CloudVisit::Visit( - [&](CloudServerConnection* sc) { - assert((sc != nullptr) && "Server connection is null!"); - - auto* cc = sc->client_connection(); - assert(((cc != nullptr) && - (cc->stream_info().link_state == LinkState::kLinked)) && - "Client connection is not linked!"); - - auto& write_action = - cc->LoginApiCall(SubApi{[&](auto& api) { - AE_TELED_DEBUG("Make time sync request"); - response_sub_ = api->get_time_utc().Subscribe( - [&, request_time{Now()}](auto const& p) { - if (!p) { - return ex::set_error(std::move(ctx.receiver), Retry{}); - } - HandleResponse( - std::chrono::milliseconds{ - static_cast(p.value())}, - request_time, Now()); - // time synced - return ex::set_value(std::move(ctx.receiver), Success{}); - }); - }}); - write_action_sub_ = - write_action.status_event().Subscribe([&](auto status) { - if (status == WriteAction::Status::kFail) { - AE_TELED_ERROR("Time sync write error, retry"); - return ex::set_error(std::move(ctx.receiver), Retry{}); - } - }); - }, - client_ptr->cloud_connection(), RequestPolicy::MainServer{}); - - // use raw time to avoid sync jumps - request_time_ = Now(); + return ex::let_value([&](Success) noexcept { + return ex::create([&](auto& ctx) noexcept { + auto client_ptr = client_.Lock(); + if (!client_ptr) { + return ex::set_error(std::move(ctx.receiver), Failed{}); + } + + // send get_time_utc request to main server + // get_time_utc return server utc time point in microseconds + CloudVisit::Visit( + [&](CloudServerConnection* sc) { + assert((sc != nullptr) && "Server connection is null!"); + + auto* cc = sc->client_connection(); + assert(((cc != nullptr) && + (cc->stream_info().link_state == LinkState::kLinked)) && + "Client connection is not linked!"); + + auto& write_action = + cc->LoginApiCall(SubApi{[&](auto& api) { + AE_TELED_DEBUG("Make time sync request"); + response_sub_ = api->get_time_utc().Subscribe( + [&, request_time{Now()}](auto const& p) { + if (!p) { + return ex::set_error(std::move(ctx.receiver), + Retry{}); + } + HandleResponse( + std::chrono::milliseconds{ + static_cast(p.value())}, + request_time, Now()); + // time synced + return ex::set_value(std::move(ctx.receiver), + Success{}); + }); + }}); + write_action_sub_ = + write_action.status_event().Subscribe([&](auto status) { + if (status == WriteAction::Status::kFail) { + AE_TELED_ERROR("Time sync write error, retry"); + return ex::set_error(std::move(ctx.receiver), Retry{}); + } + }); + }, + client_ptr->cloud_connection(), RequestPolicy::MainServer{}); + + // use raw time to avoid sync jumps + request_time_ = Now(); + }); }); } @@ -131,10 +135,7 @@ TimeSyncRequest::TimeSyncRequest(AeContext const& ae_context, auto s = ex::for_range(Range{1, kMaxTries}, [&](auto) { - return EnsureConnected() | - ex::let_value([&](Success) noexcept { - return SyncRequest(); - }) | + return EnsureConnected() | SyncRequest() | ex::with_timeout(ae_context_, kRequestTimeout) | ex::let_error(Override{ [](Retry) noexcept { @@ -184,7 +185,7 @@ void TimeSyncRequest::HandleResponse(std::chrono::milliseconds server_epoch, // update diff time SyncClock::SyncTimeDiff += std::chrono::duration_cast(diff_time); - AE_TELED_DEBUG("Current time {:%Y-%m-%d %H:%M:%S}", Now()); + AE_TELED_DEBUG("Current time {:%Y-%m-%d %H:%M:%S}", SyncClock::now()); } } // namespace time_sync_internal diff --git a/aether/aether.cpp b/aether/aether.cpp index 80dc8a2e..dfb9dd74 100644 --- a/aether/aether.cpp +++ b/aether/aether.cpp @@ -41,10 +41,6 @@ Aether::Aether(ObjProp prop) Aether::~Aether() { AE_TELE_DEBUG(AetherDestroyed); } -void Aether::Update(TimePoint current_time) { - update_time = task_scheduler->Update(current_time); -} - AeCtx Aether::ToAeContext() const { static constexpr AeCtxTable ae_table{ [](void* obj) -> Aether& { return *static_cast(obj); }, diff --git a/aether/aether.h b/aether/aether.h index 95206bcd..805031f4 100644 --- a/aether/aether.h +++ b/aether/aether.h @@ -77,8 +77,6 @@ class Aether : public Obj { tele_statistics, poller, dns_resolver, adapter_registry, uap); } - void Update(TimePoint current_time) override; - // AeContext protocol AeCtx ToAeContext() const; diff --git a/aether/aether_app.h b/aether/aether_app.h index 34dfc65d..1511b6fa 100644 --- a/aether/aether_app.h +++ b/aether/aether_app.h @@ -29,7 +29,7 @@ #include "aether/obj/domain.h" #include "aether/types/small_function.h" -#include "aether/events/events.h" // IWYU pragma: keep +#include "aether/events/events.h" // IWYU pragma: keep #include "aether/actions/action.h" // IWYU pragma: keep #include "aether/cloud.h" @@ -190,7 +190,7 @@ class AetherApp { * \brief Run one iteration of application update loop. */ TimePoint Update(TimePoint current_time) { - return domain_->Update(current_time); + return aether_->task_scheduler->Update(current_time); } /** diff --git a/aether/config.h b/aether/config.h index ea75f0ca..5723aa44 100644 --- a/aether/config.h +++ b/aether/config.h @@ -119,6 +119,10 @@ # define AE_LWIP_SOCKET_TYPES LWIP_CB_SOCKETS #endif +#ifndef AE_FREERTOS_POLLER_STACK_SIZE +# define AE_FREERTOS_POLLER_STACK_SIZE 4096 +#endif + /** * Is WiFi modules supported. * Also choose one of the supported WiFi modules implementations. diff --git a/aether/connection_manager/client_cloud_manager.cpp b/aether/connection_manager/client_cloud_manager.cpp index a8eab74b..2e2d0e20 100644 --- a/aether/connection_manager/client_cloud_manager.cpp +++ b/aether/connection_manager/client_cloud_manager.cpp @@ -27,6 +27,8 @@ #include "aether/work_cloud_api/server_descriptor.h" #include "aether/cloud_connections/cloud_server_connections.h" +#include "aether/tele/tele.h" + namespace ae { namespace client_cloud_manager_internal { GetCloudFromCache::GetCloudFromCache(AeContext const& ae_context, @@ -62,6 +64,8 @@ GetCloudFromAether::result_event() noexcept { } void GetCloudFromAether::RequestCloud() { + AE_TELED_DEBUG("RequestCloud"); + get_client_cloud_action_.emplace( ae_context_, client_uid_, *cloud_connection_, RequestPolicy::Replica{cloud_connection_->count_connections()}); diff --git a/aether/obj/domain.cpp b/aether/obj/domain.cpp index 49387669..ab064f50 100644 --- a/aether/obj/domain.cpp +++ b/aether/obj/domain.cpp @@ -99,28 +99,6 @@ Domain::Domain(TimePoint p, IDomainStorage& storage) storage_(&storage), registry_{&Registry::GetRegistry()} {} -TimePoint Domain::Update(TimePoint current_time) { - update_time_ = current_time; - auto next_time = current_time + std::chrono::hours(365); - for (auto& [_, ptr_view] : id_objects_) { - auto ptr = ptr_view.Lock(); - if (!ptr) { - continue; - } - // TODO: do not call update for someone who is not want it - ptr->Update(current_time); - if (ptr->update_time > current_time) { - next_time = std::min(next_time, ptr->update_time); - } else if (ptr->update_time < current_time) { -#ifdef DEBUG - AE_TELE_ERROR(ObjectDomainUpdatePastTime, - "Update returned next time point in the past"); -#endif // DEBUG_ - } - } - return next_time; -} - Ptr Domain::ConstructObj(Factory const& factory, ObjId obj_id) { Ptr o = factory.create(); AddObject(obj_id, o); diff --git a/aether/obj/domain.h b/aether/obj/domain.h index b6986b09..f52baa9c 100644 --- a/aether/obj/domain.h +++ b/aether/obj/domain.h @@ -138,8 +138,6 @@ class Domain { public: Domain(TimePoint p, IDomainStorage& storage); - TimePoint Update(TimePoint current_time); - // Search for the object by obj_id. Ptr Find(ObjId obj_id) const; diff --git a/aether/obj/obj.cpp b/aether/obj/obj.cpp index 572289d4..06edc1f1 100644 --- a/aether/obj/obj.cpp +++ b/aether/obj/obj.cpp @@ -29,13 +29,6 @@ Obj::~Obj() { uint32_t Obj::GetClassId() const { return kClassId; } -void Obj::Update(TimePoint current_time) { - if (update_time < current_time) { - // almost never - update_time = current_time + std::chrono::hours(365 * 24); - } -} - namespace reflect { std::size_t GetObjIndexImpl(Obj const* obj, std::uint32_t class_id) { auto res = crc32::from_buffer( diff --git a/aether/obj/obj.h b/aether/obj/obj.h index ae849d2b..015338a4 100644 --- a/aether/obj/obj.h +++ b/aether/obj/obj.h @@ -32,7 +32,6 @@ #include "aether/config.h" #include "aether/crc.h" -#include "aether/clock.h" #include "aether/obj/obj_id.h" #include "aether/obj/obj_ptr.h" #include "aether/obj/domain.h" @@ -62,14 +61,11 @@ class Obj { virtual ~Obj(); virtual std::uint32_t GetClassId() const; - virtual void Update(TimePoint current_time); - // only update time should be reflected - AE_REFLECT_MEMBERS(update_time); + AE_REFLECT(); Domain* domain{}; ObjId obj_id; - TimePoint update_time; }; namespace reflect { @@ -89,7 +85,7 @@ struct ObjectIndex>> { static constexpr std::uint32_t kClassId = CLASS_ID; \ static constexpr std::uint32_t kBaseClassId = BASE_CLASS_ID; \ static constexpr std::uint32_t kVersion = VERSION; \ - using CurrentVersion = ae::Version; \ + using CurrentVersion = ::ae::Version; \ static constexpr CurrentVersion kCurrentVersion{}; /** @@ -97,17 +93,17 @@ struct ObjectIndex>> { */ #define AE_OBJECT(DERIVED, BASE, VERSION) \ protected: \ - friend class ae::Registrar; \ + friend class ::ae::Registrar; \ friend ae::Ptr ae::MakePtr(); \ \ public: \ _AE_OBJECT_FIELDS(crc32::from_literal(#DERIVED).value, BASE::kClassId, \ VERSION) \ inline static auto registrar_ = \ - ae::Registrar(kClassId, kBaseClassId); \ + ::ae::Registrar(kClassId, kBaseClassId); \ \ using Base = BASE; \ - using ptr = ae::ObjPtr; \ + using ptr = ::ae::ObjPtr; \ \ Base& base_{*this}; \ \ diff --git a/aether/poller/freertos_poller.cpp b/aether/poller/freertos_poller.cpp index 25b8e50a..e027fa50 100644 --- a/aether/poller/freertos_poller.cpp +++ b/aether/poller/freertos_poller.cpp @@ -27,17 +27,18 @@ namespace ae { namespace freertos_poller_internal { -inline struct sockaddr_in MakeLoopbackAddr() { +inline struct sockaddr_in MakeLoopbackAddr(std::uint16_t port) noexcept { constexpr char kLoopbackIp[] = "127.0.0.1"; struct sockaddr_in loopback_addr; memset(&loopback_addr, 0, sizeof(loopback_addr)); loopback_addr.sin_family = AF_INET; loopback_addr.sin_addr.s_addr = inet_addr(kLoopbackIp); - loopback_addr.sin_port = htons(66); + loopback_addr.sin_port = htons(port); return loopback_addr; } -inline int MakeListenPart() { +inline int MakeLoopbackSocket(std::uint16_t port_listen, + std::uint16_t port_connect) noexcept { int l_socket = socket(AF_INET, SOCK_DGRAM, 0); if (l_socket == -1) { AE_TELED_ERROR("Create listen socket failed with {}", errno); @@ -45,17 +46,18 @@ inline int MakeListenPart() { return -1; } fcntl(l_socket, F_SETFL, O_NONBLOCK); - auto const addr = MakeLoopbackAddr(); - if (bind(l_socket, reinterpret_cast(&addr), sizeof(addr)) != - 0) { - AE_TELED_ERROR("Bind listen socket failed with {}", errno); + auto const bind_addr = MakeLoopbackAddr(port_listen); + if (bind(l_socket, reinterpret_cast(&bind_addr), + sizeof(bind_addr)) != 0) { + AE_TELED_ERROR("Bind socket failed with {}", errno); close(l_socket); assert(false); return -1; } - if (connect(l_socket, reinterpret_cast(&addr), - sizeof(addr)) != 0) { - AE_TELED_ERROR("Create socket failed with {}", errno); + auto const conn_addr = MakeLoopbackAddr(port_connect); + if (connect(l_socket, reinterpret_cast(&conn_addr), + sizeof(conn_addr)) != 0) { + AE_TELED_ERROR("Connect socket failed with {}", errno); close(l_socket); assert(false); return -1; @@ -63,18 +65,22 @@ inline int MakeListenPart() { return l_socket; } -inline std::array MakePipe() { +inline int MakeListenPart() noexcept { return MakeLoopbackSocket(66, 67); } +inline int MakeWritePart() noexcept { return MakeLoopbackSocket(67, 66); } + +inline std::array MakePipe() noexcept { auto sock = MakeListenPart(); - return {sock, sock}; + auto write_sock = MakeWritePart(); + return {sock, write_sock}; } -inline void WritePipe(std::array const& pipe) { +inline void WritePipe(std::array const& pipe) noexcept { if (pipe[1] != -1) { send(pipe[1], "1", 1, 0); } } -inline void ReadPipe(std::array const& pipe) { +inline void ReadPipe(std::array const& pipe) noexcept { if (pipe[0] != -1) { std::uint8_t buff[16]; auto n = recv(pipe[0], &buff, sizeof(buff), 0); @@ -82,7 +88,7 @@ inline void ReadPipe(std::array const& pipe) { } } -inline void ClosePipe(std::array& pipe) { +inline void ClosePipe(std::array& pipe) noexcept { if (pipe[1] != -1) { close(pipe[1]); pipe[1] = -1; @@ -93,7 +99,7 @@ inline void ClosePipe(std::array& pipe) { } } -EventType FromEpollEvent(std::uint32_t events) { +EventType FromPollEvent(std::uint32_t events) { EventType event_type{0}; if ((events & POLLIN) != 0) { @@ -110,29 +116,42 @@ EventType FromEpollEvent(std::uint32_t events) { } // namespace freertos_poller_internal -void vTaskFunction(void* pvParameters) { - auto* poller = static_cast(pvParameters); - poller->Loop(); -} - FreeRtosLwipPollerImpl::FreeRtosLwipPollerImpl() : wake_up_pipe_{freertos_poller_internal::MakePipe()}, stop_requested_{false} { assert(wake_up_pipe_[0] != -1); assert(wake_up_pipe_[1] != -1); - xTaskCreate(static_cast(&vTaskFunction), "Poller loop", 4096, - static_cast(this), tskIDLE_PRIORITY, &myTaskHandle_); - AE_TELE_DEBUG(kFreertosWorkerCreate, "Poll worker was created"); + my_task_handle_ = xTaskGetCurrentTaskHandle(); + + auto res = xTaskCreate( + [](void* arg) noexcept { + auto* poller = static_cast(arg); + poller->Loop(); + // notify - the task is finished + xTaskNotifyGive(poller->my_task_handle_); + // delete the task + vTaskDelete(nullptr); + }, + "FreeRTOS lwip poller loop", kStackSize, static_cast(this), + tskIDLE_PRIORITY + 1, &worker_task_handle_); + if (res != pdPASS) { + AE_TELED_ERROR("Failed to create poll worker task: {}", res); + } else { + AE_TELE_DEBUG(kFreertosWorkerCreate, "Poll worker was created"); + } } FreeRtosLwipPollerImpl::~FreeRtosLwipPollerImpl() { - stop_requested_ = true; + stop_requested_.store(true, std::memory_order::release); freertos_poller_internal::WritePipe(wake_up_pipe_); - if (myTaskHandle_ != nullptr) { - vTaskDelete(myTaskHandle_); + // if task was created + if (worker_task_handle_ != nullptr) { + // wait for the task to stop (join) + ulTaskNotifyTake(pdTRUE, portMAX_DELAY); } freertos_poller_internal::ClosePipe(wake_up_pipe_); + AE_TELE_DEBUG(kFreertosWorkerDestroyed, "Poll worker has been destroyed"); } @@ -157,11 +176,11 @@ void FreeRtosLwipPollerImpl::Remove(DescriptorType fd) { void FreeRtosLwipPollerImpl::Loop() { static constexpr int kPollingTimeout = 16000; - while (!stop_requested_) { - std::vector fds_vector; + std::vector fds_vector; + while (!stop_requested_.load(std::memory_order::acquire)) { { auto lock = std::scoped_lock{ctl_mutex_}; - fds_vector = FillFdsVector(); + FillFdsVector(fds_vector); } auto res = lwip_poll(fds_vector.data(), fds_vector.size(), kPollingTimeout); if (res == -1) { @@ -191,14 +210,14 @@ void FreeRtosLwipPollerImpl::Loop() { if (poll_event == event_map_.end()) { continue; } - poll_event->second.cb( - v.fd, freertos_poller_internal::FromEpollEvent(v.revents)); + poll_event->second.cb(v.fd, + freertos_poller_internal::FromPollEvent(v.revents)); } } } -std::vector FreeRtosLwipPollerImpl::FillFdsVector() { - std::vector fds; +void FreeRtosLwipPollerImpl::FillFdsVector(std::vector& fds) { + fds.clear(); // all events and wake up pipe fds.reserve(event_map_.size() + 1); { @@ -240,8 +259,6 @@ std::vector FreeRtosLwipPollerImpl::FillFdsVector() { pfd.revents = 0; fds.push_back(pfd); } - - return fds; } FreeRtosPolledFd::FreeRtosPolledFd(DescriptorType fd, diff --git a/aether/poller/freertos_poller.h b/aether/poller/freertos_poller.h index 76c612a2..f044b0e1 100644 --- a/aether/poller/freertos_poller.h +++ b/aether/poller/freertos_poller.h @@ -29,13 +29,14 @@ # include # include +# include "aether/config.h" # include "aether/poller/poller.h" # include "aether/poller/poller_types.h" # include "aether/types/small_function.h" namespace ae { class FreeRtosLwipPollerImpl : public NativePoller { - friend void vTaskFunction(void* pvParameters); + static constexpr std::size_t kStackSize = AE_FREERTOS_POLLER_STACK_SIZE; public: using EventCb = SmallFunction; @@ -48,6 +49,7 @@ class FreeRtosLwipPollerImpl : public NativePoller { FreeRtosLwipPollerImpl(); ~FreeRtosLwipPollerImpl() override; + // implementation for BasicLockable void lock(); void unlock(); @@ -56,9 +58,10 @@ class FreeRtosLwipPollerImpl : public NativePoller { private: void Loop(); - std::vector FillFdsVector(); + void FillFdsVector(std::vector& fds); - TaskHandle_t myTaskHandle_ = nullptr; + TaskHandle_t worker_task_handle_ = nullptr; + TaskHandle_t my_task_handle_ = nullptr; std::array wake_up_pipe_; std::atomic_bool stop_requested_; std::map event_map_; @@ -113,4 +116,4 @@ class FreertosPoller : public IPoller { } // namespace ae #endif -#endif // AETHER_POLLER_FREERTOS_POLLER_H_ */ +#endif // AETHER_POLLER_FREERTOS_POLLER_H_ diff --git a/aether/server.cpp b/aether/server.cpp index 3ade56f3..2ebcfdeb 100644 --- a/aether/server.cpp +++ b/aether/server.cpp @@ -30,17 +30,6 @@ Server::Server(ObjProp prop, ServerId server_id, Register(); } -void Server::Update(TimePoint current_time) { - if (!subscribed_) { - subscribed_ = true; - UpdateSubscription(); - } - if (update_time < current_time) { - // almost never - update_time = current_time + std::chrono::hours(365 * 24); - } -} - void Server::Register() { UpdateSubscription(); diff --git a/aether/server.h b/aether/server.h index 457d277f..cf1f3ac3 100644 --- a/aether/server.h +++ b/aether/server.h @@ -41,7 +41,16 @@ class Server : public Obj { AE_OBJECT_REFLECT(AE_MMBRS(server_id, endpoints, adapter_registry_, channels)) - void Update(TimePoint current_time) override; + template + void Load(CurrentVersion, Dnv& dnv){ + dnv(base_); + dnv(server_id, endpoints, adapter_registry_, channels); + if (!subscribed_) { + subscribed_ = true; + UpdateSubscription(); + } + } + ChannelsChanged::Subscriber channels_changed(); diff --git a/aether/tasks/details/manual_task_scheduler.h b/aether/tasks/details/manual_task_scheduler.h index 1936022c..8156f650 100644 --- a/aether/tasks/details/manual_task_scheduler.h +++ b/aether/tasks/details/manual_task_scheduler.h @@ -21,8 +21,8 @@ #include #include #include -#include -#include +#include // IWYU pragma: keeps +#include // IWYU pragma: keeps #include #include "aether/tasks/details/task_manager.h" @@ -60,7 +60,7 @@ class ManualTaskScheduler { TimePointType Update( TimePointType current_time = TimePointType::clock::now()) { auto lock = std::unique_lock{lock_}; - trigger_ = false; + trigger_.store(false, std::memory_order::release); CheckOverflows(); // run regular tasks @@ -84,12 +84,15 @@ class ManualTaskScheduler { * \param wake_up_time - maximum time to wait. */ void WaitUntil(TimePointType wake_up_time) { - auto lock = std::unique_lock{lock_}; - if (trigger_.exchange(false)) { + // fast check without mutex locking + if (trigger_.load(std::memory_order::acquire)) { return; } - cv_.wait_until(lock, wake_up_time, [this]() { return trigger_.load(); }); - trigger_ = false; + + auto lock = std::unique_lock{lock_}; + cv_.wait_until(lock, wake_up_time, [this]() noexcept { + return trigger_.load(std::memory_order::acquire); + }); } std::size_t overflow_counter() const { return overflow_counter_; } @@ -102,7 +105,7 @@ class ManualTaskScheduler { if (p == nullptr) { overflow_counter_++; } - trigger_ = true; + trigger_.store(true, std::memory_order::release); cv_.notify_one(); return p; } diff --git a/aether/tele/env/compilation_options.h b/aether/tele/env/compilation_options.h index 28dfd889..dce1d465 100644 --- a/aether/tele/env/compilation_options.h +++ b/aether/tele/env/compilation_options.h @@ -108,6 +108,7 @@ constexpr inline auto _compile_options_list = std::array{ _OPTION_VALUE(AE_SUPPORT_DYNAMIC_CLOUD_IPS, 0), #endif _OPTION(AE_LWIP_SOCKET_TYPES), + _OPTION(AE_FREERTOS_POLLER_STACK_SIZE), _OPTION(AE_SUPPORT_MODEMS), _OPTION(AE_ENABLE_THINGY91X), _OPTION(AE_ENABLE_SIM7070), diff --git a/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.cpp b/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.cpp index 1c3b737d..e3c81371 100644 --- a/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.cpp +++ b/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.cpp @@ -28,8 +28,8 @@ LwipCBTcpSocket::LwipCBTcpSocket(Ptr const&) {} LwipCBTcpSocket::~LwipCBTcpSocket() { Disconnect(); } -ISocket& LwipCBTcpSocket::ReadyToWrite( - [[maybe_unused]] ReadyToWriteCb ready_to_write_cb) { +ISocket& LwipCBTcpSocket::ReadyToWrite(ReadyToWriteCb ready_to_write_cb) { + ready_to_write_cb_ = std::move(ready_to_write_cb); return *this; } @@ -136,12 +136,20 @@ ISocket& LwipCBTcpSocket::Connect(AddressPort const& destination, } void LwipCBTcpSocket::OnConnectionEvent() { - AE_TELED_DEBUG("LwIp CB TCP socket connectioin event {}", connection_state_); + AE_TELED_DEBUG("LwIp CB TCP socket connection event {}", connection_state_); if (connected_cb_) { connected_cb_(connection_state_); } } +// Callback client data sent +err_t LwipCBTcpSocket::TcpClientSent(void* arg, struct tcp_pcb*, u16_t) { + auto* self = static_cast(arg); + if (self->ready_to_write_cb_) { + self->ready_to_write_cb_(); + } + return ERR_OK; +} // Callback client data received err_t LwipCBTcpSocket::TcpClientRecv(void* arg, struct tcp_pcb* tpcb, @@ -174,7 +182,9 @@ err_t LwipCBTcpSocket::TcpClientRecv(void* arg, struct tcp_pcb* tpcb, static_cast(packet->payload), static_cast(packet->len), }; - self->recv_data_cb_(payload_span); + if (self->recv_data_cb_) { + self->recv_data_cb_(payload_span); + } } // Ack @@ -205,6 +215,7 @@ err_t LwipCBTcpSocket::TcpClientConnected(void* arg, struct tcp_pcb* tpcb, // Setting callbacks tcp_recv(tpcb, LwipCBTcpSocket::TcpClientRecv); + tcp_sent(tpcb, LwipCBTcpSocket::TcpClientSent); AE_TELED_DEBUG("Connected to the server"); self->connection_state_ = ConnectionState::kConnected; diff --git a/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.h b/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.h index 0e4304f5..dec4fe72 100644 --- a/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.h +++ b/aether/transport/system_sockets/sockets/lwip_cb_tcp_socket.h @@ -51,6 +51,7 @@ class LwipCBTcpSocket : public ISocket { ConnectedCb connected_cb) override; // LWIP RAW callbacks + static err_t TcpClientSent(void* arg, struct tcp_pcb* tpcb, u16_t len); static err_t TcpClientRecv(void* arg, struct tcp_pcb* tpcb, struct pbuf* p, err_t err); static err_t TcpClientConnected(void* arg, struct tcp_pcb* tpcb, err_t err); @@ -60,6 +61,7 @@ class LwipCBTcpSocket : public ISocket { void OnErrorEvent(); void OnConnectionEvent(); + ReadyToWriteCb ready_to_write_cb_; RecvDataCb recv_data_cb_; ErrorCb error_cb_; ConnectedCb connected_cb_; diff --git a/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.cpp b/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.cpp index 699a08cc..c0ea8afb 100644 --- a/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.cpp +++ b/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.cpp @@ -28,8 +28,8 @@ LwipCBUdpSocket::LwipCBUdpSocket(Ptr const&) {} LwipCBUdpSocket::~LwipCBUdpSocket() { Disconnect(); } -ISocket& LwipCBUdpSocket::ReadyToWrite(ReadyToWriteCb ready_to_write_cb) { - ready_to_write_cb_ = std::move(ready_to_write_cb); +ISocket& LwipCBUdpSocket::ReadyToWrite( + [[maybe_unused]] ReadyToWriteCb ready_to_write_cb) { return *this; } @@ -75,7 +75,7 @@ std::optional LwipCBUdpSocket::Send(Span data) { return std::nullopt; } - AE_TELED_ERROR("Sent {} bytes", data.size()); + AE_TELED_DEBUG("Sent {} bytes", data.size()); return data.size(); } @@ -85,10 +85,11 @@ void LwipCBUdpSocket::Disconnect() { return; } LOCK_TCPIP_CORE(); + ae_defer[]() { UNLOCK_TCPIP_CORE(); }; + udp_recv(pcb_, nullptr, nullptr); udp_remove(pcb_); pcb_ = nullptr; - UNLOCK_TCPIP_CORE(); } ISocket& LwipCBUdpSocket::Connect(AddressPort const& destination, @@ -158,8 +159,10 @@ void LwipCBUdpSocket::UdpClientRecv(void* arg, struct udp_pcb* upcb, } // Our server address? - if (!ip_addr_cmp(&self->server_ipaddr_, addr)) { - AE_TELED_ERROR("Received from unexpected IP:{}", ipaddr_ntoa(addr)); + if (!ip_addr_cmp(&self->server_ipaddr_, addr) || + (self->server_port_ != port)) { + AE_TELED_ERROR("Received from unexpected IP:{}:{}", ipaddr_ntoa(addr), + port); self->OnError(); return; } diff --git a/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.h b/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.h index 7ccf8020..e98de704 100644 --- a/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.h +++ b/aether/transport/system_sockets/sockets/lwip_cb_udp_socket.h @@ -57,7 +57,6 @@ class LwipCBUdpSocket : public ISocket { protected: void OnError(); - ReadyToWriteCb ready_to_write_cb_; RecvDataCb recv_data_cb_; ErrorCb error_cb_; diff --git a/aether/transport/system_sockets/sockets/lwip_socket.cpp b/aether/transport/system_sockets/sockets/lwip_socket.cpp index e4ebb3ac..d424d96d 100644 --- a/aether/transport/system_sockets/sockets/lwip_socket.cpp +++ b/aether/transport/system_sockets/sockets/lwip_socket.cpp @@ -48,22 +48,20 @@ std::optional LwipSocket::Send(Span data) { return std::nullopt; } auto size_to_send = data.size(); - // add nosignal to prevent throw SIGPIPE and handle it manually - int flags = MSG_NOSIGNAL; + int flags = 0; auto res = send(*socket_->fd(), data.data(), size_to_send, flags); if (res == -1) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { // add wait for kWrite + // kWrite will be removed on the next Poll call socket_->Event(EventType::kRead | EventType::kWrite | EventType::kError, MethodPtr<&LwipSocket::OnPollerEvent>{this}); - } - - if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { + return 0; + } else { AE_TELED_ERROR("Send to socket error: {}, {}", static_cast(errno), strerror(errno)); return std::nullopt; } - return 0; } return static_cast(res); } @@ -127,11 +125,7 @@ void LwipSocket::OnReadEvent(DescriptorType fd) { buffer = buffer.sub(0, *res); if (recv_data_cb_) { recv_data_cb_(buffer); - } else { - printf("fd %d, Received bytes=%zu but no callback set\n", - static_cast(fd), *res); } - return; } } @@ -172,7 +166,7 @@ std::optional LwipSocket::Receive(DescriptorType fd, std::optional LwipSocket::GetSocketError(DescriptorType fd) { int err{}; - socklen_t len = sizeof(len); + socklen_t len = sizeof(err); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, static_cast(&err), &len) != 0) { AE_TELED_ERROR("Getsockopt error: {}, {}", static_cast(errno), diff --git a/aether/transport/system_sockets/sockets/lwip_tcp_socket.cpp b/aether/transport/system_sockets/sockets/lwip_tcp_socket.cpp index 7310fc2f..0de0b195 100644 --- a/aether/transport/system_sockets/sockets/lwip_tcp_socket.cpp +++ b/aether/transport/system_sockets/sockets/lwip_tcp_socket.cpp @@ -27,17 +27,17 @@ namespace ae { namespace lwip_tcp_socket_internal { -inline bool SetNonblocking(int sock) { - if (lwip_fcntl(sock, F_SETFL, O_NONBLOCK) != ESP_OK) { +inline bool SetNonblocking(int sock) noexcept { + if (lwip_fcntl(sock, F_SETFL, O_NONBLOCK) != 0) { AE_TELED_ERROR("Socket set O_NONBLOCK error {} {}", errno, strerror(errno)); return false; } return true; } -inline bool SetTcpNoDelay(int sock, int on) { +inline bool SetTcpNoDelay(int sock, int on) noexcept { auto res = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); - if (res != ESP_OK) { + if (res != 0) { AE_TELED_ERROR("Socket set option TCP_NODELAY error {} {}", errno, strerror(errno)); return false; @@ -45,31 +45,15 @@ inline bool SetTcpNoDelay(int sock, int on) { return true; } -inline bool SetReuseAddress(int sock, int on) { +inline bool SetReuseAddress(int sock, int on) noexcept { auto res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - if (res != ESP_OK) { + if (res != 0) { AE_TELED_ERROR("Socket set option SO_REUSEADDR error {} {}", errno, strerror(errno)); return false; } return true; } - -inline bool SetReciveTimeouts(int sock, int tv_sec, int tv_usec) { - // set receive timeout on socket. - timeval tv; - tv.tv_sec = tv_sec; - tv.tv_usec = tv_usec; - auto res = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - if (res != ESP_OK) { - AE_TELED_ERROR( - "setupSocket(): setsockopt() SO_RCVTIMEO on client socket: error: " - "{} {}", - errno, strerror(errno)); - return false; - } - return true; -} } // namespace lwip_tcp_socket_internal LwipTcpSocket::LwipTcpSocket(Ptr const& poller) @@ -79,8 +63,7 @@ LwipTcpSocket::LwipTcpSocket(Ptr const& poller) recv_buffer_.resize(1500); } -int LwipTcpSocket::MakeSocket() { - bool created = false; +int LwipTcpSocket::MakeSocket() noexcept { constexpr int on = 1; auto sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); @@ -91,11 +74,7 @@ int LwipTcpSocket::MakeSocket() { } // close the socket if not created - ae_defer[&] { - if (!created) { - close(sock); - } - }; + auto close_socket = ae_defer_at[&] { close(sock); }; if (!lwip_tcp_socket_internal::SetNonblocking(sock)) { return kInvalidDescriptor; @@ -107,13 +86,8 @@ int LwipTcpSocket::MakeSocket() { return kInvalidDescriptor; } - if (!lwip_tcp_socket_internal::SetReciveTimeouts(sock, kRcvTimeoutSec, - kRcvTimeoutUsec)) { - return kInvalidDescriptor; - } - AE_TELED_DEBUG("LwIp TCP socket created"); - created = true; + close_socket.Reset(); return sock; } @@ -122,12 +96,7 @@ ISocket& LwipTcpSocket::Connect(AddressPort const& destination, assert(socket_ && "Socket is not initialized"); connected_cb_ = std::move(connected_cb); - ae_defer[&]() { - // wait for all events to detect connection - socket_->Event(EventType::kRead | EventType::kWrite | EventType::kError, - MethodPtr<&LwipTcpSocket::OnPollerEvent>{this}); - connected_cb_(connection_state_); - }; + ae_defer[&]() { connected_cb_(connection_state_); }; auto addr = GetSockAddr(destination); auto res = @@ -135,9 +104,10 @@ ISocket& LwipTcpSocket::Connect(AddressPort const& destination, if (res == -1) { if ((errno == EAGAIN) || (errno == EINPROGRESS)) { AE_TELED_DEBUG("Wait connection"); + // wait for all events to detect connection + socket_->Event(EventType::kRead | EventType::kWrite | EventType::kError, + MethodPtr<&LwipTcpSocket::OnPollerEvent>{this}); connection_state_ = ConnectionState::kConnecting; - Poll(); - connected_cb_(connection_state_); return *this; } AE_TELED_ERROR("Not connected {} {}", errno, strerror(errno)); @@ -160,7 +130,7 @@ void LwipTcpSocket::OnPollerEvent(DescriptorType fd, EventType event) { void LwipTcpSocket::OnConnectionEvent(DescriptorType fd) { ae_defer[&]() { if (connected_cb_) { - AE_TELED_DEBUG("LwIp TCP socket connectioin event {}", connection_state_); + AE_TELED_DEBUG("LwIp TCP socket connection event {}", connection_state_); connected_cb_(connection_state_); } }; @@ -176,6 +146,7 @@ void LwipTcpSocket::OnConnectionEvent(DescriptorType fd) { AE_TELED_DEBUG("Socket connected"); connection_state_ = ConnectionState::kConnected; + Poll(); } } // namespace ae #endif diff --git a/aether/transport/system_sockets/sockets/lwip_tcp_socket.h b/aether/transport/system_sockets/sockets/lwip_tcp_socket.h index d9cdb679..96adb3ca 100644 --- a/aether/transport/system_sockets/sockets/lwip_tcp_socket.h +++ b/aether/transport/system_sockets/sockets/lwip_tcp_socket.h @@ -27,9 +27,6 @@ namespace ae { class LwipTcpSocket final : public LwipSocket { - static constexpr int kRcvTimeoutSec = 0; - static constexpr int kRcvTimeoutUsec = 10000; - public: explicit LwipTcpSocket(Ptr const& poller); @@ -40,7 +37,7 @@ class LwipTcpSocket final : public LwipSocket { void OnPollerEvent(DescriptorType fd, EventType event) override; private: - static int MakeSocket(); + static int MakeSocket() noexcept; void OnConnectionEvent(DescriptorType fd); diff --git a/aether/transport/system_sockets/sockets/lwip_udp_socket.cpp b/aether/transport/system_sockets/sockets/lwip_udp_socket.cpp index 63e70ceb..cf9a9c69 100644 --- a/aether/transport/system_sockets/sockets/lwip_udp_socket.cpp +++ b/aether/transport/system_sockets/sockets/lwip_udp_socket.cpp @@ -33,8 +33,7 @@ LwipUdpSocket::LwipUdpSocket(Ptr const& poller) recv_buffer_.resize(1200); } -int LwipUdpSocket::MakeSocket() { - bool created = false; +int LwipUdpSocket::MakeSocket() noexcept { auto sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (sock < 0) { AE_TELED_ERROR("LwIp UDP socket not created {}", strerror(errno)); @@ -42,21 +41,17 @@ int LwipUdpSocket::MakeSocket() { } // close the socket if not created - ae_defer[&] { - if (!created) { - close(sock); - } - }; + auto on_failure = ae_defer_at[&] { close(sock); }; // make socket nonblocking - if (lwip_fcntl(sock, F_SETFL, O_NONBLOCK) != ESP_OK) { + if (lwip_fcntl(sock, F_SETFL, O_NONBLOCK) != 0) { AE_TELED_ERROR("lwip_fcntl set nonblocking mode error {} {}", errno, strerror(errno)); return kInvalidDescriptor; } AE_TELED_DEBUG("LwIp UDP socket created"); - created = true; + on_failure.Reset(); return sock; } @@ -66,8 +61,7 @@ ISocket& LwipUdpSocket::Connect(AddressPort const& destination, assert(socket_ && "Socket is not initialized"); ae_defer[&]() { - Poll(); - AE_TELED_DEBUG("LwIp UDP socket connectioin event {}", connection_state_); + AE_TELED_DEBUG("LwIp UDP socket connection event {}", connection_state_); connected_cb(connection_state_); }; @@ -81,6 +75,7 @@ ISocket& LwipUdpSocket::Connect(AddressPort const& destination, } connection_state_ = ConnectionState::kConnected; + Poll(); return *this; } } // namespace ae diff --git a/aether/transport/system_sockets/sockets/lwip_udp_socket.h b/aether/transport/system_sockets/sockets/lwip_udp_socket.h index e4ba6497..4efedf31 100644 --- a/aether/transport/system_sockets/sockets/lwip_udp_socket.h +++ b/aether/transport/system_sockets/sockets/lwip_udp_socket.h @@ -34,7 +34,7 @@ class LwipUdpSocket final : public LwipSocket { ConnectedCb connected_cb) override; private: - static int MakeSocket(); + static int MakeSocket() noexcept; ConnectionState connection_state_; ConnectedCb connected_cb_;