add rtos for peer.cpp

This commit is contained in:
phamvannhat 2025-12-06 22:32:13 +07:00
parent 8f1dff7da7
commit 17c7f6e767
1 changed file with 284 additions and 41 deletions

View File

@ -1,4 +1,5 @@
// peer_client_fixed.cpp
// peer_client_rt.cpp
// Based on your peer_client_fixed.cpp but with RT-friendly thread design
#include <iostream>
#include <memory>
#include <thread>
@ -7,6 +8,13 @@
#include <cstring>
#include <cstdint>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <queue>
#include <cassert>
#include <pthread.h>
#include <sched.h>
#include "udp_signaling.hpp"
#include "rtc/rtc.hpp"
@ -142,7 +150,214 @@ static rtc::binary vec_u8_to_rtcbin(const std::vector<uint8_t>& v) {
return out;
}
// ---------- MAIN ----------
// ------------------ RT Utilities: simple SPSC ring and pools ------------------
// Simple single-producer single-consumer ring for pointers to slots.
// Not generalized lockfree queue with ABA protections — simple and works for typical SPSC usage.
// Simple single-producer single-consumer ring of pointers to slots.
// Not a generalized lock-free queue with ABA protection — intended strictly
// for one producer thread and one consumer thread.
// Capacity is N-1 elements: one slot is sacrificed to tell "full" from "empty".
template<typename T, size_t N>
class SPSC_Ring {
    static_assert((N & (N - 1)) == 0, "N must be power of two");
public:
    SPSC_Ring() = default;

    // Producer side: enqueue one pointer. Returns false when the ring is full.
    bool push(T* item) {
        const size_t h = head.load(std::memory_order_relaxed);
        const size_t next = (h + 1) & (N - 1);
        if (next == tail.load(std::memory_order_acquire)) return false; // full
        buf[h] = item;
        head.store(next, std::memory_order_release); // publish the element
        return true;
    }

    // Consumer side: dequeue one pointer. Returns false when the ring is empty.
    bool pop(T*& item) {
        const size_t t = tail.load(std::memory_order_relaxed);
        if (t == head.load(std::memory_order_acquire)) return false; // empty
        item = buf[t];
        tail.store((t + 1) & (N - 1), std::memory_order_release); // free the slot
        return true;
    }

    // Advisory emptiness check; exact only when called from producer or consumer.
    bool empty() const {
        return head.load(std::memory_order_acquire) == tail.load(std::memory_order_acquire);
    }
private:
    T* buf[N] = {};
    // Keep the producer-owned and consumer-owned indices on separate cache
    // lines to avoid false sharing between the two threads.
    alignas(64) std::atomic<size_t> head{0};
    alignas(64) std::atomic<size_t> tail{0};
};
// Preallocated slot for Rx/Tx to avoid heap on fast path
// Preallocated fixed-size buffer slot used on the Rx/Tx fast paths to avoid
// heap allocation. `len` counts the valid bytes at the front of `data`.
struct Slot {
    size_t len{0};        // number of valid bytes stored in data
    uint8_t data[512]{};  // frame bytes; 512 is the max frame size accepted
};
// ------------------ Pools and rings ------------------
// Fixed-capacity slot pools plus the SPSC rings that carry slot pointers from
// the WebRTC callback / control loop to the rx/tx worker threads.
static constexpr size_t RING_SIZE = 256; // must be power of two
using SlotRing = SPSC_Ring<Slot, RING_SIZE>;
static Slot rxPoolStorage[RING_SIZE];
static Slot txPoolStorage[RING_SIZE];
static std::atomic<size_t> rxPoolFreeIndex{0};
static std::atomic<size_t> txPoolFreeIndex{0};
static SlotRing rxRing; // WebRTC callback -> rx_handler
static SlotRing txRing; // control loop -> tx_dispatcher

// Allocate the next slot from the rx pool (wrap-around counter).
// NOTE(review): a slot is handed out again after RING_SIZE allocations even if
// the consumer has not drained it yet — acceptable only while the producer
// stays well behind the pool size; consider a free-list for a hard guarantee.
Slot* alloc_rx_slot() {
    // Relaxed ordering suffices: only atomicity of the increment is needed.
    const size_t idx = rxPoolFreeIndex.fetch_add(1, std::memory_order_relaxed);
    return &rxPoolStorage[idx % RING_SIZE];
}

