Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aether/channels/ethernet_transport_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ std::unique_ptr<ByteIStream> EthernetTransportFactory::BuildUdp(
[[maybe_unused]] Ptr<IPoller> const& poller,
[[maybe_unused]] Endpoint address_port_protocol) {
# ifdef SYSTEM_SOCKET_UDP_TRANSPORT_ENABLED
# if LWIP_CB_TCP_SOCKET_ENABLED
# if LWIP_CB_UDP_SOCKET_ENABLED
using SocketType = LwipCBUdpSocket;
# elif LWIP_TCP_SOCKET_ENABLED
# elif LWIP_UDP_SOCKET_ENABLED
using SocketType = LwipUdpSocket;
# elif UNIX_SOCKET_ENABLED
using SocketType = UnixUdpSocket;
Expand Down
66 changes: 45 additions & 21 deletions aether/poller/epoll_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ EpollImpl::EpollImpl()
thread_(&EpollImpl::Loop, this) {
AE_TELE_INFO(kEpollWorkerCreate);

auto lock = std::scoped_lock{poller_mutex_};
// add wake up fd to epoll
if (event_fd_ != -1) {
Event(DescriptorType{event_fd_}, EventType::kRead, [](auto) {});
Callback(DescriptorType{event_fd_}, [](auto, auto) {});
Event(DescriptorType{event_fd_}, EventType::kRead | EventType::kError);
}
}

Expand Down Expand Up @@ -101,40 +103,62 @@ EpollImpl::~EpollImpl() {
AE_TELE_INFO(kEpollWorkerDestroyed);
}

void EpollImpl::Event(DescriptorType fd, EventType event, EventCb cb) {
AE_TELE_DEBUG(kEpollAddDescriptor, "Poller event fd:{} event:{}", fd, event);
void EpollImpl::lock() { poller_mutex_.lock(); }
void EpollImpl::unlock() { poller_mutex_.unlock(); }

void EpollImpl::Callback(DescriptorType fd, EventCb cb) {
AE_TELE_DEBUG(kEpollAddDescriptor, "Poller callback for fd:{}", fd);
event_map_.emplace(fd, EventHandler{.cb = std::move(cb), .events = {}});
}

void EpollImpl::Event(DescriptorType fd, EventType events) {
AE_TELED_DEBUG("Poller event for fd:{} events: {}", fd,
static_cast<std::uint8_t>(events));
auto it = event_map_.find(fd);
if (it == event_map_.end()) {
assert(false && "Callback should setup first");
return;
}

struct epoll_event epoll_event;
epoll_event.events = epoll_poller_internal::PollerEventsToEpol(event);
epoll_event.events = epoll_poller_internal::PollerEventsToEpol(events);
// watch only edge triggered events
epoll_event.events |= EPOLLET;
epoll_event.data.fd = fd;

auto lock = std::scoped_lock{poller_mutex_};
auto [_, new_event] = event_map_.insert_or_assign(fd, std::move(cb));
int op = new_event ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
int op = (it->second.events == EventType{}) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
auto res = epoll_ctl(epoll_fd_, op, fd, &epoll_event);
if (res < 0) {
AE_TELE_ERROR(kEpollAddFailed, "Failed to add to epoll {} {}", errno,
strerror(errno));
assert(false);
}
it->second.events = events;
}

void EpollImpl::Remove(DescriptorType fd) {
AE_TELE_DEBUG(kEpollRemoveDescriptor, "Remove poller event {}", fd);

auto lock = std::scoped_lock{poller_mutex_};
event_map_.erase(fd);
auto it = event_map_.find(fd);
if (it == event_map_.end()) {
// nothing to remove
return;
}

struct epoll_event epoll_event{};
auto res = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &epoll_event);
if (res < 0) {
if (errno != ENOENT) {
AE_TELE_ERROR(kEpollRemoveFailed, "Failed to remove from epoll {} {}",
errno, strerror(errno));
assert(false);
if (it->second.events != EventType{}) {
// if events not empty, remove from epol_ctl also
struct epoll_event epoll_event{};
auto res = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &epoll_event);
if (res < 0) {
if (errno != ENOENT) {
AE_TELE_ERROR(kEpollRemoveFailed, "Failed to remove from epoll {} {}",
errno, strerror(errno));
assert(false);
}
}
}

event_map_.erase(fd);
}

int EpollImpl::MakeEventFd() {
Expand All @@ -161,7 +185,7 @@ int EpollImpl::InitEpoll() {
}

void EpollImpl::Loop() {
static constexpr auto kMaxEvents = 10;
static constexpr std::size_t kMaxEvents = 10;
std::array<struct epoll_event, kMaxEvents> events;

while (!stop_requested_) {
Expand Down Expand Up @@ -190,7 +214,7 @@ void EpollImpl::Loop() {
continue;
}
auto ev_type = epoll_poller_internal::EpollEventToEventType(event.events);
poller_event->second(ev_type);
poller_event->second.cb(fd, ev_type);
}
}
}
Expand All @@ -199,11 +223,11 @@ EpollPoller::EpollPoller() = default;

EpollPoller::EpollPoller(ObjProp prop) : IPoller{prop} {}

NativePoller* EpollPoller::Native() {
std::shared_ptr<NativePoller> EpollPoller::Native() {
if (!impl_) {
impl_.emplace();
impl_ = std::make_shared<EpollImpl>();
}
return static_cast<NativePoller*>(&*impl_);
return impl_;
}

} // namespace ae
Expand Down
21 changes: 15 additions & 6 deletions aether/poller/epoll_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,30 @@
# include <mutex>
# include <thread>
# include <atomic>
# include <optional>
# include <memory>

# include "aether/poller/poller.h"
# include "aether/poller/unix_poller.h"

namespace ae {
class EpollImpl final : public UnixPollerImpl {
struct EventHandler {
EventCb cb;
EventType events;
};

public:
EpollImpl();
~EpollImpl() override;

void Event(DescriptorType fd, EventType event, EventCb cb) override;
private:
void lock() override;
void unlock() override;

void Callback(DescriptorType fd, EventCb cb) override;
void Event(DescriptorType fd, EventType events) override;
void Remove(DescriptorType fd) override;

private:
static int InitEpoll();
static int MakeEventFd();
void EmptyWakeUpPipe(EventType event);
Expand All @@ -46,7 +55,7 @@ class EpollImpl final : public UnixPollerImpl {
int epoll_fd_;
int event_fd_;
std::recursive_mutex poller_mutex_;
std::map<DescriptorType, EventCb> event_map_;
std::map<DescriptorType, EventHandler> event_map_;

std::atomic_bool stop_requested_{false};
std::thread thread_;
Expand All @@ -62,10 +71,10 @@ class EpollPoller : public IPoller {

AE_OBJECT_REFLECT()

NativePoller* Native() override;
std::shared_ptr<NativePoller> Native() override;

private:
std::optional<EpollImpl> impl_;
std::shared_ptr<EpollImpl> impl_;
};

} // namespace ae
Expand Down
45 changes: 39 additions & 6 deletions aether/poller/freertos_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ FreeRtosLwipPollerImpl::~FreeRtosLwipPollerImpl() {
AE_TELE_DEBUG(kFreertosWorkerDestroyed, "Poll worker has been destroyed");
}

void FreeRtosLwipPollerImpl::lock() { ctl_mutex_.lock(); }

void FreeRtosLwipPollerImpl::unlock() { ctl_mutex_.unlock(); }

void FreeRtosLwipPollerImpl::Event(DescriptorType fd, EventType event_type,
EventCb cb) {
auto lock = std::scoped_lock{ctl_mutex_};
AE_TELE_DEBUG(kFreertosAddDescriptor, "Added descriptor {} event {}", fd,
event_type);
event_map_.insert_or_assign(fd, PollEvent{event_type, std::move(cb)});
Expand All @@ -147,7 +150,6 @@ void FreeRtosLwipPollerImpl::Event(DescriptorType fd, EventType event_type,
}

void FreeRtosLwipPollerImpl::Remove(DescriptorType fd) {
auto lock = std::scoped_lock{ctl_mutex_};
event_map_.erase(fd);
freertos_poller_internal::WritePipe(wake_up_pipe_);
AE_TELE_DEBUG(kFreertosRemoveDescriptor, "Removed descriptor {}", fd);
Expand Down Expand Up @@ -190,7 +192,7 @@ void FreeRtosLwipPollerImpl::Loop() {
continue;
}
poll_event->second.cb(
freertos_poller_internal::FromEpollEvent(v.revents));
v.fd, freertos_poller_internal::FromEpollEvent(v.revents));
}
}
}
Expand Down Expand Up @@ -242,15 +244,46 @@ std::vector<pollfd> FreeRtosLwipPollerImpl::FillFdsVector() {
return fds;
}

FreeRtosPolledFd::FreeRtosPolledFd(DescriptorType fd,
std::shared_ptr<NativePoller> const& poller)
: fd_{fd},
poller_{std::static_pointer_cast<FreeRtosLwipPollerImpl>(poller)} {}

FreeRtosPolledFd::~FreeRtosPolledFd() {
if (fd_ != kInvalidDescriptor) {
auto lock = std::scoped_lock(*poller_);
poller_->Remove(fd_);
}
}

void FreeRtosPolledFd::Event(EventType event_type,
FreeRtosLwipPollerImpl::EventCb event_cb) {
auto lock = std::scoped_lock(*poller_);
poller_->Event(fd_, event_type, std::move(event_cb));
}

FreeRtosPolledFd::Fd FreeRtosPolledFd::fd() const noexcept {
return Fd{std::unique_lock{*poller_}, fd_};
}

FreeRtosPolledFd::Fd FreeRtosPolledFd::Remove() noexcept {
auto fd = Fd{std::unique_lock{*poller_}, fd_};
if (fd_ != kInvalidDescriptor) {
poller_->Remove(fd_);
fd_ = kInvalidDescriptor;
}
return fd;
}

FreertosPoller::FreertosPoller() = default;

FreertosPoller::FreertosPoller(ObjProp prop) : IPoller{prop} {}

NativePoller* FreertosPoller::Native() {
std::shared_ptr<NativePoller> FreertosPoller::Native() {
if (!impl_) {
impl_.emplace();
impl_ = std::make_shared<FreeRtosLwipPollerImpl>();
}
return static_cast<NativePoller*>(&*impl_);
return impl_;
}

} // namespace ae
Expand Down
44 changes: 38 additions & 6 deletions aether/poller/freertos_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# include <map>
# include <mutex>
# include <atomic>
# include <optional>
# include <memory>

# include "aether/poller/poller.h"
# include "aether/poller/poller_types.h"
Expand All @@ -38,17 +38,20 @@ class FreeRtosLwipPollerImpl : public NativePoller {
friend void vTaskFunction(void* pvParameters);

public:
using EventCb = SmallFunction<void(EventType event)>;
using EventCb = SmallFunction<void(DescriptorType fd, EventType event)>;

struct PollEvent {
EventType event_type;
EventCb cb;
};

FreeRtosLwipPollerImpl();
~FreeRtosLwipPollerImpl();
~FreeRtosLwipPollerImpl() override;

void Event(DescriptorType fd, EventType event_type, EventCb cb);
void lock();
void unlock();

void Event(DescriptorType fd, EventType event_type, EventCb event_cb);
void Remove(DescriptorType fd);

private:
Expand All @@ -62,6 +65,35 @@ class FreeRtosLwipPollerImpl : public NativePoller {
std::recursive_mutex ctl_mutex_;
};

class FreeRtosPolledFd {
public:
class Fd {
public:
Fd(std::unique_lock<FreeRtosLwipPollerImpl>&& lock,
DescriptorType fd) noexcept
: lock_{std::move(lock)}, fd_{fd} {}

DescriptorType operator*() const noexcept { return fd_; }

private:
std::unique_lock<FreeRtosLwipPollerImpl> lock_;
DescriptorType fd_;
};

FreeRtosPolledFd(DescriptorType fd,
std::shared_ptr<NativePoller> const& poller);

~FreeRtosPolledFd();

void Event(EventType event_type, FreeRtosLwipPollerImpl::EventCb event_cb);
Fd fd() const noexcept;
Fd Remove() noexcept;

private:
DescriptorType fd_;
mutable std::shared_ptr<FreeRtosLwipPollerImpl> poller_;
};

class FreertosPoller : public IPoller {
AE_OBJECT(FreertosPoller, IPoller, 0)

Expand All @@ -72,10 +104,10 @@ class FreertosPoller : public IPoller {

AE_OBJECT_REFLECT()

NativePoller* Native() override;
std::shared_ptr<NativePoller> Native() override;

private:
std::optional<FreeRtosLwipPollerImpl> impl_;
std::shared_ptr<FreeRtosLwipPollerImpl> impl_;
};

} // namespace ae
Expand Down
Loading
Loading