UDP Signaling & TCP File Transport

03. 实时通信与文件通道

采用 UDP 承载轻量业务与实时媒体,TCP 承载文件数据,通过独立通道满足低延迟与可靠传输两类需求。

6 个核心函数5 个重点文件基于当前工程源码

模块职责

采用 UDP 承载轻量业务与实时媒体,TCP 承载文件数据,通过独立通道满足低延迟与可靠传输两类需求。

重点文件

调用链

  1. 1客户端序列化 JSON
  2. 2UDP/TCP 发送
  3. 3中心服务接收
  4. 4业务或文件处理
  5. 5结果与进度回传
客户端 UDP 发送

NetworkManager::sendJson

将 QJsonObject 序列化为紧凑 JSON,并通过 QUdpSocket 发送至指定地址。

networkmanager.cpp · L136–L176
原型void NetworkManager::sendJson(const QJsonObject& json, const QHostAddress& ip, quint16 port)

调用时机

ClientLogic 完成业务封装后调用。

返回说明

无返回值;错误通过 sigLogMessage 发出。

参数

参数说明
json待发送业务对象
ip目标地址
port目标端口

执行流程

  1. 校验 socket 与目标
  2. 序列化 JSON
  3. 检查包体规模
  4. writeDatagram 发送
  5. 记录异常

工程说明

适用于控制信令、消息和实时分片,不承担大文件传输。

关联接口

查看完整实现
networkmanager.cpp
void NetworkManager::sendJson(const QJsonObject& json,
                              const QHostAddress& ip,
                              quint16 port)
{
    if (!udpSocket) {
        emit sigLogMessage("⚠️ UDP socket 未初始化,无法发送。");
        return;
    }

    if (ip.isNull() || port == 0) {
        emit sigLogMessage("⚠️ UDP 发送目标无效,已取消发送。");
        return;
    }

    QJsonDocument doc(json);
    QByteArray data = doc.toJson(QJsonDocument::Compact);

    if (data.isEmpty()) {
        emit sigLogMessage("⚠️ UDP 发送内容为空,已取消发送。");
        return;
    }

    if (data.size() > MAX_SAFE_UDP_PACKET_BYTES) {
        emit sigLogMessage(QString("⚠️ UDP 包过大: %1 bytes,可能发生分片丢包。cmd=%2")
                               .arg(data.size())
                               .arg(json.value("cmd").toString()));
    }

    qint64 sent = udpSocket->writeDatagram(data, ip, port);

    if (sent < 0) {
        emit sigLogMessage(QString("⚠️ UDP 发送失败: %1").arg(udpSocket->errorString()));
        return;
    }

    if (sent != data.size()) {
        emit sigLogMessage(QString("⚠️ UDP 发送长度异常: sent=%1 total=%2")
                               .arg(sent)
                               .arg(data.size()));
    }
}
客户端 UDP 接收

NetworkManager::processPendingDatagrams

批量读取待处理数据报,校验来源并解析 JSON,随后发出统一响应信号。

networkmanager.cpp · L196–L255
原型void NetworkManager::processPendingDatagrams()

调用时机

QUdpSocket readyRead 信号触发。

返回说明

无返回值。

参数

无显式参数。

执行流程

  1. 循环读取数据报
  2. 校验来源地址
  3. 解析 JSON 文档
  4. 发出 sigResponseReceived
  5. 记录非法数据

工程说明

接收函数不直接操作 UI,保持网络层职责单一。

关联接口

