update version 5 : restart sdp

This commit is contained in:
phamvannhat 2025-12-31 13:08:54 +07:00
parent 00bcdf57a7
commit 9d9bf80b4e
4 changed files with 166 additions and 206 deletions

View File

@ -1,22 +1,28 @@
#include "PeerServer.hpp"
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
PeerServer::PeerServer(SignalingManager& sig,
WebRTCTransport& rtc)
: sig_(sig),
rtc_(rtc),
gotOffer_(false),
sdpDone_(false)
running_(false)
{
/* ===== DATA CHANNEL ===== */
/* ================================
* DATA CHANNEL
* ================================ */
rtc_.onDcOpen([this]{
std::cout << "[Server] DC OPEN\n";
running_ = true;
/* sender thread */
std::thread([this]{
int i = 0;
while (true) {
while (running_) {
rtc_.sendMessage(
"Hello client #" + std::to_string(i++));
std::this_thread::sleep_for(
@ -29,44 +35,39 @@ PeerServer::PeerServer(SignalingManager& sig,
std::cout << "[Server] DC received: " << msg << "\n";
});
/* ===== LOCAL SDP (ANSWER) ===== */
/* ================================
* LOCAL SDP (ANSWER)
* ================================ */
rtc_.onLocalSdp([this](std::string sdp){
if (sdpDone_) {
std::cout << "[Server] Ignore duplicate ANSWER\n";
return;
}
std::cout << "[Server] Send ANSWER\n";
sig_.sendSdp(sdp);
sdpDone_ = true;
std::cout << "[Server] Sent ANSWER\n";
});
/* ===== LOCAL ICE ===== */
/* ================================
* LOCAL ICE
* ================================ */
rtc_.onLocalIce([this](std::string cand, std::string mid){
sig_.sendIce(cand, mid);
std::cout << "[Server] Sent ICE: " << cand << "\n";
std::cout << "[Server] Sent ICE\n";
});
/* ===== SIGNALING RECEIVE ===== */
/* ================================
* SIGNALING RECEIVE
* ================================ */
sig_.onReceive([this](const SignalingMsg& m){
if (m.type == SigType::SDP) {
if (gotOffer_) {
std::cout << "[Server] Ignore duplicate OFFER\n";
return;
}
std::cout << "[Server] Got OFFER\n";
gotOffer_ = true;
std::cout << "[Server] Got OFFER → reset / new session\n";
rtc_.setRemoteOffer(m.payload1);
rtc_.createAnswer();
} else if (m.type == SigType::ICE) {
rtc_.addRemoteIce(m.payload1, m.payload2);
std::cout << "[Server] Got remote ICE\n";
}
});
}

View File

@ -1,20 +1,22 @@
#pragma once
#include "SignalingManager.hpp"
#include "WebRTCTransport.hpp"
#include <vector>
#include <string>
#include <atomic>
class PeerServer
{
public:
PeerServer(SignalingManager& sig, WebRTCTransport& rtc);
PeerServer(SignalingManager& sig,
WebRTCTransport& rtc);
void start();
private:
SignalingManager& sig_;
WebRTCTransport& rtc_;
bool gotOffer_ = false;
bool sdpDone_ = false;
std::vector<std::pair<std::string,std::string>> iceBuffer_;
/* sender thread control */
std::atomic<bool> running_{false};
};

View File

@ -1,113 +1,112 @@
#include "WebRTCTransport.hpp"
#include <iostream>
#include <utility>
/* ================================
* Constructor
* ================================ */
using namespace std::chrono;
WebRTCTransport::WebRTCTransport()
: dcOpen_(false),
haveRemoteOffer_(false),
localAnswerSent_(false)
haveRemoteOffer_(false)
{
createPeerConnection();
}
/* ================================
* PEER CONNECTION
* ================================ */
void WebRTCTransport::createPeerConnection()
{
rtc::Configuration cfg;
cfg.iceServers.emplace_back("stun:stun.l.google.com:19302");
pc_ = std::make_shared<rtc::PeerConnection>(cfg);
/* =====================================================
* LOCAL SDP REGISTER ONCE (CRITICAL FIX)
* ===================================================== */
/* ===== LOCAL SDP ===== */
pc_->onLocalDescription([this](rtc::Description d) {
if (d.type() != rtc::Description::Type::Answer)
return;
if (localAnswerSent_) {
std::cout << "[Transport] Duplicate local ANSWER ignored\n";
return;
}
localAnswerSent_ = true;
std::string sdp = std::string(d);
/* ===== SEND ANSWER ===== */
if (onLocalSdpCb_) {
if (onLocalSdpCb_)
onLocalSdpCb_(sdp);
} else {
/* ultra-safe: cache if callback registered late */
else
cachedAnswer_ = sdp;
std::cout << "[Transport] ANSWER cached (no callback yet)\n";
}
});
/* =====================================================
* LOCAL ICE
* ===================================================== */
/* ===== LOCAL ICE ===== */
pc_->onLocalCandidate([this](rtc::Candidate c) {
if (onLocalIceCb_) {
if (onLocalIceCb_)
onLocalIceCb_(c.candidate(), c.mid());
}
});
/* =====================================================
* DATA CHANNEL
* ===================================================== */
/* ===== DATA CHANNEL ===== */
pc_->onDataChannel([this](std::shared_ptr<rtc::DataChannel> dc) {
dc_ = dc;
dc_->onOpen([this] {
dcOpen_ = true;
if (onDcOpenCb_)
onDcOpenCb_();
lastPing_ = lastPong_ = steady_clock::now();
if (onDcOpenCb_) onDcOpenCb_();
});
dc_->onClosed([this] {
dcOpen_ = false;
std::cout << "[Transport] DataChannel closed\n";
std::cout << "[DC] closed → reset session\n";
close();
});
dc_->onMessage([this](rtc::message_variant msg) {
if (const auto* s = std::get_if<std::string>(&msg)) {
if (auto* s = std::get_if<std::string>(&msg)) {
if (*s == "__ping__") {
dc_->send("__pong__");
return;
}
if (*s == "__pong__") {
lastPong_ = steady_clock::now();
return;
}
if (onDcMessageCb_)
onDcMessageCb_(*s);
}
else if (auto* b = std::get_if<rtc::binary>(&msg)) {
// optional: onDcBinaryCb_…
}
});
});
pc_->onIceStateChange(
[](rtc::PeerConnection::IceState s) {
std::cout << "[ICE] state="
<< static_cast<int>(s) << "\n";
/* ===== ICE STATE ===== */
pc_->onIceStateChange([this](rtc::PeerConnection::IceState s) {
std::cout << "[ICE] state=" << static_cast<int>(s) << "\n";
if (s == rtc::PeerConnection::IceState::Failed) {
std::cout << "[ICE] FAILED → should restart ICE\n";
}
});
if (s == rtc::PeerConnection::IceState::Failed) {
std::cout << "[ICE] FAILED → reset session\n";
close();
}
});
}
pc_->onSignalingStateChange(
[](rtc::PeerConnection::SignalingState s) {
std::cout << "[SIG] state="
<< static_cast<int>(s) << "\n";
});
void WebRTCTransport::destroyPeerConnection()
{
try {
if (dc_) dc_->close();
if (pc_) pc_->close();
} catch (...) {}
pc_.reset();
dc_.reset();
dcOpen_ = false;
haveRemoteOffer_ = false;
iceBuffer_.clear();
cachedAnswer_.reset();
}
/* ================================
* CALLBACK REGISTRATION
* CALLBACK REG
* ================================ */
void WebRTCTransport::onLocalSdp(
std::function<void(std::string)> cb)
void WebRTCTransport::onLocalSdp(std::function<void(std::string)> cb)
{
onLocalSdpCb_ = std::move(cb);
/* flush cached ANSWER if any */
if (cachedAnswer_) {
onLocalSdpCb_(*cachedAnswer_);
cachedAnswer_.reset();
@ -120,13 +119,9 @@ void WebRTCTransport::onLocalIce(
onLocalIceCb_ = std::move(cb);
}
void WebRTCTransport::onDcOpen(
std::function<void()> cb)
void WebRTCTransport::onDcOpen(std::function<void()> cb)
{
onDcOpenCb_ = std::move(cb);
if (dc_ && dc_->isOpen()) {
onDcOpenCb_();
}
}
void WebRTCTransport::onDcMessage(
@ -136,68 +131,41 @@ void WebRTCTransport::onDcMessage(
}
/* ================================
* SDP
* SIGNALING
* ================================ */
void WebRTCTransport::setRemoteOffer(const std::string& sdp)
{
if (haveRemoteOffer_) {
std::cout << "[Transport] Duplicate OFFER ignored\n";
return;
}
std::cout << "[SDP] New OFFER → reset session\n";
std::cout << "[Transport] Set remote OFFER\n";
destroyPeerConnection();
createPeerConnection();
pc_->setRemoteDescription(
rtc::Description(sdp,
rtc::Description::Type::Offer));
rtc::Description(sdp, rtc::Description::Type::Offer));
haveRemoteOffer_ = true;
/* ===== Flush ICE buffer ===== */
if (!iceBuffer_.empty()) {
std::cout << "[Transport] Flush ICE buffer: "
<< iceBuffer_.size()
<< " candidate(s)\n";
for (auto& c : iceBuffer_)
pc_->addRemoteCandidate(c);
iceBuffer_.clear();
for (auto& c : iceBuffer_) {
pc_->addRemoteCandidate(c);
}
iceBuffer_.clear();
}
createAnswer();
}
void WebRTCTransport::createAnswer()
{
auto state = pc_->signalingState();
if (!pc_) return;
if (state != rtc::PeerConnection::SignalingState::HaveRemoteOffer) {
std::cout << "[Transport] Cannot create ANSWER, state = "
<< static_cast<int>(state) << "\n";
if (pc_->signalingState() !=
rtc::PeerConnection::SignalingState::HaveRemoteOffer) {
return;
}
std::cout << "[Transport] Create ANSWER\n";
std::cout << "[SDP] Create ANSWER\n";
pc_->setLocalDescription(rtc::Description::Type::Answer);
}
void WebRTCTransport::setRemoteAnswer(const std::string& sdp)
{
if (pc_->signalingState() !=
rtc::PeerConnection::SignalingState::HaveLocalOffer) {
std::cout << "[Transport] Ignored remote ANSWER (bad state)\n";
return;
}
else {
std::cout << "[Transport] Set remote ANSWER\n";
}
pc_->setRemoteDescription(
rtc::Description(sdp,
rtc::Description::Type::Answer));
}
/* ================================
* ICE
* ================================ */
@ -208,16 +176,34 @@ void WebRTCTransport::addRemoteIce(
{
rtc::Candidate c(cand, mid);
if (!pc_ || !haveRemoteOffer_) {
iceBuffer_.push_back(c);
return;
}
try {
if (haveRemoteOffer_) {
pc_->addRemoteCandidate(c);
std::cout << "[Transport] addRemoteCandidate mid=" << mid << "\n";
} else {
iceBuffer_.push_back(c);
std::cout << "[Transport] buffer ICE mid=" << mid << "\n";
}
} catch (const std::exception& e) {
std::cerr << "[Transport][ERROR] addRemoteCandidate failed: " << e.what() << "\n";
pc_->addRemoteCandidate(c);
} catch (...) {}
}
/* ================================
* KEEPALIVE
* ================================ */
void WebRTCTransport::tick()
{
if (!dcOpen_) return;
auto now = steady_clock::now();
if (now - lastPing_ > seconds(3)) {
dc_->send("__ping__");
lastPing_ = now;
}
if (now - lastPong_ > seconds(10)) {
std::cout << "[CONN] timeout → reset session\n";
close();
}
}
@ -227,33 +213,18 @@ void WebRTCTransport::addRemoteIce(
void WebRTCTransport::sendMessage(const std::string& msg)
{
if (!dc_) {
std::cout << "[Transport][WARN] DC null\n"; return;
}
if (!dc_->isOpen()) {
std::cout << "[Transport][WARN] DC not open\n"; return;
}
try {
dc_->send(msg);
} catch (const std::exception& e) {
std::cerr << "[Transport][ERROR] sendMessage: " << e.what() << "\n";
}
if (!dc_ || !dc_->isOpen())
return;
dc_->send(msg);
}
/* ================================
* CLOSE
* ================================ */
void WebRTCTransport::close() {
try {
if (dc_) dc_->close();
if (pc_) pc_->close();
} catch (...) {}
{
dcOpen_ = false;
haveRemoteOffer_ = false;
localAnswerSent_ = false;
cachedAnswer_.reset();
iceBuffer_.clear();
}
std::cout << "[Transport] Closed\n";
void WebRTCTransport::close()
{
destroyPeerConnection();
createPeerConnection();
std::cout << "[Transport] Session reset\n";
}

View File

@ -1,65 +1,51 @@
#pragma once
#include <rtc/rtc.hpp>
#include <functional>
#include <memory>
#include <vector>
#include <optional>
#include <string>
#include <vector>
#include <chrono>
class WebRTCTransport {
public:
WebRTCTransport();
/* callbacks */
void onLocalSdp(std::function<void(std::string)> cb);
void onLocalIce(std::function<void(std::string,std::string)> cb);
void onLocalIce(std::function<void(std::string, std::string)> cb);
void onDcOpen(std::function<void()> cb);
void onDcMessage(std::function<void(const std::string&)> cb);
/* signaling */
void setRemoteOffer(const std::string& sdp);
void createAnswer();
void setRemoteAnswer(const std::string& sdp);
void addRemoteIce(const std::string& cand,
const std::string& mid);
void addRemoteIce(const std::string& cand, const std::string& mid);
/* runtime */
void tick();
void sendMessage(const std::string& msg);
void close();
private:
void createPeerConnection();
void destroyPeerConnection();
void createAnswer();
/* ================================
* Signaling State (STRONG)
* ================================ */
enum class TransportState {
Idle, // initial
HaveRemoteOffer, // setRemoteOffer done
AnswerSent, // onLocalDescription(ANSWER)
Connected, // ICE + DTLS up (optional)
Closed
};
TransportState state_ = TransportState::Idle;
/* WebRTC core */
private:
std::shared_ptr<rtc::PeerConnection> pc_;
std::shared_ptr<rtc::DataChannel> dc_;
/* Callbacks */
std::vector<rtc::Candidate> iceBuffer_;
bool dcOpen_;
bool haveRemoteOffer_;
std::optional<std::string> cachedAnswer_;
std::function<void(std::string)> onLocalSdpCb_;
std::function<void(std::string,std::string)> onLocalIceCb_;
std::function<void(std::string, std::string)> onLocalIceCb_;
std::function<void()> onDcOpenCb_;
std::function<void(const std::string&)> onDcMessageCb_;
/* State */
bool dcOpen_{false};
bool haveRemoteOffer_{false};
bool localAnswerSent_{false};
/* ICE buffer */
std::vector<rtc::Candidate> iceBuffer_;
/*FIX: cache SDP if callback registered late */
std::optional<std::string> cachedAnswer_;
std::chrono::steady_clock::time_point lastPing_;
std::chrono::steady_clock::time_point lastPong_;
};