udp_testing connect with together

This commit is contained in:
phamvannhat 2025-12-27 18:05:36 +07:00
parent f643fb253e
commit 7af0c6e139
7 changed files with 553 additions and 15 deletions

View File

@ -71,8 +71,8 @@ int main(int argc, char* argv[])
// --------------------------------------------------------
// 1⃣ POSIX SIGNALS
// --------------------------------------------------------
// std::signal(SIGINT, signal_handler);
// std::signal(SIGTERM, signal_handler);
std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
// --------------------------------------------------------
// 2⃣ RTE / EVENT SYSTEM INIT
@ -88,22 +88,22 @@ int main(int argc, char* argv[])
// --------------------------------------------------------
// 4⃣ TRANSPORT INIT
// --------------------------------------------------------
//std::unique_ptr<ITransport> transport;
std::unique_ptr<ITransport> transport;
// #if 1
// // ===== REAL WebRTC =====
// transport = std::make_unique<WebRTCTransport>();
// #else
// // ===== LOOPBACK TEST =====
// transport = std::make_unique<LoopbackTransport>();
// #endif
#if 1
// ===== REAL WebRTC =====
transport = std::make_unique<WebRTCTransport>();
#else
// ===== LOOPBACK TEST =====
transport = std::make_unique<LoopbackTransport>();
#endif
// // EventHandler consumes RX frames from transport
// transport->set_rx_callback([](const std::vector<u8>& data) {
// EventHandler::on_transport_rx(data);
// });
// EventHandler consumes RX frames from transport
transport->set_rx_callback([](const std::vector<u8>& data) {
EventHandler::on_transport_rx(data);
});
// transport->start();
transport->start();
// --------------------------------------------------------
// 5⃣ RUNNABLE THREADS (AUTOSAR style)

View File

