Command Routing & Response Flow

04. 业务协议与命令分发

通过 cmd 字段统一描述业务意图,客户端请求层、网络层和服务端处理层围绕标准 JSON 协议协同。

5 个核心函数4 个重点文件基于当前工程源码

模块职责

通过 cmd 字段统一描述业务意图,客户端请求层、网络层和服务端处理层围绕标准 JSON 协议协同。

重点文件

调用链

  1. 1UI 调用请求函数
  2. 2ClientLogic 生成命令
  3. 3NetworkManager 发送
  4. 4NetworkServer 投递任务
  5. 5BusinessHandler 路由处理
  6. 6响应返回客户端
客户端组合请求

ClientLogic::requestForceRefreshCoreData

统一触发好友、群组、消息中心等核心数据刷新,并利用 pending 标记避免重复请求。

clientlogic.cpp · L892–L906
原型void ClientLogic::requestForceRefreshCoreData()

调用时机

用户主动同步或服务端通知数据变化时调用。

返回说明

无返回值。

参数

无显式参数。

执行流程

  1. 校验认证状态
  2. 重置必要 pending 标记
  3. 拉取好友列表
  4. 拉取群组列表
  5. 拉取消息中心

工程说明

组合请求保持窗口和通话状态不受影响。

关联接口

查看完整实现
clientlogic.cpp
void ClientLogic::requestForceRefreshCoreData()
{
    if (!hasValidAuth("同步数据")) {
        return;
    }

    m_pullNotificationsPending = false;
    m_pullFriendsPending = false;
    m_pullGroupsPending = false;

    requestPullNotifications();
    requestPullFriends();
    requestPullGroups();
}
服务端事件循环

NetworkServer::start

通过 epoll 读取 UDP 数据,将业务包非阻塞投递到线程池,并周期清理失效会话。

network_server.cpp · L503–L630
原型void NetworkServer::start()

调用时机

中心服务初始化完成后进入主循环。

返回说明

无返回值。

参数

无显式参数。

执行流程

  1. 等待 epoll 事件
  2. recvfrom 读取数据
  3. 校验包体
  4. tryEnqueue 投递业务
  5. 执行会话清理

工程说明

收包线程不执行数据库和业务逻辑。

关联接口

查看完整实现
network_server.cpp
void NetworkServer::start()
{
    if (server_fd < 0 || epoll_fd < 0 || !handler) {
        std::cerr << "❌ [UDP 服务器] 初始化失败,无法启动主循环。\n";
        return;
    }

    running.store(true);

    epoll_event events[MAX_EPOLL_EVENTS];

    int64_t lastSessionCleanup = nowSeconds();

    char buffer[65535];

    while (running.load()) {
        int n = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 1000);

        const int64_t now = nowSeconds();
        if (now - lastSessionCleanup >= 5) {
            removeExpiredSessions(35);
            lastSessionCleanup = now;
        }

        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }

            std::cerr << "❌ [UDP 服务器] epoll_wait 失败: " << strerror(errno) << "\n";
            break;
        }

        for (int i = 0; i < n; ++i) {
            if (events[i].data.fd != server_fd) {
                continue;
            }

            if (events[i].events & (EPOLLERR | EPOLLHUP)) {
                std::cerr << "⚠️ [UDP 服务器] epoll 返回错误事件: events="
                          << events[i].events << "\n";
                continue;
            }

            while (true) {
                sockaddr_in client_addr{};
                socklen_t addr_len = sizeof(client_addr);

                ssize_t len = recvfrom(
                    server_fd,
                    buffer,
                    sizeof(buffer),
                    0,
                    reinterpret_cast<sockaddr*>(&client_addr),
                    &addr_len
                );

                if (len > 0) {

                    std::string data(buffer, static_cast<std::size_t>(len));

                    if (static_cast<std::size_t>(len) > MAX_UDP_PAYLOAD_BYTES) {
                        std::cerr << "⚠️ [UDP 服务器] 收到超大 UDP 包 len="
                                  << len << " from=" << addrToString(client_addr) << "\n";
                    }

                    bool queued = pool.tryEnqueue([this, data, client_addr]() {
                        if (!this->handler) {
                            return;
                        }

                        this->handler->dispatchTask(data, client_addr);
                    });

                    if (!queued) {
                        static std::atomic<unsigned long long> droppedTasks{0};
                        unsigned long long dropped = ++droppedTasks;

                        if (dropped <= 10 || dropped % 100 == 0) {
                            std::cerr << "⚠️ [UDP 服务器] 线程池队列已满,丢弃业务包"
                                      << " dropped=" << dropped
                                      << " pending=" << pool.pendingTasks()
                                      << " active=" << pool.activeTasks()
                                      << " completed=" << pool.completedTasks()
                                      << " rejected=" << pool.rejectedTasks()
                                      << "\n";
                        }
                    }

                    continue;
                }

                if (len == 0) {

                    break;
                }

                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break;
                }

                if (errno == EINTR) {
                    continue;
                }

                std::cerr << "⚠️ [UDP 服务器] recvfrom 失败: "
                          << strerror(errno) << "\n";
                break;
            }
        }
    }

    running.store(false);
}
服务端命令路由