// Allocate the next slot from the tx pool. Same wrap-around caveat as above.
// Called from both the control loop and the DataChannel onOpen callback, so
// it is the atomic counter (not single-producer usage) that keeps the index
// handout safe.
Slot* alloc_tx_slot() {
    const size_t idx = txPoolFreeIndex.fetch_add(1, std::memory_order_relaxed);
    return &txPoolStorage[idx % RING_SIZE];
}
// ------------------ Thread priority helper (Linux) ------------------
// ------------------ Thread priority helper (Linux) ------------------
// Promote `thr` to SCHED_FIFO at `priority` and optionally pin it to `cpu`.
// Requires CAP_SYS_NICE (or root); failures are logged and non-fatal, so the
// program still runs — just without RT scheduling.
//
// @param thr       thread to modify (must be joinable)
// @param priority  SCHED_FIFO priority (1..99 on Linux)
// @param cpu       CPU index to pin to, or -1 to leave affinity unchanged
void set_thread_realtime(std::thread &thr, int priority, int cpu = -1) {
#ifdef __linux__
    pthread_t handle = thr.native_handle();
    sched_param sch{};
    sch.sched_priority = priority;
    // pthread_* functions return the error code directly (they do not set errno).
    if (int err = pthread_setschedparam(handle, SCHED_FIFO, &sch)) {
        std::cerr << "[WARN] failed to set SCHED_FIFO: " << std::strerror(err)
                  << " (need CAP_SYS_NICE?)\n";
    }
    if (cpu >= 0) {
        cpu_set_t cpus;
        CPU_ZERO(&cpus);
        CPU_SET(cpu, &cpus);
        if (int err = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpus)) {
            std::cerr << "[WARN] failed to set thread affinity: "
                      << std::strerror(err) << '\n';
        }
    }
#else
    (void)thr; (void)priority; (void)cpu;
#endif
}
// ------------------ Simple logger thread (non-RT) ------------------
// Shared state for the asynchronous logger: RT threads enqueue strings and a
// normal-priority logger thread drains them, keeping console I/O off hot paths.
std::mutex logMutex;
std::condition_variable logCv;
std::queue<std::string> logQueue;
std::atomic<bool> stopping{false};