@ -0,0 +1,89 @@
cmake_minimum_required(VERSION 3.10)
project(libdc_udp_demo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(Threads REQUIRED)
find_package(PkgConfig QUIET)
# Try pkg-config first
set(LIBDATACHANNEL_LIBS "")
set(LIBDATACHANNEL_INCLUDE_DIRS "")
set(LIBDATACHANNEL_LIBRARY_DIRS "")
if (PKG_CONFIG_FOUND)
pkg_check_modules(LIBDATACHANNEL libdatachannel)
endif()
if (LIBDATACHANNEL_FOUND)
message(STATUS "Found libdatachannel via pkg-config")
list(APPEND LIBDATACHANNEL_INCLUDE_DIRS ${LIBDATACHANNEL_INCLUDE_DIRS})
list(APPEND LIBDATACHANNEL_LIBRARY_DIRS ${LIBDATACHANNEL_LIBRARY_DIRS})
set(LIBDATACHANNEL_LIBS ${LIBDATACHANNEL_LIBRARIES})
else()
message(STATUS "pkg-config couldn't find libdatachannel.")
message(STATUS "You can export PKG_CONFIG_PATH or set environment variables LIBDATACHANNEL_INCLUDE_DIR and LIBDATACHANNEL_LIB before running cmake.")
# Allow manual override via environment variables:
if (DEFINED ENV{LIBDATACHANNEL_INCLUDE_DIR})
set(LIBDATACHANNEL_INCLUDE_DIRS $ENV{LIBDATACHANNEL_INCLUDE_DIR})
message(STATUS "Using LIBDATACHANNEL_INCLUDE_DIR = ${LIBDATACHANNEL_INCLUDE_DIRS}")
else()
# common default
set(LIBDATACHANNEL_INCLUDE_DIRS /usr/local/include)
message(STATUS "Defaulting LIBDATACHANNEL_INCLUDE_DIRS = ${LIBDATACHANNEL_INCLUDE_DIRS}")
endif()
if (DEFINED ENV{LIBDATACHANNEL_LIB})
# user can provide full path or linker name(s)
set(LIBDATACHANNEL_LIBS $ENV{LIBDATACHANNEL_LIB})
message(STATUS "Using LIBDATACHANNEL_LIB = ${LIBDATACHANNEL_LIBS}")
else()
# sensible default linker names (adjust if your lib files differ)
set(LIBDATACHANNEL_LIBS datachannel sctp ssl crypto pthread dl)
message(STATUS "Defaulting LIBDATACHANNEL_LIBS = ${LIBDATACHANNEL_LIBS}")
endif()
# optional library directories (if user set PKG_CONFIG_PATH not used)
if (DEFINED ENV{LIBDATACHANNEL_LIBRARY_DIR})
set(LIBDATACHANNEL_LIBRARY_DIRS $ENV{LIBDATACHANNEL_LIBRARY_DIR})
link_directories(${LIBDATACHANNEL_LIBRARY_DIRS})
else()
# we assume /usr/local/lib is in link path - you can add more if needed
link_directories(/usr/local/lib)
endif()
endif()
# include dirs for our headers
include_directories(${CMAKE_SOURCE_DIR}/src)
# also include libdatachannel headers
include_directories(${LIBDATACHANNEL_INCLUDE_DIRS})
# add signaling static lib
add_library(signaling STATIC src/udp_signaling.cpp src/udp_signaling.hpp)
# targets
add_executable(peer_server src/peer_server.cpp)
add_executable(peer_client src/peer_client.cpp)
# make sure signaling target exposes include dir (for consumers)
target_include_directories(signaling PUBLIC ${CMAKE_SOURCE_DIR}/src)
# Link libraries: correct ordering and visibility
# Note: target_link_libraries(<target> <PRIVATE|PUBLIC|INTERFACE> <libs...>)
target_link_libraries(peer_server
PRIVATE
signaling
${LIBDATACHANNEL_LIBS}
Threads::Threads
)
target_link_libraries(peer_client
PRIVATE
signaling
${LIBDATACHANNEL_LIBS}
Threads::Threads
)
# install rules (optional)
install(TARGETS peer_server peer_client DESTINATION bin)

View File

@ -0,0 +1,47 @@
#pragma once
#include <string>
#include <vector>
static const std::string b64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
inline std::string b64_encode(const std::string &in)
{
std::string out;
int val = 0, valb = -6;
for (uint8_t c : in) {
val = (val << 8) + c;
valb += 8;
while (valb >= 0) {
out.push_back(b64_chars[(val >> valb) & 0x3F]);
valb -= 6;
}
}
if (valb > -6)
out.push_back(b64_chars[((val << 8) >> (valb + 8)) & 0x3F]);
while (out.size() % 4)
out.push_back('=');
return out;
}
inline std::string b64_decode(const std::string &in)
{
std::vector<int> T(256, -1);
for (int i = 0; i < 64; i++)
T[b64_chars[i]] = i;
std::string out;
int val = 0, valb = -8;
for (uint8_t c : in) {
if (T[c] == -1) break;
val = (val << 6) + T[c];
valb += 6;
if (valb >= 0) {
out.push_back(char((val >> valb) & 0xFF));
valb -= 8;
}
}
return out;
}

View File

@ -0,0 +1,89 @@
#include <rtc/rtc.hpp>
#include <iostream>
#include <thread>
#include "udp_signaling.hpp"
#include "base64.hpp"
int main(int argc, char** argv)
{
if (argc < 2)
{
std::cout << "peer_client <server_ip>\n";
return 0;
}
UdpSignaling signaling;
signaling.init(nullptr, 0, argv[1], 6000);
signaling.start();
rtc::Configuration cfg;
cfg.iceServers.emplace_back("stun:stun.l.google.com:19302");
auto pc = std::make_shared<rtc::PeerConnection>(cfg);
bool answerOnce = false;
auto dc = pc->createDataChannel("dc");
dc->onOpen([]()
{
std::cout << "[Client] DC OPEN\n";
});
pc->onLocalDescription([&](rtc::Description desc)
{
std::string msg =
"SDP|" + b64_encode(std::string(desc));
signaling.send(
(uint8_t*)msg.data(),
msg.size());
std::cout << "[Client] Sent OFFER\n";
});
pc->onLocalCandidate([&](rtc::Candidate c)
{
std::string msg =
"ICE|" + b64_encode(c.candidate()) +
"|" + b64_encode(c.mid());
signaling.send(
(uint8_t*)msg.data(),
msg.size());
});
signaling.setReceiveCallback(
[&](const uint8_t* data,
size_t len,
uint32_t,
uint16_t)
{
std::string msg((char*)data, len);
if (!answerOnce &&
msg.rfind("SDP|", 0) == 0)
{
pc->setRemoteDescription(
rtc::Description(
b64_decode(msg.substr(4)),
rtc::Description::Type::Answer));
answerOnce = true;
std::cout << "[Client] Got ANSWER\n";
}
else if (msg.rfind("ICE|", 0) == 0)
{
auto p = msg.find('|', 4);
pc->addRemoteCandidate(
rtc::Candidate(
b64_decode(msg.substr(4, p - 4)),
b64_decode(msg.substr(p + 1))));
}
});
pc->setLocalDescription();
while (true)
std::this_thread::sleep_for(std::chrono::seconds(1));
}

View File

@ -0,0 +1,137 @@
#include <rtc/rtc.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include "udp_signaling.hpp"
#include "base64.hpp"
int main()
{
std::cout << "[Server] Ready on UDP 6000\n";
/* ---------- UDP signaling ---------- */
UdpSignaling signaling;
signaling.init(nullptr, 6000, nullptr, 0);
signaling.start();
/* ---------- PeerConnection ---------- */
rtc::Configuration cfg;
cfg.iceServers.emplace_back("stun:stun.l.google.com:19302");
auto pc = std::make_shared<rtc::PeerConnection>(cfg);
/* ---------- State ---------- */
bool remoteSet = false;
bool answerSent = false;
std::vector<rtc::Candidate> iceBuf;
/* ---------- DataChannel ---------- */
pc->onDataChannel([&](std::shared_ptr<rtc::DataChannel> dc)
{
std::cout << "[Server] DC received\n";
dc->onOpen([]()
{
std::cout << "[Server] DC OPEN\n";
});
dc->onMessage([](rtc::message_variant msg)
{
if (const auto* s = std::get_if<std::string>(&msg))
std::cout << "[Server] DC msg: " << *s << "\n";
});
});
/* ---------- Local SDP ---------- */
pc->onLocalDescription([&](rtc::Description desc)
{
/* CHỈ GỬI ANSWER */
if (desc.type() != rtc::Description::Type::Answer)
return;
if (answerSent)
{
std::cout << "[Server] Ignore duplicate ANSWER\n";
return;
}
answerSent = true;
std::string msg =
"SDP|" + b64_encode(std::string(desc));
signaling.send(
reinterpret_cast<const uint8_t*>(msg.data()),
msg.size());
std::cout << "[Server] Sent ANSWER\n";
});
/* ---------- Local ICE ---------- */
pc->onLocalCandidate([&](rtc::Candidate c)
{
std::string msg =
"ICE|" + b64_encode(c.candidate()) +
"|" + b64_encode(c.mid());
signaling.send(
reinterpret_cast<const uint8_t*>(msg.data()),
msg.size());
});
/* ---------- Receive signaling ---------- */
signaling.setReceiveCallback(
[&](const uint8_t* data,
size_t len,
uint32_t,
uint16_t)
{
std::string msg((const char*)data, len);
/* ===== SDP ===== */
if (msg.rfind("SDP|", 0) == 0)
{
if (remoteSet)
{
std::cout << "[Server] Ignore duplicate OFFER\n";
return;
}
pc->setRemoteDescription(
rtc::Description(
b64_decode(msg.substr(4)),
rtc::Description::Type::Offer));
remoteSet = true;
std::cout << "[Server] Got OFFER\n";
/* Flush buffered ICE */
for (auto& c : iceBuf)
pc->addRemoteCandidate(c);
iceBuf.clear();
/* Create ANSWER */
pc->setLocalDescription();
}
/* ===== ICE ===== */
else if (msg.rfind("ICE|", 0) == 0)
{
auto p = msg.find('|', 4);
rtc::Candidate c(
b64_decode(msg.substr(4, p - 4)),
b64_decode(msg.substr(p + 1)));
if (!remoteSet)
iceBuf.push_back(c);
else
pc->addRemoteCandidate(c);
}
});
/* ---------- Loop ---------- */
while (true)
std::this_thread::sleep_for(std::chrono::seconds(1));
}

View File

@ -0,0 +1,126 @@
#include "udp_signaling.hpp"
#include <arpa/inet.h>
#include <cstring>
#include <thread>
#include <unistd.h>
UdpSignaling::UdpSignaling() noexcept
: sock_(-1),
running_(false),
hasRemote_(false)
{
std::memset(&localAddr_, 0, sizeof(localAddr_));
std::memset(&remoteAddr_, 0, sizeof(remoteAddr_));
}
UdpSignaling::~UdpSignaling() noexcept
{
stop();
}
UdpSignaling::Result
UdpSignaling::init(const char* localIp,
std::uint16_t localPort,
const char* remoteIp,
std::uint16_t remotePort) noexcept
{
sock_ = socket(AF_INET, SOCK_DGRAM, 0);
if (sock_ < 0)
return Result::Error;
localAddr_.sin_family = AF_INET;
localAddr_.sin_port = htons(localPort);
localAddr_.sin_addr.s_addr =
localIp ? inet_addr(localIp) : INADDR_ANY;
if (bind(sock_,
(sockaddr*)&localAddr_,
sizeof(localAddr_)) < 0)
return Result::Error;
if (remoteIp)
{
remoteAddr_.sin_family = AF_INET;
remoteAddr_.sin_port = htons(remotePort);
remoteAddr_.sin_addr.s_addr = inet_addr(remoteIp);
hasRemote_ = true;
}
return Result::Ok;
}
UdpSignaling::Result
UdpSignaling::start() noexcept
{
running_ = true;
std::thread(&UdpSignaling::recvLoop, this).detach();
return Result::Ok;
}
UdpSignaling::Result
UdpSignaling::stop() noexcept
{
running_ = false;
if (sock_ >= 0)
{
close(sock_);
sock_ = -1;
}
return Result::Ok;
}
void
UdpSignaling::setReceiveCallback(RxCallback cb) noexcept
{
rxCb_ = cb;
}
UdpSignaling::Result
UdpSignaling::send(const std::uint8_t* data,
std::size_t len) noexcept
{
if (!hasRemote_)
return Result::Error;
sendto(sock_,
data,
len,
0,
(sockaddr*)&remoteAddr_,
sizeof(remoteAddr_));
return Result::Ok;
}
void
UdpSignaling::recvLoop() noexcept
{
std::uint8_t buf[2048];
while (running_)
{
sockaddr_in from{};
socklen_t fromLen = sizeof(from);
ssize_t n = recvfrom(sock_,
buf,
sizeof(buf),
0,
(sockaddr*)&from,
&fromLen);
if (n <= 0)
continue;
remoteAddr_ = from;
hasRemote_ = true;
if (rxCb_)
{
rxCb_(buf,
static_cast<std::size_t>(n),
ntohl(from.sin_addr.s_addr),
ntohs(from.sin_port));
}
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <cstdint>
#include <cstddef>
#include <functional>
#include <netinet/in.h>
class UdpSignaling
{
public:
enum class Result
{
Ok,
Error
};
using RxCallback =
std::function<void(const std::uint8_t*,
std::size_t,
std::uint32_t,
std::uint16_t)>;
UdpSignaling() noexcept;
~UdpSignaling() noexcept;
Result init(const char* localIp,
std::uint16_t localPort,
const char* remoteIp,
std::uint16_t remotePort) noexcept;
Result start() noexcept;
Result stop() noexcept;
Result send(const std::uint8_t* data,
std::size_t len) noexcept;
void setReceiveCallback(RxCallback cb) noexcept;
private:
int sock_;
bool running_;
sockaddr_in localAddr_;
sockaddr_in remoteAddr_;
bool hasRemote_;
RxCallback rxCb_;
void recvLoop() noexcept;
};