查看完整实现
networkmanager.cpp
void NetworkManager::processPendingDatagrams()
{
    if (!udpSocket) {
        return;
    }

    int processed = 0;

    while (udpSocket->hasPendingDatagrams()) {
        if (processed >= MAX_DATAGRAMS_PER_TICK) {

            QTimer::singleShot(0, this, &NetworkManager::processPendingDatagrams);
            break;
        }

        processed++;

        QNetworkDatagram datagram = udpSocket->receiveDatagram();
        QByteArray data = datagram.data();

        if (data.isEmpty()) {
            continue;
        }

        if (data.size() > MAX_SAFE_UDP_PACKET_BYTES) {
            emit sigLogMessage(QString("⚠️ 收到过大的 UDP 包: %1 bytes, from=%2:%3")
                                   .arg(data.size())
                                   .arg(datagram.senderAddress().toString())
                                   .arg(datagram.senderPort()));

        }

        QJsonParseError error;
        QJsonDocument doc = QJsonDocument::fromJson(data, &error);

        if (error.error != QJsonParseError::NoError || !doc.isObject()) {
            badDatagramCount++;

            if (badDatagramCount <= 5 || badDatagramCount % 50 == 0) {
                emit sigLogMessage(QString("解析收到的 UDP JSON 失败: %1, from=%2:%3, bad_count=%4")
                                       .arg(error.errorString())
                                       .arg(datagram.senderAddress().toString())
                                       .arg(datagram.senderPort())
                                       .arg(badDatagramCount));
            }

            continue;
        }

        QJsonObject obj = doc.object();

        emit sigJsonReceived(obj);
    }
}
服务端 UDP 初始化

NetworkServer::initUdpServer

创建非阻塞 UDP socket,绑定业务端口并注册到 epoll。

network_server.cpp · L139–L214
原型bool NetworkServer::initUdpServer(int port)

调用时机

NetworkServer 构造阶段调用。

返回说明

初始化成功返回 true。

参数

参数说明
port监听端口

执行流程

  1. 创建 socket
  2. 设置复用与非阻塞
  3. 绑定地址
  4. 创建 epoll
  5. 注册 EPOLLIN

工程说明

以 epoll 驱动业务收包,避免阻塞式单连接模型。

关联接口

查看完整实现
network_server.cpp
bool NetworkServer::initUdpServer(int port)
{
    if (port <= 0 || port > 65535) {
        std::cerr << "❌ [UDP 服务器] 非法端口: " << port << "\n";
        return false;
    }

    server_fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (server_fd < 0) {
        std::cerr << "❌ [UDP 服务器] 创建 Socket 失败: " << strerror(errno) << "\n";
        return false;
    }

    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        std::cerr << "⚠️ [UDP 服务器] 设置 SO_REUSEADDR 失败: " << strerror(errno) << "\n";
    }

    int flags = fcntl(server_fd, F_GETFL, 0);

    if (flags < 0) {
        std::cerr << "❌ [UDP 服务器] 获取 Socket flags 失败: " << strerror(errno) << "\n";
        close(server_fd);
        server_fd = -1;
        return false;
    }

    if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        std::cerr << "❌ [UDP 服务器] 设置非阻塞失败: " << strerror(errno) << "\n";
        close(server_fd);
        server_fd = -1;
        return false;
    }

    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(static_cast<uint16_t>(port));

    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) < 0) {
        std::cerr << "❌ [UDP 服务器] 绑定端口 " << port
                  << " 失败: " << strerror(errno) << "\n";
        close(server_fd);
        server_fd = -1;
        return false;
    }

    epoll_fd = epoll_create1(0);

    if (epoll_fd < 0) {
        std::cerr << "❌ [UDP 服务器] 创建 epoll 失败: " << strerror(errno) << "\n";
        close(server_fd);
        server_fd = -1;
        return false;
    }

    epoll_event event{};
    event.events = EPOLLIN | EPOLLET;
    event.data.fd = server_fd;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {
        std::cerr << "❌ [UDP 服务器] epoll_ctl 添加监听失败: "
                  << strerror(errno) << "\n";
        close(server_fd);
        close(epoll_fd);
        server_fd = -1;
        epoll_fd = -1;
        return false;
    }

    std::cout << "✅ [UDP 服务器] 已绑定端口 " << port << ",非阻塞 epoll ET 模式\n";
    return true;
}
客户端文件上传

TcpFileUploader::startUpload

打开本地文件并连接 TCP 文件服务,后续按块发送文件头和文件内容。

tcpfileuploader.cpp · L51–L60
原型void TcpFileUploader::startUpload()

