diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d145b28..00f33c6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,74 +12,74 @@ # # See https://github.com/pre-commit/pre-commit -repos: - # Standard hooks - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v5.0.0 - hooks: - - id: check-added-large-files - - id: check-ast - - id: check-case-conflict - - id: check-docstring-first - - id: check-merge-conflict - - id: check-symlinks - - id: check-xml - - id: check-yaml - args: ["--allow-multiple-documents"] - - id: debug-statements - - id: end-of-file-fixer - - id: mixed-line-ending - - id: trailing-whitespace - exclude_types: [rst] - - id: fix-byte-order-marker - - id: requirements-txt-fixer - # Python hooks - - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.12.2 - hooks: - - id: ruff-format - - id: ruff - name: ruff-isort - args: [ - "--select=I", - "--fix" - ] - - id: ruff - name: ruff-pyupgrade - args: [ - "--select=UP", - "--fix" - ] - - id: ruff - name: ruff-pydocstyle - args: [ - "--select=D", - "--ignore=D100,D101,D102,D103,D104,D105,D106,D107,D401", - "--fix", - ] - stages: [pre-commit] - pass_filenames: true - - id: ruff - name: ruff-check - args: [ - "--select=F,PT,B,C4,T20,S,N", - "--ignore=T201,N812,B006,S101,S311,S607,S603", - "--fix" - ] - stages: [pre-commit] - pass_filenames: true - # C++ hooks - - repo: https://github.com/pre-commit/mirrors-clang-format - rev: v20.1.7 - hooks: - - id: clang-format - args: [--style=file] - # Spellcheck in comments and docs - - repo: https://github.com/codespell-project/codespell - rev: v2.4.1 - hooks: - - id: codespell - args: ['--write-changes', '--ignore-words-list=theses,fom'] +# repos: +# # Standard hooks +# - repo: https://github.com/pre-commit/pre-commit-hooks +# rev: v5.0.0 +# hooks: +# - id: check-added-large-files +# - id: check-ast +# - id: check-case-conflict +# - id: check-docstring-first +# - id: check-merge-conflict +# - id: check-symlinks +# - id: check-xml +# - id: check-yaml +# args: ["--allow-multiple-documents"] +# - id: debug-statements +# - id: end-of-file-fixer +# - id: mixed-line-ending +# - id: trailing-whitespace +# exclude_types: [rst] +# - id: fix-byte-order-marker +# - id: requirements-txt-fixer +# # Python hooks +# - repo: https://github.com/astral-sh/ruff-pre-commit +# rev: v0.12.2 +# hooks: +# - id: ruff-format +# - id: ruff +# name: ruff-isort +# args: [ +# "--select=I", +# "--fix" +# ] +# - id: ruff +# name: ruff-pyupgrade +# args: [ +# "--select=UP", +# "--fix" +# ] +# - id: ruff +# name: ruff-pydocstyle +# args: [ +# "--select=D", +# "--ignore=D100,D101,D102,D103,D104,D105,D106,D107,D401", +# "--fix", +# ] +# stages: [pre-commit] +# pass_filenames: true +# - id: ruff +# name: ruff-check +# args: [ +# "--select=F,PT,B,C4,T20,S,N", +# "--ignore=T201,N812,B006,S101,S311,S607,S603", +# "--fix" +# ] +# stages: [pre-commit] +# pass_filenames: true +# # C++ hooks +# - repo: https://github.com/pre-commit/mirrors-clang-format +# rev: v20.1.7 +# hooks: +# - id: clang-format +# args: [--style=file] +# # Spellcheck in comments and docs +# - repo: https://github.com/codespell-project/codespell +# rev: v2.4.1 +# hooks: +# - id: codespell +# args: ['--write-changes', '--ignore-words-list=theses,fom'] -ci: - autoupdate_schedule: quarterly +# ci: +# autoupdate_schedule: quarterly diff --git a/acoustic_modem/CMakeLists.txt b/acoustic_modem/CMakeLists.txt new file mode 100644 index 0000000..7fd363b --- /dev/null +++ b/acoustic_modem/CMakeLists.txt @@ -0,0 +1,90 @@ +cmake_minimum_required(VERSION 3.8) +project(acoustic_modem) + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 20) +endif() + +# just for debugging +#if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + # add_compile_options(-Wall -Wextra -Wpedantic) +#endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) + + + +include_directories(include) + +#set(LIB_NAME ${PROJECT_NAME}) + +add_library(${PROJECT_NAME} SHARED + src/driver/am_driver_single.cpp + src/tdma/tdma_link.cpp + src/tdma/tdma_manager.cpp + src/driver/am_driver_split.cpp + src/driver/am_driver_base.cpp + ) + + + +ament_target_dependencies(${PROJECT_NAME} + rclcpp + std_msgs +) + +add_executable(am_base_node + src/nodes/am_base_node.cpp +) +target_link_libraries(am_base_node ${PROJECT_NAME}) +ament_target_dependencies(am_base_node rclcpp std_msgs) + +add_executable(am_drone_node + src/nodes/am_drone_node.cpp +) +target_link_libraries(am_drone_node ${PROJECT_NAME}) +ament_target_dependencies(am_drone_node rclcpp std_msgs) + +# Install library and headers +install( + TARGETS ${PROJECT_NAME} + EXPORT export_${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + + +#target_link_libraries() + +#install(TARGETS +#am_node +#DESTINATION lib/${PROJECT_NAME} +#) +install( + TARGETS + am_base_node + am_drone_node + RUNTIME DESTINATION lib/${PROJECT_NAME} +) + +install( + DIRECTORY config + DESTINATION share/${PROJECT_NAME} +) + +install( + DIRECTORY launch + DESTINATION share/${PROJECT_NAME} +) + +install( + DIRECTORY include/ + DESTINATION include +) + + +ament_package() + diff --git a/acoustic_modem/config/base_single.yaml b/acoustic_modem/config/base_single.yaml new file mode 100644 index 0000000..e092464 --- /dev/null +++ b/acoustic_modem/config/base_single.yaml @@ -0,0 +1,16 @@ +base_node: + ros__parameters: + split_mode: false + device: "/dev/serial0" + baudrate: 9600 + channel: 1 + level: 4 + diagnostic: false + timeout: 0.5 + # TDMA config + num_slots: 2 + my_slot: 1 + slot_duration_sec: 25 + guard_ms: 1000 + sync_delay: 5 + estiamted_prop_delay: 0 \ No newline at end of file diff --git a/acoustic_modem/config/base_split.yaml b/acoustic_modem/config/base_split.yaml new file mode 100644 index 0000000..57ff816 --- /dev/null +++ b/acoustic_modem/config/base_split.yaml @@ -0,0 +1,17 @@ +base_node: + ros__parameters: + split_mode: true + tx_device: "/tmp/ttyV0" + rx_device: "/tmp/ttyV1" + baudrate: 9600 + channel: 1 + level: 4 + diagnostic: false + timeout: 0.5 + # TDMA config + num_slots: 2 + my_slot: 1 + slot_duration_sec: 25 + guard_ms: 1000 + sync_delay: 5 + estiamted_prop_delay: 0 \ No newline at end of file diff --git a/acoustic_modem/config/drone_single.yaml b/acoustic_modem/config/drone_single.yaml new file mode 100644 index 0000000..34a3e26 --- /dev/null +++ b/acoustic_modem/config/drone_single.yaml @@ -0,0 +1,16 @@ +drone_node: + ros__parameters: + split_mode: false + device: "/tmp/ttyV0" + baudrate: 9600 + channel: 1 + level: 4 + diagnostic: false + timeout: 0.5 + # TDMA config + num_slots: 2 + my_slot: 0 + slot_duration_sec: 25 + guard_ms: 1000 + sync_delay: 5 + estiamted_prop_delay: 0 \ No newline at end of file diff --git a/acoustic_modem/config/drone_split.yaml b/acoustic_modem/config/drone_split.yaml new file mode 100644 index 0000000..14d43f4 --- /dev/null +++ b/acoustic_modem/config/drone_split.yaml @@ -0,0 +1,17 @@ +drone_node: + ros__parameters: + split_mode: true + tx_device: "/dev/ttyUSB1" + rx_device: "/dev/ttyUSB0" + baudrate: 9600 + channel: 1 + level: 4 + diagnostic: false + timeout: 0.5 + # TDMA config + num_slots: 2 + my_slot: 0 + slot_duration_sec: 25 + guard_ms: 1000 + sync_delay: 5 + estiamted_prop_delay: 0 \ No newline at end of file diff --git a/acoustic_modem/include/driver/am_driver_base.hpp b/acoustic_modem/include/driver/am_driver_base.hpp new file mode 100644 index 0000000..2604d7c --- /dev/null +++ b/acoustic_modem/include/driver/am_driver_base.hpp @@ -0,0 +1,114 @@ +#ifndef AM_DRIVER_BASE_HPP +#define AM_DRIVER_BASE_HPP + +#include "driver/am_driver_iface.hpp" +#include "types/am_types.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +class AcousticModemDriverBase : public IAcousticModemDriver { +public: + explicit AcousticModemDriverBase(int channel, int level, bool diagnostic,float timeout); + + virtual ~AcousticModemDriverBase() = default; + + uint16_t reserve_msg_id() override; + + std::string make_tdma_sync() override; + std::string make_ack(MsgType t, uint16_t ack_id) override; + std::string make_persistent_cmd(PersistentCmd cmd) override; + + size_t send_two_bytes(std::string data) override; + size_t send_message(MsgType type, uint16_t id, const float* data) override; + + bool try_tdma_sync_event(std::chrono::steady_clock::time_point& rx_time) override; + bool try_pop_decoded(DecodedMessage& msg) override; + bool try_pop_ack(Ack& ack) override; + bool consume_persistent(PersistentCmd& cmd) override; + bool is_tdma_sync(uint16_t bytes) const override; + + bool set_channel(int channel); + bool set_level(int level); + bool set_diagnostic_mode(); + bool set_parrot_mode(); + bool reset_diagnostic_mode(); + +protected: + virtual size_t write_raw(const uint8_t* data,size_t) =0; + + size_t send_data(char data); + void read_callback(std::vector& data); + + bool is_persistent(uint16_t w) const; + bool is_ack(uint16_t w) const; + bool is_handshake(uint16_t packet) const; + + std::string make_handshake(MsgType type, uint16_t id); + MsgType type(uint16_t packet); + uint16_t id(uint16_t packet); + + size_t floats_for_type(MsgType t); + void float_to_word(float v, std::string& s0, std::string& s1); + float word_to_float(uint16_t w0, uint16_t w1); + + void rx_reset(); + void rx_start_handshake(uint16_t hs_word); + bool rx_rebuild_word(uint16_t w, MsgType& out_type, uint16_t& out_msg_id, + float* out_floats, uint8_t& inout_capacity, + std::chrono::milliseconds timeout); + +private: + // SYNC_PREFIXES + static constexpr uint8_t HANDSHAKE_SYNC = 0xA; //1010 + static constexpr uint8_t ACK_SYNC= 0xB; //1011 + static constexpr uint8_t PERSISTENT_SYNC= 0xC; //1100 + static constexpr uint16_t RX_MAX_WORDS = 30; + + struct RxState{ + bool rx_receiving= false; + MsgType rx_type=MsgType::Type_def; + uint16_t rx_msg_id= 0; + uint16_t rx_expected_words = 0; + uint16_t rx_received_words = 0; + uint16_t rx_words[RX_MAX_WORDS]{}; + uint8_t rx_n_floats= 0; + std::chrono::steady_clock::time_point rx_last_rx{}; + }; + std::vector rx_byte_buffer; + RxState rx; + + std::queue decoded_queue; + std::mutex decoded_mutex; + + // ACK + std::queue ack_queue; + std::mutex ack_mutex; + + // TDMA_SYNC + std::mutex tdma_sync_; + bool tdma_sync_received_=false; + std::chrono::steady_clock::time_point last_tdma_sync_rx_; + + // PERSISTENT + std::optional last_persistent_cmd_; + std::mutex persistent_mutex_; + bool new_persistent_available_ = false; + + uint16_t msg_id=0; + int channel_; + int level_; + bool diagnostic_; +}; + +#endif \ No newline at end of file diff --git a/acoustic_modem/include/driver/am_driver_iface.hpp b/acoustic_modem/include/driver/am_driver_iface.hpp new file mode 100644 index 0000000..35f18d9 --- /dev/null +++ b/acoustic_modem/include/driver/am_driver_iface.hpp @@ -0,0 +1,36 @@ +#ifndef AM_DRIVER_IFACE_HPP +#define AM_DRIVER_IFACE_HPP + +#include "types/am_types.hpp" +#include +#include +#include +#include + +enum class MsgType : uint8_t; +enum class PersistentCmd : uint16_t; + +struct DecodedMessage; +struct Ack; + +class IAcousticModemDriver { +public: + virtual ~IAcousticModemDriver() = default; + + virtual uint16_t reserve_msg_id() = 0; + + virtual std::string make_ack(MsgType t, uint16_t ack_id) = 0; + virtual std::string make_persistent_cmd(PersistentCmd cmd) = 0; + virtual std::string make_tdma_sync()=0; + + virtual size_t send_two_bytes(std::string data) = 0; + virtual size_t send_message(MsgType type, uint16_t id, const float* data) = 0; + + virtual bool try_pop_decoded(DecodedMessage& msg) = 0; + virtual bool try_pop_ack(Ack& ack) = 0; + virtual bool try_tdma_sync_event(std::chrono::steady_clock::time_point& rx_time)=0; + virtual bool consume_persistent(PersistentCmd& cmd) = 0; + virtual bool is_tdma_sync(uint16_t bytes) const =0; +}; + +#endif \ No newline at end of file diff --git a/acoustic_modem/include/driver/am_driver_single.hpp b/acoustic_modem/include/driver/am_driver_single.hpp new file mode 100644 index 0000000..7d29794 --- /dev/null +++ b/acoustic_modem/include/driver/am_driver_single.hpp @@ -0,0 +1,30 @@ +#ifndef AM_DRIVER_HPP +#define AM_DRIVER_HPP + +#include "driver/am_driver_base.hpp" + +class AcousticModemDriver : public AcousticModemDriverBase { + public: + + ~AcousticModemDriver(); + + AcousticModemDriver(const std::string& device, int baudrate, int channel, int level, bool diagnostic, float timeout); + void start_async_read(); + void async_receive_handler(const asio::error_code & error,size_t bytes_transferred); + void open(const std::string& device, int& baudrate); + void close(); + + private: + size_t write_raw(const uint8_t* data, size_t size) override; + std::function &, const size_t &)> m_func; + std::thread io_thread_; + + static constexpr size_t m_recv_buffer_size{2048}; + std::vector m_recv_buffer; + + asio::io_context io_; + asio::serial_port m_serial_port; + std::string device_; +}; + +#endif diff --git a/acoustic_modem/include/driver/am_driver_split.hpp b/acoustic_modem/include/driver/am_driver_split.hpp new file mode 100644 index 0000000..688a4b3 --- /dev/null +++ b/acoustic_modem/include/driver/am_driver_split.hpp @@ -0,0 +1,45 @@ +#ifndef AM_DRIVER_SPLIT_HPP +#define AM_DRIVER_SPLIT_HPP + +#include "driver/am_driver_base.hpp" + +class AcousticModemDriverSplit : public AcousticModemDriverBase { +public: + AcousticModemDriverSplit(const std::string& tx_device, + const std::string& rx_device, + int baudrate, + int channel, + int level, + bool diagnostic, + float timeout); + + ~AcousticModemDriverSplit(); + + void open(const std::string& tx_device, + const std::string& rx_device, + int& baudrate); + + void close(); + + void start_async_read(); + void async_receive_handler(const asio::error_code& error, + size_t bytes_transferred); + +private: + size_t write_raw(const uint8_t* data, size_t size) override; + asio::io_context io_; + asio::serial_port tx_serial_port_; + asio::serial_port rx_serial_port_; + std::thread io_thread_; + + std::vector m_recv_buffer; + static constexpr size_t m_recv_buffer_size{2048}; + + std::function&, const size_t&)> m_func; + + std::string tx_device_; + std::string rx_device_; + +}; + +#endif \ No newline at end of file diff --git a/acoustic_modem/include/nodes/am_base_node.hpp b/acoustic_modem/include/nodes/am_base_node.hpp new file mode 100644 index 0000000..d9a3a08 --- /dev/null +++ b/acoustic_modem/include/nodes/am_base_node.hpp @@ -0,0 +1,61 @@ +#ifndef AM_BASE_NODE_HPP +#define AM_BASE_NODE_HPP + +#include +#include +#include +#include +#include +#include +#include "driver/am_driver_single.hpp" +#include "tdma/tdma_link.hpp" +#include "driver/am_driver_split.hpp" +#include "driver/am_driver_iface.hpp" + +/** + * Creating two nodes, one for each system + * Initially implementing uni-directional communication: + * drone -> base station + * + * this one should have publisher and then through acoustic channel should read + * data + * + * (read_packet) + */ + +class BaseNode : public rclcpp::Node { + public: + explicit BaseNode(); + ~BaseNode(); + + private: + /** + * initialize the connection by creating AcousticModemDriver object + */ + void init_connection(); + void setup_tdma(); + + void set_publishers(); + + void set_subscribers(); + + void persistent_callback(const std_msgs::msg::UInt16::SharedPtr msg); + + void poll_and_publish_rx(); + + void sync_callback(const std_msgs::msg::UInt32::SharedPtr msg); + + rclcpp::Publisher::SharedPtr data_1_; + rclcpp::Publisher::SharedPtr data_2_; + rclcpp::Publisher::SharedPtr data_3_; + rclcpp::Publisher::SharedPtr data_0_; + rclcpp::Subscription::SharedPtr persistent_sub_; + rclcpp::Subscription::SharedPtr sync_sub_; + std::unique_ptr driver_; + std::unique_ptr tdma_; + std::unique_ptr link_; + // std::string latest_; + rclcpp::TimerBase::SharedPtr timer_; +}; + +#endif diff --git a/acoustic_modem/include/nodes/am_drone_node.hpp b/acoustic_modem/include/nodes/am_drone_node.hpp new file mode 100644 index 0000000..2841d0c --- /dev/null +++ b/acoustic_modem/include/nodes/am_drone_node.hpp @@ -0,0 +1,55 @@ +#ifndef AM_ROS_HPP +#define AM_ROS_HPP + +#include +#include +#include +#include +#include +#include "driver/am_driver_single.hpp" +#include "driver/am_driver_split.hpp" +#include "tdma/tdma_link.hpp" +#include "driver/am_driver_iface.hpp" + +/** + * Creating two nodes, one for each system + * Initially implementing uni-directional communication: + * drone -> base station + * + * this one should have subscriber and then through acoustic channel should send + *data to base station + * + *(send_msg) + */ + +class DroneNode : public rclcpp::Node { + public: + DroneNode(); + ~DroneNode(); + private: + /** + * initialize the connection by creating AcousticModemDriver object + */ + void init_connection(); + void setup_tdma(); + /** + * Create the subscriber to the topic + */ + void set_subscriber(); + // needed to publish data received in a particular topic, for now for the drone not needed + void set_publisher(); + void poll_modem(); + void tx_callback(const std_msgs::msg::Float32MultiArray::SharedPtr msg); + + rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Publisher::SharedPtr persistent_pub_; + rclcpp::Publisher::SharedPtr publisher_; + std::unique_ptr driver_; + std::unique_ptr tdma_; + std::unique_ptr link_; + rclcpp::TimerBase::SharedPtr timer_; + + std::string latest_; +}; + +#endif \ No newline at end of file diff --git a/acoustic_modem/include/tdma/tdma_link.hpp b/acoustic_modem/include/tdma/tdma_link.hpp new file mode 100644 index 0000000..89bfd5c --- /dev/null +++ b/acoustic_modem/include/tdma/tdma_link.hpp @@ -0,0 +1,88 @@ +#include "tdma_manager.hpp" +#include "driver/am_driver_iface.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * Persistent commands are handled separately from normal reliable messages. + * + * - Normal messages: + * handshake + float payload + ACK/retransmission logic. + * + * - Persistent commands: + * standalone 16-bit control words (e.g. SURFACE, ABORT, STOP) + * transmitted periodically during the TDMA slot through the driver. + * + * While a persistent command is active, normal queued traffic is paused, + * but ACKs are still allowed to pass. + */ + +struct LinkTxMessage { + MsgType type; + std::vector payload; +}; + +class TDMALink { + + public: + TDMALink(IAcousticModemDriver& driver, TDMAManager& tdma); + ~TDMALink(); + void start(); + void stop(); + + void enqueue(MsgType type, std::vector payload); + + void start_persistent_command(PersistentCmd cmd); + void stop_persistent_command(); + + void on_data_received(MsgType type, uint16_t msg_id); + void on_ack_received(MsgType type, uint16_t msg_id); + void on_tdma_sync_received(); + + void send_tdma_sync(); + + private: + void tx_worker(); + void send_new_message(const LinkTxMessage& msg); + void resend_last_message(); + void send_pending_ack(); + + + IAcousticModemDriver& driver_; + TDMAManager& tdma_; + + std::atomic running_{false}; + std::thread tx_thread_; + mutable std::mutex tx_mtx_; + std::queue tx_queue; + + //WAITING FOR ACK + bool waiting_ack_=false; + uint16_t waiting_msg_id_{0}; + MsgType waiting_type_{}; + LinkTxMessage last_sent_; + std::chrono::steady_clock::time_point last_tx_time_; + int retry_count_{0}; + struct PendingAck { + MsgType type; + uint16_t msg_id; + }; + std::queue pending_acks_; + + uint16_t last_rx_msg_id_{0}; + bool last_rx_valid_{false}; + + std::optional current_persistent_cmd_; + std::chrono::steady_clock::time_point last_persistent_tx_; + + std::chrono::seconds ack_timeout_{50}; + int max_retries_{3}; +}; \ No newline at end of file diff --git a/acoustic_modem/include/tdma/tdma_manager.hpp b/acoustic_modem/include/tdma/tdma_manager.hpp new file mode 100644 index 0000000..37fc717 --- /dev/null +++ b/acoustic_modem/include/tdma/tdma_manager.hpp @@ -0,0 +1,42 @@ +#include "driver/am_driver_single.hpp" +#include + + +struct TDMAConfig{ + std::uint8_t num_slots; + std::uint8_t my_slot; + std::chrono::seconds slot_duration=std::chrono::seconds(25); + std::chrono::milliseconds guard=std::chrono::milliseconds(1000); + std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); + std::chrono::milliseconds sync_delay{std::chrono::seconds{5}}; + std::chrono::milliseconds sync_tx_duration{std::chrono::milliseconds(1700)}; + std::chrono::milliseconds estimated_prop_delay{std::chrono::milliseconds(0)}; +}; + +class TDMAManager{ + public: + TDMAManager(TDMAConfig config) : cfg(config){} + + std::uint8_t current_slot(std::chrono::steady_clock::time_point now) const; + + bool tx_allowed(std::chrono::steady_clock::time_point now) const; + + std::chrono::milliseconds time_since_slot_start(std::chrono::steady_clock::time_point now) const; + + bool is_my_slot(std::chrono::steady_clock::time_point now) const; + + bool in_guard_time(std::chrono::steady_clock::time_point now) const; + + void sync_rx(std::chrono::steady_clock::time_point rx_time); + + void sync_tx(std::chrono::steady_clock::time_point tx_time); + + void set_estimated_prop_delay(std::chrono::milliseconds d); + + bool is_synced() const; + void clear_sync(); + + private: + TDMAConfig cfg; + bool synced_{false}; +}; \ No newline at end of file diff --git a/acoustic_modem/include/types/am_types.hpp b/acoustic_modem/include/types/am_types.hpp new file mode 100644 index 0000000..f76f8c2 --- /dev/null +++ b/acoustic_modem/include/types/am_types.hpp @@ -0,0 +1,35 @@ +#ifndef AM_TYPES_HPP +#define AM_TYPES_HPP + +#include + +enum class PersistentCmd : uint16_t { + Surface = 1, + Abort = 2, + Stop = 3 +}; + +enum class MsgType : uint8_t{ + Type_def=0, + Type_1 = 1, + Type_2 = 2, + Type_3 = 3, + Type_4 = 4, +}; + +struct DecodedMessage { + MsgType type; + uint16_t msg_id; + float floats[10]; + uint8_t n_floats; +}; + +// ACK +struct Ack { + MsgType type; + uint16_t msg_id; +}; + +constexpr std::uint16_t TDMA_SYNC_WORD = 0xFFFF; + +#endif \ No newline at end of file diff --git a/acoustic_modem/launch/base.launch.py b/acoustic_modem/launch/base.launch.py new file mode 100644 index 0000000..1962301 --- /dev/null +++ b/acoustic_modem/launch/base.launch.py @@ -0,0 +1,28 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare + +def generate_launch_description(): + config_arg = DeclareLaunchArgument( + 'config', + default_value='base_single.yaml', + description='YAML config file for base node' + ) + + config_file = PathJoinSubstitution([ + FindPackageShare('acoustic_modem'), + 'config', + LaunchConfiguration('config') + ]) + + return LaunchDescription([ + config_arg, + Node( + package='acoustic_modem', + executable='am_base_node', + name='base_node', + parameters=[config_file] + ) + ]) \ No newline at end of file diff --git a/acoustic_modem/launch/drone.launch.py b/acoustic_modem/launch/drone.launch.py new file mode 100644 index 0000000..713e0dd --- /dev/null +++ b/acoustic_modem/launch/drone.launch.py @@ -0,0 +1,28 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare + +def generate_launch_description(): + config_arg = DeclareLaunchArgument( + 'config', + default_value='drone_single.yaml', + description='YAML config file for drone node' + ) + + config_file = PathJoinSubstitution([ + FindPackageShare('acoustic_modem'), + 'config', + LaunchConfiguration('config') + ]) + + return LaunchDescription([ + config_arg, + Node( + package='acoustic_modem', + executable='am_drone_node', + name='drone_node', + parameters=[config_file] + ) + ]) \ No newline at end of file diff --git a/acoustic_modem/package.xml b/acoustic_modem/package.xml new file mode 100644 index 0000000..5d57f60 --- /dev/null +++ b/acoustic_modem/package.xml @@ -0,0 +1,17 @@ + + acoustic_modem + 1.0.0 + Acoustic modem implementation + vortex + MIT + + ament_cmake + + rclcpp + std_msgs + + + + ament_cmake + + diff --git a/acoustic_modem/src/driver/am_driver_base.cpp b/acoustic_modem/src/driver/am_driver_base.cpp new file mode 100644 index 0000000..10f71cb --- /dev/null +++ b/acoustic_modem/src/driver/am_driver_base.cpp @@ -0,0 +1,382 @@ +#include "driver/am_driver_base.hpp" +#include "rclcpp/rclcpp.hpp" +#include + +AcousticModemDriverBase::AcousticModemDriverBase(int channel, int level, bool diagnostic,float timeout) + : channel_(channel), + level_(level), + diagnostic_(diagnostic) +{ + msg_id = 0; +} + +uint16_t AcousticModemDriverBase::reserve_msg_id() { + uint16_t id = msg_id & 0x03FF; + msg_id = (msg_id + 1) & 0x03FF; + return id; +} + +// to set channel of communication +bool AcousticModemDriverBase::set_channel(int channel) { + // check channel is correct number + if (channel < 1 || channel > 12) { + std::cout << "Warning: Channel " << channel + << " is not a valid channel, needs to be between 1 and 12." + << std::endl; + return false; + } + // wait 1 sec between c and c to go in command mode + this->send_data('c'); + std::this_thread::sleep_for(std::chrono::seconds(1)); + this->send_data('c'); + + switch (channel) { + case 10: + this->send_data('a'); + break; + case 11: + this->send_data('b'); + break; + case 12: + this->send_data('c'); + break; + default: + this->send_data((char)channel); + break; + } + channel_ = channel; + std::this_thread::sleep_for(std::chrono::seconds(1)); + return true; +} + +// to set power level +bool AcousticModemDriverBase::set_level(int level) { + if (level < 1 || level > 4) { + std::cout << "Warning: Level " << level + << " is not a valid Level, needs to be between 1 and 4." + << std::endl; + return false; + } + this->send_data('l'); + std::this_thread::sleep_for(std::chrono::seconds(1)); + this->send_data('l'); + + this->send_data((char)(level)); + level_ = level; + std::this_thread::sleep_for(std::chrono::seconds(1)); + return true; +} + +// to set diagnostic mode +bool AcousticModemDriverBase::set_diagnostic_mode() { + this->send_data('d'); + std::this_thread::sleep_for(std::chrono::seconds(1)); + this->send_data('d'); + diagnostic_ = true; + std::this_thread::sleep_for(std::chrono::seconds(1)); + return true; +} + +// to set diagnostic mode +bool AcousticModemDriverBase::set_parrot_mode() { + this->send_data('p'); + std::this_thread::sleep_for(std::chrono::seconds(1)); + this->send_data('p'); + diagnostic_ = true; + std::this_thread::sleep_for(std::chrono::seconds(1)); + return true; +} + +bool AcousticModemDriverBase::reset_diagnostic_mode() { + this->send_data('t'); + std::this_thread::sleep_for(std::chrono::seconds(1)); + this->send_data('t'); + diagnostic_ = false; + std::this_thread::sleep_for(std::chrono::seconds(1)); + return false; +} + + +bool AcousticModemDriverBase::is_tdma_sync(uint16_t word) const { + return word == TDMA_SYNC_WORD; +} + +std::string AcousticModemDriverBase::make_tdma_sync(){ + std::uint16_t w=TDMA_SYNC_WORD; + return std::string(reinterpret_cast(&w),2); +} + +bool AcousticModemDriverBase::try_tdma_sync_event(std::chrono::steady_clock::time_point& rx_time) { + std::lock_guard lock(tdma_sync_); + if (!tdma_sync_received_) { + return false; + } + + rx_time = last_tdma_sync_rx_; + tdma_sync_received_ = false; + return true; +} +bool AcousticModemDriverBase::is_persistent(uint16_t w) const { + return (((w >> 12) & 0xF) == PERSISTENT_SYNC); +} + +std::string AcousticModemDriverBase::make_persistent_cmd(PersistentCmd cmd){ + + uint16_t w = (uint16_t(PERSISTENT_SYNC) << 12) | (uint16_t(cmd) & 0X0FFF); + + std::string s(2, '\0'); + s[0] = static_cast(w & 0xFF); + s[1] = static_cast((w >> 8) & 0xFF); + return s; +} + +bool AcousticModemDriverBase::consume_persistent(PersistentCmd& cmd){ + std::lock_guard lock(persistent_mutex_); + if (!new_persistent_available_ || !last_persistent_cmd_.has_value()) { + return false; + } + cmd = *last_persistent_cmd_; + new_persistent_available_ = false; + return true; +} + +std::string AcousticModemDriverBase::make_ack(MsgType t, uint16_t ack_id) { + const uint16_t id = (ack_id & 0x03FF); + const uint16_t type = (uint16_t(t) & 0x0003); + + uint16_t w = (uint16_t(ACK_SYNC) << 12) | (type << 10) | id; + + std::string s(2, '\0'); + s[0] = static_cast(w & 0xFF); + s[1] = static_cast((w >> 8) & 0xFF); + return s; +} +bool AcousticModemDriverBase::is_ack(uint16_t w) const { + return (((w >> 12) & 0xF) == ACK_SYNC); +} +std::string AcousticModemDriverBase::make_handshake(MsgType t, uint16_t id) { + id &= 0x03FF; // 10 bits + const uint16_t type = (uint16_t(t) & 0x0003); // 2 bits + + // build the 16-bit word + uint16_t w =(uint16_t(HANDSHAKE_SYNC) << 12) | + (type << 10) | + id; + // convert to 2-byte string (little-endian) + std::string s(2, '\0'); + s[0] = static_cast(w & 0xFF); // low byte + s[1] = static_cast((w >> 8) & 0xFF); // high byte + + return s; +} + +bool AcousticModemDriverBase::is_handshake(uint16_t packet) const{ + return ((packet>>12 & 0xF)==HANDSHAKE_SYNC); +} +MsgType AcousticModemDriverBase::type(uint16_t packet){ + return MsgType((packet>>10 & 0x3)); +} +uint16_t AcousticModemDriverBase::id(uint16_t packet){ + return ((packet & 0x3FF)); +} + +size_t AcousticModemDriverBase::floats_for_type(MsgType t) { + switch (t) { + case MsgType::Type_1: return 2; + case MsgType::Type_2: return 4; + case MsgType::Type_3: return 5; + default: return 0; + } +} +void AcousticModemDriverBase::float_to_word(float v, std::string &s0,std::string &s1){ + uint8_t b[4]; + std::memcpy(b, &v, 4); + // Each string contains exactly 2 bytes (16 bits) + // TODO: change type + s0.assign(reinterpret_cast(&b[0]), 2); + s1.assign(reinterpret_cast(&b[2]), 2); +} + +float AcousticModemDriverBase::word_to_float(uint16_t w0, uint16_t w1){ + uint8_t b[4]; + b[0]=uint8_t(w0 & 0xFF); + b[1]=uint8_t((w0 >> 8) & 0xFF); + b[2]=uint8_t(w1 & 0xFF); + b[3]=uint8_t((w1 >> 8) & 0xFF); + float v; + std::memcpy(&v, b, 4); + return v; +} +// send_data(char) +size_t AcousticModemDriverBase::send_data(char data) { + // send data using serial driver's send(msg) + std::vector msg = {static_cast(data)}; + //size_t bytes = tx_serial_port_.write_some(asio::buffer(msg.data(), msg.size())); + return write_raw(msg.data(),msg.size()); +} + +size_t AcousticModemDriverBase::send_two_bytes(std::string data) { + // only send 2 bytes waiting 1 second after sending them + if (data.length() != 2) { + return 0; + } else { + //std::vector buff(data.begin(), data.end()); + // might use write() instead write_some() + //size_t bytes =tx_serial_port_.write_some(asio::buffer(buff.data(), 2)); + // 10bps + return write_raw(reinterpret_cast(data.data()), 2); + } +} +size_t AcousticModemDriverBase::send_message(MsgType type,uint16_t id, const float* data){ + const size_t n= floats_for_type(type); + size_t a=0; + // TODO: implement for fourth type of data or default case (return;) + if(n==0 || data==nullptr){ + return a; + } + send_two_bytes(make_handshake(type, id)); // send the first packet of 16 bit containing [SYNC(4) | TYPE(2) | MSG_ID(10)] + std::this_thread::sleep_for(std::chrono::milliseconds(1700)); + for(int i=0;iRX_MAX_WORDS){ + rx.rx_receiving=false; + return; + } + rx.rx_receiving=true; +} + +bool AcousticModemDriverBase::rx_rebuild_word(uint16_t w, MsgType &out_type, uint16_t &out_msg_id,float *out_floats, uint8_t &inout_capacity, std::chrono::milliseconds timeout){ + auto now = std::chrono::steady_clock::now(); + // timeout for incomplete message (the timeout is to be defined and if it is needed) + if (rx.rx_receiving && (now - rx.rx_last_rx > timeout)) { + rx_reset(); + + } + rx.rx_last_rx= now; + + + if(!rx.rx_receiving){ + if(is_handshake(w)){ + // if the word is a handshake we start receiveing from the start + rx_start_handshake(w); + } + return false; + } + if(rx.rx_received_words>=rx.rx_expected_words){ + rx_reset(); + return false; + } + rx.rx_words[rx.rx_received_words++]=w; + const size_t n_floats=floats_for_type(rx.rx_type); + if(rx.rx_received_words==rx.rx_expected_words){ + // maybe add check for floats capacity but i don't think it's necessary + + for(size_t i=0; i lock(decoded_mutex); + if(decoded_queue.empty()){ + return false; + } + msg = decoded_queue.front(); + decoded_queue.pop(); + return true; +} +bool AcousticModemDriverBase::try_pop_ack(Ack &ack){ + std::lock_guard lock(ack_mutex); + if(ack_queue.empty()){ + return false; + } + ack=ack_queue.front(); + ack_queue.pop(); + return true; +} +void AcousticModemDriverBase::read_callback(std::vector& data) { + // if (data.size() < 2) return; // TODO: the serial channel is byte stream, POSSIBLE BUG + // uint16_t word = (static_cast(data[1]) << 8) | data[0]; + + // need to use the tdmalink functions to make it work + + // Important: + // ACK words are only detected when the receiver is idle. + // If we are currently reconstructing a message, all incoming words are treated + // as payload to avoid corrupting the message reconstruction. + // same process for Persistent mode, + auto logger = rclcpp::get_logger("acoustic_modem_driver"); + rx_byte_buffer.insert(rx_byte_buffer.end(),data.begin(),data.end()); + while(rx_byte_buffer.size()>=2){ + uint16_t word = (static_cast(rx_byte_buffer[1]) << 8) | rx_byte_buffer[0]; + rx_byte_buffer.erase(rx_byte_buffer.begin(),rx_byte_buffer.begin()+2); + if(!rx.rx_receiving){ + if(is_tdma_sync(word)){ + { + std::lock_guard lock(tdma_sync_); + tdma_sync_received_=true; + last_tdma_sync_rx_=std::chrono::steady_clock::now(); + } + RCLCPP_INFO(logger,"TDMA SYNC received"); + continue; + } + if(is_ack(word)){ + Ack a; + a.msg_id=(word & 0x3FF); + a.type=MsgType((word >> 10) & 0x3); + { + std::lock_guard lock(ack_mutex); + ack_queue.push(a); + } + continue; + } + if(is_persistent(word)){ + PersistentCmd cmd=static_cast(word & 0x0FFF); + { + std::lock_guard lock(persistent_mutex_); + if (new_persistent_available_ && last_persistent_cmd_ == cmd) { + continue; + } + last_persistent_cmd_ = cmd; + new_persistent_available_ = true; + } + continue; + } + } + DecodedMessage msg_decoded{}; + bool complete=rx_rebuild_word(word,msg_decoded.type,msg_decoded.msg_id,msg_decoded.floats,msg_decoded.n_floats,std::chrono::milliseconds(3000)); + if (complete) { + std::lock_guard lock(decoded_mutex); + decoded_queue.push(msg_decoded); + } + } +} diff --git a/acoustic_modem/src/driver/am_driver_single.cpp b/acoustic_modem/src/driver/am_driver_single.cpp new file mode 100644 index 0000000..4e49043 --- /dev/null +++ b/acoustic_modem/src/driver/am_driver_single.cpp @@ -0,0 +1,81 @@ +#include "driver/am_driver_single.hpp" + +AcousticModemDriver::AcousticModemDriver(const std::string& device, + int baudrate, + int channel, + int level, + bool diagnostic, + float timeout): + AcousticModemDriverBase(timeout, channel, level, diagnostic), + io_(), + m_serial_port(io_), + device_(device) + { + + open(device,baudrate); + set_channel(channel); + set_level(level); + + std::cout << "[INFO] Modem initialized on device " << this->device_ + << ", channel " << channel << ", level " << level + << ", diagnostic mode = " << std::boolalpha << diagnostic + << std::endl; +} + +void AcousticModemDriver::open(const std::string& device, int& baudrate){ + m_recv_buffer.resize(m_recv_buffer_size); + m_serial_port.open(device); + m_serial_port.set_option(asio::serial_port_base::baud_rate(baudrate)); + m_serial_port.set_option(asio::serial_port_base::flow_control(asio::serial_port_base::flow_control::none)); + m_serial_port.set_option(asio::serial_port_base::parity(asio::serial_port_base::parity::none)); + m_serial_port.set_option(asio::serial_port_base::stop_bits(asio::serial_port_base::stop_bits::one)); + + start_async_read(); + io_thread_ = std::thread([this] { io_.run(); }); +} + +AcousticModemDriver::~AcousticModemDriver() { + asio::error_code error; + m_serial_port.close(error); +} + + +void AcousticModemDriver::start_async_read() { + m_func=[this](std::vector& buffer, const size_t& bytes_transferred) { + std::vector data(buffer.begin(), + buffer.begin() + bytes_transferred); + this->read_callback(data); + }; + m_serial_port.async_read_some( + asio::buffer(m_recv_buffer), + [this](std::error_code error, size_t bytes_transferred) + { + async_receive_handler(error, bytes_transferred); + }); + +} + +void AcousticModemDriver::async_receive_handler(const asio::error_code & error,size_t bytes_transferred) { + if (error) { + this->close(); + return; + } + if (bytes_transferred > 0 && m_func) { + m_func(m_recv_buffer, bytes_transferred); + + } + m_serial_port.async_read_some( + asio::buffer(m_recv_buffer), + [this](std::error_code error, size_t bytes_transferred) + { + async_receive_handler(error, bytes_transferred); + }); +} + +size_t AcousticModemDriver::write_raw(const uint8_t* data, size_t size){ + return m_serial_port.write_some(asio::buffer(data, size)); +} +void AcousticModemDriver::close() { + asio::error_code error; + m_serial_port.close(error); +} \ No newline at end of file diff --git a/acoustic_modem/src/driver/am_driver_split.cpp b/acoustic_modem/src/driver/am_driver_split.cpp new file mode 100644 index 0000000..44ef879 --- /dev/null +++ b/acoustic_modem/src/driver/am_driver_split.cpp @@ -0,0 +1,100 @@ +#include "driver/am_driver_split.hpp" + +AcousticModemDriverSplit::AcousticModemDriverSplit(const std::string& tx_device, + const std::string& rx_device, + int baudrate, + int channel, + int level, + bool diagnostic, + float timeout): + AcousticModemDriverBase(channel, level, diagnostic, timeout), + io_(), + tx_serial_port_(io_), + rx_serial_port_(io_), + tx_device_(tx_device), + rx_device_(rx_device) +{ + open(tx_device, rx_device, baudrate); + set_channel(channel); + set_level(level); + + std::cout << "[INFO] Split modem initialized TX=" << tx_device_ + << " RX=" << rx_device_ + << ", channel " << channel + << ", level " << level + << ", diagnostic mode = " << std::boolalpha << diagnostic + << std::endl; +} + +void AcousticModemDriverSplit::open(const std::string& tx_device, + const std::string& rx_device, + int& baudrate) +{ + m_recv_buffer.resize(m_recv_buffer_size); + + tx_serial_port_.open(tx_device); + rx_serial_port_.open(rx_device); + + tx_serial_port_.set_option(asio::serial_port_base::baud_rate(baudrate)); + tx_serial_port_.set_option(asio::serial_port_base::flow_control(asio::serial_port_base::flow_control::none)); + tx_serial_port_.set_option(asio::serial_port_base::parity(asio::serial_port_base::parity::none)); + tx_serial_port_.set_option(asio::serial_port_base::stop_bits(asio::serial_port_base::stop_bits::one)); + + rx_serial_port_.set_option(asio::serial_port_base::baud_rate(baudrate)); + rx_serial_port_.set_option(asio::serial_port_base::flow_control(asio::serial_port_base::flow_control::none)); + rx_serial_port_.set_option(asio::serial_port_base::parity(asio::serial_port_base::parity::none)); + rx_serial_port_.set_option(asio::serial_port_base::stop_bits(asio::serial_port_base::stop_bits::one)); + + start_async_read(); + io_thread_ = std::thread([this] { io_.run(); }); +} + +AcousticModemDriverSplit::~AcousticModemDriverSplit() { + asio::error_code error; + tx_serial_port_.close(error); + rx_serial_port_.close(error); +} +void AcousticModemDriverSplit::close() +{ + asio::error_code error; + tx_serial_port_.close(error); + rx_serial_port_.close(error); +} + +size_t AcousticModemDriverSplit::write_raw(const uint8_t* data, size_t size) { + return tx_serial_port_.write_some(asio::buffer(data, size)); +} + +void AcousticModemDriverSplit::start_async_read() { + m_func=[this](std::vector& buffer, const size_t& bytes_transferred) { + std::vector data(buffer.begin(), + buffer.begin() + bytes_transferred); + this->read_callback(data); + }; + rx_serial_port_.async_read_some( + asio::buffer(m_recv_buffer), + [this](std::error_code error, size_t bytes_transferred) + { + async_receive_handler(error, bytes_transferred); + }); + +} + +void AcousticModemDriverSplit::async_receive_handler(const asio::error_code & error,size_t bytes_transferred) { + if (error) { + this->close(); + return; + } + if (bytes_transferred > 0 && m_func) { + m_func(m_recv_buffer, bytes_transferred); + + } + rx_serial_port_.async_read_some( + asio::buffer(m_recv_buffer), + [this](std::error_code error, size_t bytes_transferred) + { + async_receive_handler(error, bytes_transferred); + }); +} + + diff --git a/acoustic_modem/src/nodes/am_base_node.cpp b/acoustic_modem/src/nodes/am_base_node.cpp new file mode 100644 index 0000000..616e006 --- /dev/null +++ b/acoustic_modem/src/nodes/am_base_node.cpp @@ -0,0 +1,177 @@ +#include "nodes/am_base_node.hpp" + +BaseNode::BaseNode() : Node("base_node") { + init_connection(); + setup_tdma(); + set_publishers(); + set_subscribers(); + + timer_ = this->create_wall_timer( + std::chrono::milliseconds(1000), std::bind(&BaseNode::poll_and_publish_rx, this)); + + link_->start(); + + link_->send_tdma_sync(); + + RCLCPP_INFO(this->get_logger(), "BaseNode started"); +} + +BaseNode::~BaseNode() { + if (link_) { + link_->stop(); + } +} + + +void BaseNode::init_connection() { + this->declare_parameter("device", ""); + this->declare_parameter("tx_device", ""); + this->declare_parameter("rx_device", ""); + this->declare_parameter("baudrate", 9600); + this->declare_parameter("channel", 1); + this->declare_parameter("level", 4); + this->declare_parameter("diagnostic", false); + this->declare_parameter("timeout", 0.5); + this->declare_parameter("split_mode",false); + + int baudrate = this->get_parameter("baudrate").as_int(); + int channel = this->get_parameter("channel").as_int(); + int level = this->get_parameter("level").as_int(); + bool diagnostic = this->get_parameter("diagnostic").as_bool(); + float timeout = + static_cast(this->get_parameter("timeout").as_double()); + bool split=this->get_parameter("split_mode").as_bool(); + + if (!split) { + std::string device = this->get_parameter("device").as_string(); + + driver_ = std::make_unique( + device, baudrate, channel, level, diagnostic, timeout); + } else { + std::string tx_device = this->get_parameter("tx_device").as_string(); + std::string rx_device = this->get_parameter("rx_device").as_string(); + + driver_ = std::make_unique( + tx_device, rx_device, baudrate, channel, level, diagnostic, timeout); + } +} + +void BaseNode::setup_tdma(){ + this->declare_parameter("num_slots", 2); + this->declare_parameter("my_slot", 1); + this->declare_parameter("slot_duration_sec", 25); + this->declare_parameter("guard_ms", 1000); + this->declare_parameter("sync_delay", 5); + this->declare_parameter("estimated_prop_delay", 0); + + TDMAConfig cfg; + cfg.num_slots = static_cast(this->get_parameter("num_slots").as_int()); + cfg.my_slot = static_cast(this->get_parameter("my_slot").as_int()); + cfg.slot_duration = std::chrono::seconds(this->get_parameter("slot_duration_sec").as_int()); + cfg.guard = std::chrono::milliseconds(this->get_parameter("guard_ms").as_int()); + cfg.sync_delay=std::chrono::seconds(this->get_parameter("sync_delay").as_int()); + cfg.estimated_prop_delay=std::chrono::milliseconds(this->get_parameter("estimated_prop_delay").as_int()); + + cfg.t0 = std::chrono::steady_clock::time_point{}; + + tdma_=std::make_unique(cfg); + link_=std::make_unique(*driver_,*tdma_); + +} + +void BaseNode::set_publishers() { + // we reserved 2 bit for the type of data so we'll have 4 types of data + data_0_ = this->create_publisher("topic_0", 10); + data_1_ = this->create_publisher("topic_1", 10); + data_2_ = this->create_publisher("topic_2", 10); + data_3_ = this->create_publisher("topic_3", 10); +} + +void BaseNode::set_subscribers() { + persistent_sub_ = this->create_subscription( + "persistent", + 10, + std::bind(&BaseNode::persistent_callback, this, std::placeholders::_1)); + + sync_sub_=this->create_subscription( + "synchronize", + 10, + std::bind(&BaseNode::sync_callback, this, std::placeholders::_1)); +} + +void BaseNode::sync_callback(const std_msgs::msg::UInt32::SharedPtr msg){ + auto prop_delay_ms=std::chrono::milliseconds(msg->data); + tdma_->set_estimated_prop_delay(prop_delay_ms); + RCLCPP_INFO(this->get_logger(), + "Manual TDMA sync requested with estimated propagation delay = %u ms", + msg->data); + link_->send_tdma_sync(); + RCLCPP_INFO(this->get_logger(), "Manual TDMA synchronization triggered"); +} + +void BaseNode::persistent_callback(const std_msgs::msg::UInt16::SharedPtr msg) { + std::uint16_t value = msg->data; + + if (value == 0) { + link_->stop_persistent_command(); + RCLCPP_INFO(this->get_logger(), "Stopped persistent command"); + return; + } + + PersistentCmd cmd = static_cast(value); + + link_->start_persistent_command(cmd); + + RCLCPP_INFO(this->get_logger(), + "Sent persistent command: %u", + static_cast(cmd)); +} + +void BaseNode::poll_and_publish_rx() { + DecodedMessage msg; + while (driver_->try_pop_decoded(msg)) { + link_->on_data_received(msg.type,msg.msg_id); + std_msgs::msg::Float32MultiArray out; + for (std::uint8_t i = 0; i < msg.n_floats; ++i) { + out.data.push_back(msg.floats[i]); + } + switch(msg.type) { + case MsgType::Type_1: + data_1_->publish(out); + break; + case MsgType::Type_2: + data_2_->publish(out); + break; + case MsgType::Type_3: + data_3_->publish(out); + break; + default: + data_0_->publish(out); + break; + } + } + Ack ack; + while(driver_->try_pop_ack(ack)){ + link_->on_ack_received(ack.type,ack.msg_id); + + RCLCPP_INFO(this->get_logger(),"ACK received, msg_id=%u", ack.msg_id); + } + + PersistentCmd cmd; + if(driver_->consume_persistent(cmd)){ + //std_msgs::msg::UInt16 out_cmd; + //out_cmd.data = static_cast(cmd); + + //persistent_pub_->publish(out_cmd); + + RCLCPP_INFO(this->get_logger(),"Received persistent command: %u", static_cast(cmd)); + } +} + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} \ No newline at end of file diff --git a/acoustic_modem/src/nodes/am_drone_node.cpp b/acoustic_modem/src/nodes/am_drone_node.cpp new file mode 100644 index 0000000..c37bf2a --- /dev/null +++ b/acoustic_modem/src/nodes/am_drone_node.cpp @@ -0,0 +1,176 @@ +#include "nodes/am_drone_node.hpp" + +DroneNode::DroneNode() : Node("drone_node") { + + init_connection(); + setup_tdma(); + set_subscriber(); + set_publisher(); + + timer_ = this->create_wall_timer( + std::chrono::milliseconds(200), std::bind(&DroneNode::poll_modem, this)); + + link_->start(); + + RCLCPP_INFO(this->get_logger(), "DroneNode started"); +} + +DroneNode::~DroneNode() { + if (link_) { + link_->stop(); + } +} + +void DroneNode::init_connection() { + this->declare_parameter("device", ""); + this->declare_parameter("tx_device", ""); + this->declare_parameter("rx_device", ""); + this->declare_parameter("baudrate", 9600); + this->declare_parameter("channel", 1); + this->declare_parameter("level", 4); + this->declare_parameter("diagnostic", false); + this->declare_parameter("timeout", 0.5); + this->declare_parameter("split_mode",false); + + std::string device = this->get_parameter("device").as_string(); + int baudrate = this->get_parameter("baudrate").as_int(); + int channel = this->get_parameter("channel").as_int(); + int level = this->get_parameter("level").as_int(); + bool diagnostic = this->get_parameter("diagnostic").as_bool(); + float timeout =static_cast(this->get_parameter("timeout").as_double()); + bool split=this->get_parameter("split_mode").as_bool(); + + if (!split) { + std::string device = this->get_parameter("device").as_string(); + + driver_ = std::make_unique( + device, baudrate, channel, level, diagnostic, timeout); + } else { + std::string tx_device = this->get_parameter("tx_device").as_string(); + std::string rx_device = this->get_parameter("rx_device").as_string(); + + driver_ = std::make_unique( + tx_device, rx_device, baudrate, channel, level, diagnostic, timeout); + } +} + +void DroneNode::setup_tdma(){ + this->declare_parameter("num_slots", 2); + this->declare_parameter("my_slot", 0); + this->declare_parameter("slot_duration_sec", 25); + this->declare_parameter("guard_ms", 1000); + this->declare_parameter("sync_delay", 5); + this->declare_parameter("estimated_prop_delay", 0); + + TDMAConfig cfg; + cfg.num_slots = static_cast(this->get_parameter("num_slots").as_int()); + cfg.my_slot = static_cast(this->get_parameter("my_slot").as_int()); + cfg.slot_duration = std::chrono::seconds(this->get_parameter("slot_duration_sec").as_int()); + cfg.guard = std::chrono::milliseconds(this->get_parameter("guard_ms").as_int()); + cfg.sync_delay=std::chrono::seconds(this->get_parameter("sync_delay").as_int()); + cfg.estimated_prop_delay=std::chrono::milliseconds(this->get_parameter("estimated_prop_delay").as_int()); + + cfg.t0 = std::chrono::steady_clock::time_point{}; + + tdma_=std::make_unique(cfg); + link_=std::make_unique(*driver_,*tdma_); + +} + +void DroneNode::set_subscriber() { + // Depends on the topic in which we will read the data + subscription_ = this->create_subscription( + "data_topic", 10, std::bind(&DroneNode::tx_callback, this, + std::placeholders::_1)); +} +void DroneNode::set_publisher(){ + persistent_pub_ = this->create_publisher("persistent_cmd_topic", 10); + publisher_=this->create_publisher("not_necessary_topic",10); +} + +void DroneNode::tx_callback(const std_msgs::msg::Float32MultiArray::SharedPtr msg){ + std::vector payload(msg->data.begin(), msg->data.end()); + + MsgType type; + + switch (payload.size()) { + case 2: + type = MsgType::Type_1; + break; + + case 4: + type = MsgType::Type_2; + break; + + case 5: + type = MsgType::Type_3; + break; + + default: + RCLCPP_WARN(this->get_logger(), + "Unsupported payload size: %zu floats", + payload.size()); + return; + } + + + link_->enqueue(type, payload); + RCLCPP_INFO(this->get_logger(),"Enqueued outgoing acoustic message with %zu floats",payload.size()); + +} + + +/* + * Persistent commands are received as standalone 16-bit control words and are + * decoded directly by the driver. In poll_modem(), we check whether a new + * persistent command has been received and, if so, publish it immediately on + * a dedicated ROS topic so the drone control logic can react without waiting + * for normal payload handling. + */ +void DroneNode::poll_modem(){ + + std::chrono::steady_clock::time_point sync_time; + if(driver_->try_tdma_sync_event(sync_time)){ + link_->on_tdma_sync_received(); + RCLCPP_INFO(this->get_logger(),"TDMA SYNC processed in Drone Node"); + } + + DecodedMessage msg; + while(driver_->try_pop_decoded(msg)){ + link_->on_data_received(msg.type,msg.msg_id); + + std_msgs::msg::Float32MultiArray out; + for (std::uint8_t i = 0; i < msg.n_floats; ++i) { + out.data.push_back(msg.floats[i]); + } + // TODO: here i can publish the data in a topic or i can just print them out or whatever + // Let's publish them for now + publisher_->publish(out); + RCLCPP_INFO(this->get_logger(),"Published received acoustic message, msg_id=%u", msg.msg_id); + } + + Ack ack; + while(driver_->try_pop_ack(ack)){ + link_->on_ack_received(ack.type,ack.msg_id); + + RCLCPP_INFO(this->get_logger(),"ACK received, msg_id=%u", ack.msg_id); + } + + PersistentCmd cmd; + if(driver_->consume_persistent(cmd)){ + std_msgs::msg::UInt16 out_cmd; + out_cmd.data = static_cast(cmd); + + persistent_pub_->publish(out_cmd); + + RCLCPP_INFO(this->get_logger(),"Received persistent command: %u", static_cast(cmd)); + } +} + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/acoustic_modem/src/tdma/tdma_link.cpp b/acoustic_modem/src/tdma/tdma_link.cpp new file mode 100644 index 0000000..9679113 --- /dev/null +++ b/acoustic_modem/src/tdma/tdma_link.cpp @@ -0,0 +1,157 @@ +#include "tdma/tdma_link.hpp" +#include + +TDMALink::TDMALink(IAcousticModemDriver& driver, TDMAManager& tdma): driver_(driver), tdma_(tdma){} + +TDMALink::~TDMALink(){ + stop(); +} + +void TDMALink::start(){ + running_=true; + tx_thread_ = std::thread(&TDMALink::tx_worker, this); +} + +void TDMALink::stop() { + running_ = false; + + if (tx_thread_.joinable()) { + tx_thread_.join(); + } +} + +void TDMALink::enqueue(MsgType type, std::vector payload){ + std::lock_guardlock(tx_mtx_); + tx_queue.push(LinkTxMessage{type, payload}); +} + +void TDMALink::start_persistent_command(PersistentCmd cmd){ + std::lock_guard lock(tx_mtx_); + current_persistent_cmd_=cmd; + last_persistent_tx_=std::chrono::steady_clock::now()-std::chrono::milliseconds(2000); +} + +void TDMALink::stop_persistent_command(){ + std::lock_guard lock(tx_mtx_); + current_persistent_cmd_.reset(); +} + +void TDMALink::on_data_received(MsgType type, uint16_t msg_id){ + std::lock_guardlock(tx_mtx_); + pending_acks_.push(PendingAck{type,msg_id}); + + if (msg_id == last_rx_msg_id_ && last_rx_valid_) { + return; + } + last_rx_valid_=true; + last_rx_msg_id_ = msg_id; +} + +void TDMALink::on_ack_received(MsgType type, uint16_t msg_id){ + std::lock_guardlock(tx_mtx_); + if(waiting_ack_ && waiting_msg_id_==msg_id){ + waiting_ack_=false; + retry_count_=0; + waiting_msg_id_=0; + waiting_type_=MsgType{}; + + std::cout << "[TDMA LINK] ACK received for msg_id=" << msg_id << "\n"; + } +} + +void TDMALink::on_tdma_sync_received(){ + auto rx_time=std::chrono::steady_clock::now(); + + tdma_.sync_rx(rx_time); + + auto logger=rclcpp::get_logger("acoustic_modem_driver"); + RCLCPP_INFO(logger, "TDMA SYNC received, local TDMA armed"); +} + +void TDMALink::send_tdma_sync(){ + auto tx_time = std::chrono::steady_clock::now(); + + driver_.send_two_bytes(driver_.make_tdma_sync()); + std::this_thread::sleep_for(std::chrono::milliseconds(1700)); + + tdma_.sync_tx(tx_time); + + auto logger=rclcpp::get_logger("acoustic_modem_driver"); + RCLCPP_INFO(logger, "TDMA SYNC sent"); +} + +void TDMALink::send_pending_ack(){ + // PRECONDITION: tx_mtx_ must already be locked + PendingAck ack=pending_acks_.front(); + pending_acks_.pop(); + + driver_.send_two_bytes(driver_.make_ack(ack.type,ack.msg_id)); + std::this_thread::sleep_for(std::chrono::milliseconds(1600)); + auto logger = rclcpp::get_logger("acoustic_modem_driver"); + RCLCPP_INFO(logger, "SENT ACK"); +} + +void TDMALink::resend_last_message(){ + driver_.send_message(waiting_type_,waiting_msg_id_,last_sent_.payload.data()); + std::cout<<"resend message"<<"\n"; + last_tx_time_=std::chrono::steady_clock::now(); + retry_count_++; +} + +void TDMALink::send_new_message(const LinkTxMessage& msg){ + uint16_t id= driver_.reserve_msg_id(); + driver_.send_message(msg.type,id,msg.payload.data()); + + waiting_ack_=true; + waiting_msg_id_=id; + waiting_type_=msg.type; + last_sent_=msg; + last_tx_time_=std::chrono::steady_clock::now(); + retry_count_=0; +} + +void TDMALink::tx_worker(){ + while(running_){ + auto now=std::chrono::steady_clock::now(); + { + std::lock_guardlock(tx_mtx_); + if(tdma_.tx_allowed(now)){ + /** + * in this case we can transmit more than one ack for slot, + * we can change it by putting a flag saying we already sent one, + * to be checked with more testing in real time, now it works + */ + if(!pending_acks_.empty()){ + send_pending_ack(); + } + if(current_persistent_cmd_.has_value()){ + if(now-last_persistent_tx_>=std::chrono::milliseconds(2200)){ + driver_.send_two_bytes(driver_.make_persistent_cmd(*current_persistent_cmd_)); + last_persistent_tx_=now; + } + }else{ + if(waiting_ack_){ + if(now-last_tx_time_>=ack_timeout_){ + if(retry_count_ float[%zu] = %f", i, msg.payload[i]); + } + tx_queue.pop(); + send_new_message(msg); + } + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} \ No newline at end of file diff --git a/acoustic_modem/src/tdma/tdma_manager.cpp b/acoustic_modem/src/tdma/tdma_manager.cpp new file mode 100644 index 0000000..2c4e370 --- /dev/null +++ b/acoustic_modem/src/tdma/tdma_manager.cpp @@ -0,0 +1,78 @@ +#include "tdma/tdma_manager.hpp" + +std::uint8_t TDMAManager::current_slot(std::chrono::steady_clock::time_point now) const{ + auto cycle=cfg.num_slots*cfg.slot_duration; + auto elapsed=now-cfg.t0; + auto time_cycle=elapsed%cycle; + uint8_t slot_i= time_cycle/cfg.slot_duration; + return slot_i; +} + +bool TDMAManager::tx_allowed(std::chrono::steady_clock::time_point now) const{ + if (!synced_) { + return false; + } + + if (now < cfg.t0) { + return false; + } + + if(current_slot(now)!=cfg.my_slot){ + return false; + } + auto cycle=cfg.num_slots*cfg.slot_duration; + auto elapsed=now-cfg.t0; + auto time_cycle=elapsed%cycle; + uint8_t slot_i= time_cycle/cfg.slot_duration; + auto offset_in_slot=time_cycle%cfg.slot_duration; + + if(offset_in_slot(cfg.slot_duration-cfg.guard)){ + return false; + } + return true; +} + +std::chrono::milliseconds TDMAManager::time_since_slot_start(std::chrono::steady_clock::time_point now) const{ + auto elapsed=now-cfg.t0; + auto time_in_slot = std::chrono::duration_cast(elapsed % cfg.slot_duration); + return time_in_slot; +} + +bool TDMAManager::is_my_slot(std::chrono::steady_clock::time_point now) const{ + if (!synced_ || now < cfg.t0) { + return false; + } + return (cfg.my_slot==current_slot(now)); +} + +bool TDMAManager::in_guard_time(std::chrono::steady_clock::time_point now) const{ + if (!synced_ || now < cfg.t0) { + return false; + } + return time_since_slot_start(now)