BusinessHandler::dispatchTask

解析 JSON 并根据 cmd 将请求分派到认证、消息、群组、媒体或游戏处理函数。

business_handler.cpp · L204–L329
原型void BusinessHandler::dispatchTask(const std::string& json_str, struct sockaddr_in client_addr)

调用时机

线程池执行每个 UDP 业务任务时调用。

返回说明

无直接返回;各处理函数负责响应。

参数

参数说明
json_str原始业务 JSON
client_addr请求来源地址

执行流程

  1. 解析 JSON
  2. 校验对象与 cmd
  3. 匹配命令分支
  4. 调用具体处理器
  5. 捕获解析与业务异常

工程说明

集中路由便于协议扩展和统一错误处理。

关联接口

查看完整实现
business_handler.cpp
void BusinessHandler::dispatchTask(const std::string& json_str,
                                   struct sockaddr_in client_addr)
{
    try {
        json j = json::parse(json_str);

        if (!j.is_object()) {
            sendSimpleError(net_server, client_addr, "error", "请求 JSON 必须是对象");
            return;
        }

        std::string cmd = getStringField(j, "cmd");

        if (cmd.empty()) {
            sendSimpleError(net_server, client_addr, "error", "缺少 cmd 字段");
            return;
        }

        if (cmd == "login") {
            handleLogin(json_str, client_addr);
        } else if (cmd == "register") {
            handleRegister(json_str, client_addr);
        } else if (cmd == "heartbeat") {
            handleHeartbeat(json_str, client_addr);

        } else if (cmd == "send_chat" || cmd == "chat") {
            handleChatRelay(json_str, client_addr);
        } else if (cmd == "sync_chat" || cmd == "pull_history") {
            handleSyncChat(json_str, client_addr);
        } else if (cmd == "add_friend") {
            handleAddFriend(json_str, client_addr);
        } else if (cmd == "pull_friends") {
            handlePullFriends(json_str, client_addr);
        } else if (cmd == "pull_notifications") {
            handlePullNotifications(json_str, client_addr);
        } else if (cmd == "process_notification") {
            handleProcessNotification(json_str, client_addr);
        } else if (cmd == "pull_friend_requests") {
            handlePullFriendRequests(json_str, client_addr);
        } else if (cmd == "process_friend_req") {
            handleProcessFriendReq(json_str, client_addr);
        } else if (cmd == "delete_friend") {
            handleDeleteFriend(j, client_addr, net_server);

        } else if (cmd == "call") {
            handleCall(json_str, client_addr);
        } else if (cmd == "accept") {
            handleAccept(json_str, client_addr);
        } else if (cmd == "reject") {
            handleRejectCall(json_str, client_addr);
        } else if (cmd == "call_end") {
            handleEndCall(json_str, client_addr);
        } else if (cmd == "media_audio") {
            handleMediaAudio(json_str, client_addr);
        } else if (cmd == "media_video") {
            handleMediaVideo(json_str, client_addr);

        } else if (cmd == "invite_group_member") {
            handleInviteGroupMember(json_str, client_addr);
        } else if (cmd == "create_group") {
            handleCreateGroup(json_str, client_addr);
        } else if (cmd == "pull_groups") {
            handlePullGroups(json_str, client_addr);
        } else if (cmd == "pull_group_members") {
            handlePullGroupMembers(json_str, client_addr);
        } else if (cmd == "group_chat") {
            handleGroupChat(json_str, client_addr);
        } else if (cmd == "sync_group_chat") {
            handleSyncGroupChat(json_str, client_addr);
        } else if (cmd == "group_media_video") {
            handleGroupMediaVideo(json_str, client_addr);
        } else if (cmd == "group_media_video_control") {
            handleGroupMediaVideoControl(json_str, client_addr);
        } else if (cmd == "group_media_audio") {
            handleGroupMediaAudio(json_str, client_addr);

        } else if (cmd == "create_game_room") {
            handleCreateGameRoom(json_str, client_addr);
        } else if (cmd == "game_join_room") {
            handleGameJoinRoom(json_str, client_addr);
        } else if (cmd == "game_pull_room") {
            handleGamePullRoom(json_str, client_addr);
        } else if (cmd == "game_update_config") {
            handleGameUpdateConfig(json_str, client_addr);
        } else if (cmd == "game_ready") {
            handleGameReady(json_str, client_addr);
        } else if (cmd == "game_start") {
            handleGameStart(json_str, client_addr);
        } else if (cmd == "game_end_room") {
            handleGameEndRoom(json_str, client_addr);

        } else if (cmd == "cloud_factorio_status") {
            handleCloudFactorioStatus(json_str, client_addr);
        } else if (cmd == "cloud_factorio_start") {
            handleCloudFactorioStart(json_str, client_addr);
        } else if (cmd == "cloud_factorio_query") {
            handleCloudFactorioQuery(json_str, client_addr);
        } else if (cmd == "cloud_factorio_stop") {
            handleCloudFactorioStop(json_str, client_addr);
        } else if (cmd == "cloud_factorio_export_save") {
            handleCloudFactorioExportSave(json_str, client_addr);
        } else {
            std::cout << "⚠️ [服务器中转] 收到无法识别的未知指令: "
                      << cmd << "\n";
            sendSimpleError(net_server, client_addr, "error", "未知 cmd: " + cmd);
        }

    } catch (const json::parse_error& e) {
        std::string preview = json_str.substr(0, 200);

        std::cerr << "JSON 解析错误,数据长度: " << json_str.size()
                  << ",错误: " << e.what()
                  << ",前200字节: " << preview << "\n";

        sendSimpleError(net_server, client_addr, "error", "JSON 解析失败");
    } catch (const std::exception& e) {
        std::cerr << "dispatchTask 业务分发异常: " << e.what() << "\n";
        sendSimpleError(net_server, client_addr, "error", "服务器业务分发异常");
    }
}
服务端响应发送