调用时机

用户选择图片或附件并确认发送后调用。

返回说明

无返回值;进度与完成状态通过信号返回。

参数

无显式参数。

执行流程

  1. 打开源文件
  2. 校验文件状态
  3. 连接 TCP 服务
  4. 在 connected 回调发送元数据
  5. 分块写入内容

工程说明

认证字段和传输场景通过文件头发送。

关联接口

查看完整实现
tcpfileuploader.cpp
void TcpFileUploader::startUpload() {
    if (!file->open(QIODevice::ReadOnly)) {
        emit uploadFinished(false, "无法读取本地文件!");
        return;
    }
    payloadSize = file->size();

    qDebug() << "🔗 正在建立 TCP 数据通道连接:" << m_serverIp << ":" << m_serverPort;
    tcpSocket->connectToHost(m_serverIp, m_serverPort);
}
客户端文件下载

TcpFileDownloader::startDownload

连接 TCP 文件服务,请求指定文件并将数据流保存到目标路径。

tcpfiledownloader.cpp · L56–L59
原型void TcpFileDownloader::startDownload()

调用时机

用户点击聊天文件下载入口时调用。

返回说明

无返回值;完成或错误通过信号返回。

参数

无显式参数。

执行流程

  1. 连接 TCP 服务
  2. 发送下载请求
  3. 读取响应头
  4. 持续写入文件
  5. 完成后校验状态

工程说明

私聊和群聊文件使用相同传输器,通过 scene/target/group_id 区分权限上下文。

关联接口

查看完整实现
tcpfiledownloader.cpp
void TcpFileDownloader::startDownload() {
    qDebug() << "🔗 正在连接服务器准备下载...";
    tcpSocket->connectToHost(m_serverIp, m_serverPort);
}
服务端 TCP 接入

TcpFileServer::acceptLoop

持续接受文件连接,并将客户端处理任务交给受控执行单元。

tcp_file_server.cpp · L702–L760
原型void TcpFileServer::acceptLoop()

调用时机

TCP 文件服务启动后在后台运行。

返回说明

无返回值。

参数

无显式参数。

执行流程

  1. 等待新连接
  2. 处理 accept 异常
  3. 配置客户端 socket
  4. 投递连接处理任务
  5. 继续接受连接

工程说明

接入层与具体上传下载协议解析分离。

关联接口

查看完整实现
tcp_file_server.cpp
void TcpFileServer::acceptLoop()
{
    while (m_running.load()) {
        sockaddr_in client_addr{};
        socklen_t addr_len = sizeof(client_addr);

        int client_fd = accept(
            m_server_fd,
            reinterpret_cast<sockaddr*>(&client_addr),
            &addr_len
        );

        if (client_fd < 0) {
            if (!m_running.load()) {
                break;
            }

            if (errno == EINTR) {
                continue;
            }

            std::cerr << "⚠️ [TCP 文件服务器] accept 失败: "
                      << strerror(errno) << "\n";
            continue;
        }

        if (m_active_clients.load() >= MAX_ACTIVE_CLIENTS) {
            json resp;
            resp["cmd"] = "error";
            resp["msg"] = "文件服务器繁忙,请稍后重试";
            sendJsonLine(client_fd, resp);
            close(client_fd);
            continue;
        }

        setSocketTimeout(client_fd);

        bool queued = m_worker_pool.tryEnqueue([this, client_fd]() {
            this->handleClient(client_fd);
        });

        if (!queued) {
            json resp;
            resp["cmd"] = "error";
            resp["msg"] = "文件服务器繁忙,请稍后重试";
            sendJsonLine(client_fd, resp);
            close(client_fd);

            std::cerr << "⚠️ [TCP 文件服务器] 线程池队列已满,拒绝新的文件连接。"
                      << " pending=" << m_worker_pool.pendingTasks()
                      << " active=" << m_worker_pool.activeTasks()
                      << " rejected=" << m_worker_pool.rejectedTasks()
                      << "\n";
        }
    }
}