// Queue one message for the logger thread, then wake it.
void log_enqueue(const std::string &s) {
    {
        std::scoped_lock guard(logMutex);
        logQueue.push(s);
    }
    logCv.notify_one();
}
// Drain logQueue until `stopping` is set, then flush whatever is left.
// Runs at normal priority; the cout call happens outside the lock so
// producers are never blocked on console I/O.
void logger_thread_fn() {
    while (!stopping.load()) {
        std::unique_lock<std::mutex> lk(logMutex);
        // Bounded wait so a missed notify can never wedge shutdown.
        logCv.wait_for(lk, std::chrono::milliseconds(200),
                       []{ return !logQueue.empty() || stopping.load(); });
        while (!logQueue.empty()) {
            std::string s = std::move(logQueue.front()); // move, don't copy, the message
            logQueue.pop();
            lk.unlock();  // release the lock around the (slow) console write
            std::cout << s << std::endl;
            lk.lock();
        }
    }
    // flush leftover messages queued after the loop observed `stopping`
    std::lock_guard<std::mutex> g(logMutex);
    while (!logQueue.empty()) {
        std::cout << logQueue.front() << std::endl;
        logQueue.pop();
    }
}
// ------------------ Global references (for threads) ------------------
// Shared with the worker threads (tx_dispatcher reads g_dc).
// NOTE(review): assigned in main() before the worker threads start and read
// without synchronization afterwards — safe only as long as they are never
// reassigned while the threads run; confirm no later reassignment exists.
std::shared_ptr<rtc::DataChannel> g_dc;
std::shared_ptr<rtc::PeerConnection> g_pc;
// ------------------ Threads ------------------
// RX handler: parse frames popped from rxRing and push to processing (here we will print ACKs)
void rx_handler_thread_fn() {
// This thread should be RT priority in deployment
while (!stopping.load()) {
Slot* s = nullptr;
if (rxRing.pop(s)) {
// parse
Frame_t rx{};
bool ok = parse_frame(s->data, s->len, rx);
if (!ok) {
log_enqueue("[RX] Invalid frame");
} else {
// Example: If this is an ACK, log minimal info (use log thread)
char buf[128];
snprintf(buf, sizeof(buf), "[RX] Got cmd=0x%02X msg_id=%u ack_code=%u", rx.cmd, rx.msg_id, rx.payload[0]);
log_enqueue(buf);
// If needed, push to other RT queues (not implemented here)
}
} else {
// nothing — small sleep to avoid busyspin; in real RT, prefer blocking wait
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
// Control loop: periodic, build status frames and/or commands and push to txRing
// Control loop: periodically builds a status frame and queues it on txRing.
// RT priority recommended.
//
// @param period_ms  loop period in milliseconds (default 10 ms)
void control_loop_thread_fn(int period_ms = 10) {
    auto next = std::chrono::steady_clock::now();
    uint8_t seq = 1;
    while (!stopping.load()) {
        next += std::chrono::milliseconds(period_ms);
        // Example: build a periodic telemetry/status frame (simulated data).
        uint8_t payload[2];
        payload[0] = 0x55; // example data
        payload[1] = seq++;
        auto frame_vec = build_frame(seq, CMD_GET_STATUS, payload, 2);
        Slot* s = alloc_tx_slot();
        // Guard the fixed 512-byte slot buffer before memcpy — a frame larger
        // than the slot would otherwise overflow it.
        if (frame_vec.size() > sizeof(s->data)) {
            log_enqueue("[TX] frame too large, dropping");
            std::this_thread::sleep_until(next);
            continue;
        }
        s->len = frame_vec.size();
        memcpy(s->data, frame_vec.data(), s->len);
        // Push to txRing; if full, drop (or implement backpressure).
        if (!txRing.push(s)) {
            log_enqueue("[TX] txRing full, dropping frame");
        }
        // Sleep to the absolute next deadline — fixed rate, no cumulative drift.
        std::this_thread::sleep_until(next);
    }
}
// TX dispatcher: pop from txRing and call dc->send (run as RT)
void tx_dispatcher_thread_fn() {
while (!stopping.load()) {
Slot* s = nullptr;
if (txRing.pop(s)) {
if (g_dc) {
try {
rtc::binary bin = vec_u8_to_rtcbin(std::vector<uint8_t>(s->data, s->data + s->len));
g_dc->send(bin);
// minimal logging
// avoid expensive formatting on RT path
} catch (const std::exception &e) {
std::string msg = std::string("[TX] send exception: ") + e.what();
log_enqueue(msg);
} catch (...) {
log_enqueue("[TX] send unknown exception");
}
} else {
log_enqueue("[TX] no DataChannel available");
}
} else {
// nothing to send
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
// ------------------ MAIN ------------------
int main(int argc, char** argv) {
if (argc < 3) {
std::cout << "Usage: peer_client <server_ip> <server_port>\n";
@ -162,85 +377,113 @@ int main(int argc, char** argv) {
rtc::Configuration config;
config.iceServers.emplace_back("stun:stun.l.google.com:19302");
auto pc = std::make_shared<rtc::PeerConnection>(config);
g_pc = pc;
// start logger thread
std::thread logger_t(logger_thread_fn);
// --- Signaling callbacks ---
pc->onLocalDescription([&signaling](rtc::Description desc) {
std::string msg = "SDP|" + base64_encode(std::string(desc));
signaling.send(msg);
std::cout << "[Client] Sent SDP\n";
log_enqueue("[Signaling] Sent SDP");
});
pc->onLocalCandidate([&signaling](rtc::Candidate cand) {
std::string msg = "ICE|" + base64_encode(cand.candidate())
+ "|" + base64_encode(cand.mid());
signaling.send(msg);
std::cout << "[Client] Sent ICE\n";
log_enqueue("[Signaling] Sent ICE");
});
signaling.onMessage([&](const std::string &msg, const std::string&, int) {
if (msg.rfind("SDP|", 0) == 0) {
pc->setRemoteDescription(rtc::Description(base64_decode(msg.substr(4))));
std::cout << "[Client] Got remote SDP\n";
log_enqueue("[Signaling] Got remote SDP");
} else if (msg.rfind("ICE|", 0) == 0) {
size_t p = msg.find('|', 4);
if (p != std::string::npos) {
std::string c = base64_decode(msg.substr(4, p-4));
std::string mid = base64_decode(msg.substr(p+1));
pc->addRemoteCandidate(rtc::Candidate(c, mid));
std::cout << "[Client] Got ICE\n";
log_enqueue("[Signaling] Got ICE");
}
}
});
// --- Create DataChannel ---
rtc::DataChannelInit opts;
// Example: set unordered/unreliable for telemetry to reduce latency
// opts.ordered = false; opts.maxRetransmits = 0;
auto dc = pc->createDataChannel("ctrl", opts);
g_dc = dc;
dc->onOpen([dc]() {
std::cout << "[Client] DC OPEN\n";
// --- TEST SEND COMMAND: CMD_SET_SPEED = 0x10 ---
uint8_t speed_payload[1] = { 80 }; // set speed = 80%
auto frame = build_frame(1, CMD_SET_DIRECTION, speed_payload, 1);
// convert to rtc::binary and send
rtc::binary bin = vec_u8_to_rtcbin(frame);
try {
dc->send(bin);
std::cout << "[Client] Sent frame, len=" << bin.size() << std::endl;
} catch (const std::exception &e) {
std::cout << "[Client] send exception: " << e.what() << std::endl;
} catch (...) {
std::cout << "[Client] send unknown exception\n";
}
});
// WebRTC callback: fast copy into rxRing
dc->onMessage([&](rtc::message_variant msg){
if (std::holds_alternative<rtc::binary>(msg)) {
auto bin = std::get<rtc::binary>(msg);
Frame_t rx{};
bool ok = parse_frame(
reinterpret_cast<const uint8_t*>(bin.data()),
bin.size(),
rx
);
if (!ok) {
std::cout << "[Client] Invalid frame\n";
// allocate slot and copy quickly
Slot* s = alloc_rx_slot();
size_t len = bin.size();
if (len > sizeof(s->data)) {
// too big, drop
log_enqueue("[RX] incoming too large, drop");
return;
}
std::cout << "[Client] Got ACK: cmd=0x" << std::hex << (int)rx.cmd << std::dec
<< " msg_id=" << (int)rx.msg_id
<< " ack_code=" << (int)rx.payload[0] << std::endl;
s->len = len;
// rtc::binary::data() returns pointer-like; reinterpret
const uint8_t* p = reinterpret_cast<const uint8_t*>(bin.data());
memcpy(s->data, p, len);
if (!rxRing.push(s)) {
log_enqueue("[RX] rxRing full, drop");
}
} else if (std::holds_alternative<std::string>(msg)) {
std::cout << "[Client] Received text: " << std::get<std::string>(msg) << std::endl;
// small text messages -> log via logger thread
log_enqueue(std::string("[DC TEXT] ") + std::get<std::string>(msg));
}
});
dc->onOpen([&](){
log_enqueue("[Client] DC OPEN");
// prepare a test frame but push into txRing so send happens in tx dispatcher
uint8_t speed_payload[1] = { 80 }; // set speed = 80%
auto frame = build_frame(1, CMD_SET_DIRECTION, speed_payload, 1);
Slot* s = alloc_tx_slot();
s->len = frame.size();
memcpy(s->data, frame.data(), s->len);
if (!txRing.push(s)) {
log_enqueue("[TX] failed to push test frame");
} else {
log_enqueue("[Client] queued test frame to txRing");
}
});
// Start RT threads
std::thread rx_handler_t(rx_handler_thread_fn);
std::thread control_loop_t([](){ control_loop_thread_fn(10); }); // 10 ms period
std::thread tx_dispatcher_t(tx_dispatcher_thread_fn);
// Optionally set RT priorities (requires privileges)
set_thread_realtime(control_loop_t, 80, 1); // high priority
set_thread_realtime(tx_dispatcher_t, 70, 1);
set_thread_realtime(rx_handler_t, 60, 1);
// logger stays normal
pc->setLocalDescription();
while (true) std::this_thread::sleep_for(std::chrono::seconds(1));
// Main thread waits for user interrupt (Ctrl+C) to stop
std::cout << "[Client] Running. Press Ctrl+C to exit.\n";
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// graceful stop (unreachable in current loop; add signal handler if needed)
stopping.store(true);
logCv.notify_all();
logger_t.join();
rx_handler_t.join();
control_loop_t.join();
tx_dispatcher_t.join();
return 0;
}