NetworkServer::sendData

将序列化后的响应或通知发送到指定客户端 UDP 地址。

network_server.cpp · L641–L681
原型void NetworkServer::sendData(const std::string& data, sockaddr_in dest_addr)

调用时机

业务处理完成或需要主动推送时调用。

返回说明

无返回值。

参数

参数说明
data待发送数据
dest_addr目标客户端地址

执行流程

  1. 校验服务状态
  2. 锁定发送资源
  3. sendto 发送
  4. 记录发送异常

工程说明

通知推送和请求响应共用统一发送出口。

关联接口

查看完整实现
network_server.cpp
void NetworkServer::sendData(const std::string& data, sockaddr_in dest_addr)
{
    if (server_fd < 0) {
        return;
    }

    if (data.empty()) {
        return;
    }

    if (data.size() > MAX_UDP_PAYLOAD_BYTES) {
        std::cerr << "⚠️ [UDP 服务器] sendData 包过大 size="
                  << data.size()
                  << ",可能发生 UDP 分片/丢包\n";
    }

    if (dest_addr.sin_family != AF_INET || dest_addr.sin_port == 0) {
        std::cerr << "⚠️ [UDP 服务器] sendData 目标地址非法\n";
        return;
    }

    ssize_t sent = sendto(
        server_fd,
        data.data(),
        data.size(),
        0,
        reinterpret_cast<sockaddr*>(&dest_addr),
        sizeof(dest_addr)
    );

    if (sent < 0) {
        std::cerr << "⚠️ [UDP 服务器] sendto 失败: "
                  << strerror(errno) << "\n";
        return;
    }

    if (static_cast<std::size_t>(sent) != data.size()) {
        std::cerr << "⚠️ [UDP 服务器] sendto 长度异常 sent="
                  << sent << " total=" << data.size() << "\n";
    }
}
状态变化推送

BusinessHandler::notifyNotificationsChanged

向在线目标用户发送轻量级 notifications_changed 事件,驱动客户端主动刷新完整数据。

business_handler.cpp · L170–L202
原型void BusinessHandler::notifyNotificationsChanged(const std::string& targetUser, const std::string& reason, const json& extra)

调用时机

好友申请、通知处理或群组关系变化后调用。

返回说明

无返回值。

参数

参数说明
targetUser目标用户
reason状态变化原因

执行流程

  1. 查询目标在线地址
  2. 构造轻量事件
  3. 写入原因字段
  4. 发送到在线客户端

工程说明

推送事件只表达“数据已变化”,完整数据仍通过拉取接口获取。

关联接口

查看完整实现
business_handler.cpp
void BusinessHandler::notifyNotificationsChanged(const std::string& targetUser,
                                                 const std::string& reason,
                                                 const json& extra)
{
    if (!net_server || targetUser.empty()) {
        return;
    }

    Session targetSession;
    if (!net_server->getSession(targetUser, targetSession)) {
        return;
    }

    json notify;
    notify["cmd"] = "notifications_changed";
    notify["reason"] = reason;

    if (extra.is_object()) {
        for (auto it = extra.begin(); it != extra.end(); ++it) {
            notify[it.key()] = it.value();
        }
    }

    struct sockaddr_in targetAddr{};
    targetAddr.sin_family = AF_INET;
    inet_pton(AF_INET, targetSession.ip.c_str(), &targetAddr.sin_addr);
    targetAddr.sin_port = htons(targetSession.port);

    net_server->sendData(notify.dump(), targetAddr);

    std::cout << "📨 [消息中心] 已通知在线用户 [" << targetUser
              << "] 刷新状态,reason=" << reason << "\n";
}