MySQL Persistence & Redis Cache

06. 数据持久化与高速缓存

MySQL 承担关系型长期数据,Redis 承担高频会话与最近消息,DBManager 作为统一访问边界。

6 个核心函数3 个重点文件基于当前工程源码

模块职责

MySQL 承担关系型长期数据,Redis 承担高频会话与最近消息,DBManager 作为统一访问边界。

重点文件

调用链

  1. 1业务层调用 DBManager
  2. 2借用连接池资源
  3. 3执行 MySQL/Redis 操作
  4. 4组织结果
  5. 5RAII 自动归还连接
数据服务初始化

DBManager::init

加载数据库参数,初始化 MySQL/Redis 连接池并准备数据访问环境。

db_manager.cpp · L321–L430
原型bool DBManager::init(const std::string& host, const std::string& user, const std::string& pwd, const std::string& dbname)

调用时机

中心服务启动阶段调用。

返回说明

初始化成功返回 true。

参数

参数说明
hostMySQL 地址
user数据库账号
password数据库凭据
database数据库名称

执行流程

  1. 保存连接参数
  2. 初始化 MySQL 连接池
  3. 初始化 Redis 连接池
  4. 验证关键连接
  5. 输出初始化状态

工程说明

后续业务函数按请求借用独立连接。

关联接口

查看完整实现
db_manager.cpp
bool DBManager::init(const std::string& host,
                     const std::string& user,
                     const std::string& pwd,
                     const std::string& dbname) {
    connection_pools_ready = false;

    {
        std::lock_guard<std::recursive_mutex> lock(mysql_mutex);

        mysql_conn = mysql_init(nullptr);
        if (!mysql_conn) {
            std::cerr << "MySQL Init Failed: mysql_init returned null\n";
            return false;
        }

        if (!mysql_real_connect(mysql_conn,
                                host.c_str(),
                                user.c_str(),
                                pwd.c_str(),
                                dbname.c_str(),
                                0,
                                nullptr,
                                0)) {
            std::cerr << "MySQL Init Failed: " << mysql_error(mysql_conn) << "\n";
            mysql_close(mysql_conn);
            mysql_conn = nullptr;
            return false;
        }

        mysql_set_character_set(mysql_conn, "utf8mb4");
    }

    {
        std::lock_guard<std::mutex> r_lock(redis_mutex);

        redis_conn = redisConnect("127.0.0.1", 6379);
        if (redis_conn == nullptr || redis_conn->err) {
            std::cerr << "Redis Init Failed\n";

            if (redis_conn) {
                redisFree(redis_conn);
                redis_conn = nullptr;
            }

            std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
            if (mysql_conn) {
                mysql_close(mysql_conn);
                mysql_conn = nullptr;
            }

            return false;
        }
    }

    MySQLPoolConfig mysqlCfg;
    mysqlCfg.host = host;
    mysqlCfg.user = user;
    mysqlCfg.password = pwd;
    mysqlCfg.database = dbname;
    mysqlCfg.minConnections = 2;
    mysqlCfg.maxConnections = 16;
    mysqlCfg.acquireTimeoutMs = 3000;
    mysqlCfg.connectTimeoutSeconds = 5;
    mysqlCfg.charset = "utf8mb4";
    mysqlCfg.reconnect = false;

    RedisPoolConfig redisCfg;
    redisCfg.host = "127.0.0.1";
    redisCfg.port = 6379;
    redisCfg.minConnections = 1;
    redisCfg.maxConnections = 8;
    redisCfg.acquireTimeoutMs = 3000;

    bool mysqlPoolOk = mysql_pool.init(mysqlCfg);
    bool redisPoolOk = redis_pool.init(redisCfg);

    if (mysqlPoolOk) {
        auto testConn = mysql_pool.acquire(1000);
        mysqlPoolOk = static_cast<bool>(testConn);
    }

    if (redisPoolOk) {
        auto testConn = redis_pool.acquire(1000);
        redisPoolOk = static_cast<bool>(testConn);
    }

    connection_pools_ready = mysqlPoolOk && redisPoolOk;

    if (connection_pools_ready) {
        std::cout << "✅ [DBPool] MySQL/Redis 连接池初始化成功\n";
    } else {
        std::cerr << "⚠️ [DBPool] 连接池初始化不完整,暂时回退到旧单连接路径\n";
    }

    return true;
}
账号验证

DBManager::verifyLogin

通过 MySQL 查询用户凭据并返回认证结果。

db_manager.cpp · L480–L547
原型bool DBManager::verifyLogin(const std::string& username, const std::string& password)

调用时机

BusinessHandler::handleLogin 调用。

返回说明

匹配成功返回 true。

参数

参数说明
username账号
password登录凭据

执行流程

  1. 借用 MySQL 连接
  2. 转义输入
  3. 执行查询
  4. 读取结果
  5. 释放结果集

工程说明

当前访问封装为 DBManager 方法,便于后续替换为预编译语句。

关联接口

查看完整实现
db_manager.cpp
bool DBManager::verifyLogin(const std::string& username,
                            const std::string& password) {
    if (connection_pools_ready) {
        auto conn = mysql_pool.acquire();
        if (!conn) {
            std::cerr << "[DBPool] verifyLogin acquire MySQL failed\n";
            return false;
        }

        MYSQL* mysql = conn.get();
        if (!ensureMySQLAlive(mysql, "verifyLogin(pool)")) {
            return false;
        }

        std::string safeUser = escapeStringWithConn(mysql, username);
        std::string safePwd = escapeStringWithConn(mysql, password);

        std::string sql =
            "SELECT username FROM users "
            "WHERE username = '" + safeUser + "' "
            "AND password_hash = '" + safePwd + "'";

        if (mysql_query(mysql, sql.c_str()) != 0) {
            logMySQLError(mysql, "verifyLogin(pool) query", sql);
            return false;
        }

        MYSQL_RES* res = mysql_store_result(mysql);
        if (!res) {
            logMySQLError(mysql, "verifyLogin(pool) store result", sql);
            return false;
        }

        bool ok = mysql_num_rows(res) > 0;
        mysql_free_result(res);
        return ok;
    }

    std::lock_guard<std::recursive_mutex> lock(mysql_mutex);

    if (!ensureMySQLAlive(mysql_conn, "verifyLogin")) {
        return false;
    }

    std::string safeUser = escapeString(username);
    std::string safePwd = escapeString(password);

    std::string sql =
        "SELECT username FROM users "
        "WHERE username = '" + safeUser + "' "
        "AND password_hash = '" + safePwd + "'";

    if (mysql_query(mysql_conn, sql.c_str()) != 0) {
        logMySQLError(mysql_conn, "verifyLogin query", sql);
        return false;
    }

    MYSQL_RES* res = mysql_store_result(mysql_conn);
    if (!res) {
        logMySQLError(mysql_conn, "verifyLogin store result", sql);
        return false;
    }

    bool ok = mysql_num_rows(res) > 0;
    mysql_free_result(res);

    return ok;
}
私聊最近消息缓存

DBManager::saveChatMessage

将结构化私聊消息写入 Redis 列表,并限制保留数量。

db_manager.cpp · L653–L703
原型void DBManager::saveChatMessage(const std::string& room_id, const std::string& json_msg)

调用时机

私聊消息通过认证和关系校验后调用。

返回说明

无返回值。

参数

参数说明
room_id私聊房间键
message_json序列化消息

执行流程

  1. 借用 Redis 连接
  2. RPUSH 写入消息
  3. LTRIM 限制长度
  4. 归还连接

工程说明

高频最近记录使用 Redis,降低关系库访问压力。

关联接口

查看完整实现
db_manager.cpp
void DBManager::saveChatMessage(const std::string& room_id,
                                const std::string& json_msg) {
    if (connection_pools_ready) {
        auto conn = redis_pool.acquire();
        if (!conn || !ensureRedisAlive(conn.get(), "saveChatMessage(pool)")) {
            return;
        }

        redisContext* redis = conn.get();

        redisReply* reply = static_cast<redisReply*>(
            redisCommand(redis, "RPUSH %s %s", room_id.c_str(), json_msg.c_str())
        );

        if (reply) {
            freeReplyObject(reply);
        }

        reply = static_cast<redisReply*>(
            redisCommand(redis, "LTRIM %s -500 -1", room_id.c_str())
        );

        if (reply) {
            freeReplyObject(reply);
        }

        return;
    }

    std::lock_guard<std::mutex> lock(redis_mutex);

    if (!redis_conn) {
        return;
    }

    redisReply* reply = static_cast<redisReply*>(
        redisCommand(redis_conn, "RPUSH %s %s", room_id.c_str(), json_msg.c_str())
    );

    if (reply) {
        freeReplyObject(reply);
    }

    reply = static_cast<redisReply*>(
        redisCommand(redis_conn, "LTRIM %s -500 -1", room_id.c_str())
    );

    if (reply) {
        freeReplyObject(reply);
    }
}
私聊历史读取

DBManager::getChatHistory

从 Redis 获取指定范围的最近私聊记录并返回 JSON 字符串集合。

db_manager.cpp · L706–L768
原型std::vector<std::string> DBManager::getChatHistory(const std::string& room_id, int limit)

调用时机

客户端同步私聊历史时调用。

返回说明

返回消息字符串列表。

参数

参数说明
room_id私聊房间键
limit最大返回条数

执行流程

  1. 借用 Redis 连接
  2. 计算列表范围
  3. 执行 LRANGE
  4. 遍历回复元素
  5. 返回消息集合

工程说明

返回结果由业务层转成 JSON 数组。

关联接口

查看完整实现
db_manager.cpp
std::vector<std::string> DBManager::getChatHistory(const std::string& room_id,
                                                   int limit) {
    std::vector<std::string> history;

    if (limit <= 0 || limit > 500) {
        limit = 50;
    }

    if (connection_pools_ready) {
        auto conn = redis_pool.acquire();
        if (!conn || !ensureRedisAlive(conn.get(), "getChatHistory(pool)")) {
            return history;
        }

        redisReply* reply = static_cast<redisReply*>(
            redisCommand(conn.get(), "LRANGE %s -%d -1", room_id.c_str(), limit)
        );

        if (!reply) {
            return history;
        }

        if (reply->type == REDIS_REPLY_ARRAY) {
            for (size_t i = 0; i < reply->elements; ++i) {
                redisReply* item = reply->element[i];

                if (item && item->type == REDIS_REPLY_STRING && item->str) {
                    history.emplace_back(item->str, item->len);
                }
            }
        }

        freeReplyObject(reply);
        return history;
    }

    std::lock_guard<std::mutex> lock(redis_mutex);

    if (!redis_conn) {
        return history;
    }

    redisReply* reply = static_cast<redisReply*>(
        redisCommand(redis_conn, "LRANGE %s -%d -1", room_id.c_str(), limit)
    );

    if (!reply) {
        return history;
    }

    if (reply->type == REDIS_REPLY_ARRAY) {
        for (size_t i = 0; i < reply->elements; ++i) {
            redisReply* item = reply->element[i];

            if (item && item->type == REDIS_REPLY_STRING && item->str) {
                history.emplace_back(item->str, item->len);
            }
        }
    }

    freeReplyObject(reply);
    return history;
}
群聊持久化

DBManager::saveGroupMessage

将群聊消息写入 MySQL,保存消息标识、发送者、类型、内容和时间。

db_manager.cpp · L3044–L3174
原型bool DBManager::saveGroupMessage(const std::string& msgId, uint64_t groupId, const std::string& senderId, const std::string& msgType, const std::string& content, int64_t createdAt, std::string& outMsg)

调用时机

BusinessHandler::handleGroupChat 完成权限校验后调用。

返回说明

写入成功返回 true。

参数

参数说明
msgId消息唯一标识
groupId群组 ID
senderId发送者
messageType消息类型
content消息内容

执行流程

  1. 借用 MySQL 连接
  2. 转义消息字段
  3. 构造插入语句
  4. 执行写入
  5. 检查结果

工程说明

msg_id 用于客户端历史去重。

关联接口

查看完整实现
db_manager.cpp
bool DBManager::saveGroupMessage(const std::string& msgId,
                                 uint64_t groupId,
                                 const std::string& senderId,
                                 const std::string& msgType,
                                 const std::string& content,
                                 int64_t createdAt,
                                 std::string& outMsg) {
    outMsg.clear();

    if (connection_pools_ready) {
        if (msgId.empty() || groupId == 0 || senderId.empty() || content.empty()) {
            outMsg = "群消息参数不完整";
            return false;
        }

        auto conn = mysql_pool.acquire();
        if (!conn) {
            outMsg = "获取数据库连接失败";
            std::cerr << "[DBPool] saveGroupMessage acquire MySQL failed\n";
            return false;
        }

        MYSQL* mysql = conn.get();
        if (!ensureMySQLAlive(mysql, "saveGroupMessage(pool)")) {
            outMsg = "数据库连接异常";
            return false;
        }

        std::string safeSender = escapeStringWithConn(mysql, senderId);

        std::string memberSql =
            "SELECT id FROM group_members "
            "WHERE group_id=" + std::to_string(groupId) + " "
            "AND user_id='" + safeSender + "' "
            "AND status='normal' "
            "LIMIT 1";

        if (mysql_query(mysql, memberSql.c_str()) != 0) {
            outMsg = std::string("检查群成员失败: ") + mysql_error(mysql);
            return false;
        }

        MYSQL_RES* memberRes = mysql_store_result(mysql);
        if (!memberRes) {
            outMsg = "检查群成员结果失败";
            return false;
        }

        bool isMember = mysql_num_rows(memberRes) > 0;
        mysql_free_result(memberRes);

        if (!isMember) {
            outMsg = "你不是该群成员";
            return false;
        }

        std::string safeMsgId = escapeStringWithConn(mysql, msgId);
        std::string safeType = escapeStringWithConn(mysql, msgType);
        std::string safeContent = escapeStringWithConn(mysql, content);

        std::string sql =
            "INSERT INTO group_messages(msg_id, group_id, sender_id, msg_type, content, created_at) VALUES('"
            + safeMsgId + "', "
            + std::to_string(groupId) + ", '"
            + safeSender + "', '"
            + safeType + "', '"
            + safeContent + "', "
            + std::to_string(createdAt) + ")";

        if (mysql_query(mysql, sql.c_str()) != 0) {

            if (mysql_errno(mysql) == 1062) {
                outMsg = "群消息已存在";
                return true;
            }

            logMySQLError(mysql, "saveGroupMessage(pool) insert", sql);
            outMsg = std::string("保存群消息失败: ") + mysql_error(mysql);
            return false;
        }

        outMsg = "群消息已保存";
        return true;
    }

    std::lock_guard<std::recursive_mutex> lock(mysql_mutex);

    outMsg.clear();

    if (!mysql_conn) {
        outMsg = "数据库未连接";
        return false;
    }

    if (msgId.empty() || groupId == 0 || senderId.empty() || content.empty()) {
        outMsg = "群消息参数不完整";
        return false;
    }

    if (!isGroupMember(groupId, senderId)) {
        outMsg = "你不是该群成员";
        return false;
    }

    std::string safeMsgId = escapeString(msgId);
    std::string safeSender = escapeString(senderId);
    std::string safeType = escapeString(msgType);
    std::string safeContent = escapeString(content);

    std::string sql =
        "INSERT INTO group_messages(msg_id, group_id, sender_id, msg_type, content, created_at) VALUES('"
        + safeMsgId + "', "
        + std::to_string(groupId) + ", '"
        + safeSender + "', '"
        + safeType + "', '"
        + safeContent + "', "
        + std::to_string(createdAt) + ")";

    if (mysql_query(mysql_conn, sql.c_str()) != 0) {
        outMsg = std::string("保存群消息失败: ") + mysql_error(mysql_conn);
        return false;
    }

    outMsg = "群消息已保存";
    return true;

}
群聊历史读取

DBManager::getGroupMessages

按群组和时间条件查询持久化群消息,输出 JSON 数组。

db_manager.cpp · L3176–L3305
原型bool DBManager::getGroupMessages(uint64_t groupId, int64_t sinceTs, int limit, json& history)

调用时机

群聊窗口首次加载或增量刷新时调用。

返回说明

查询成功返回 true。

参数

参数说明
groupId群组 ID
sinceTs增量同步起点
messages输出消息数组

执行流程

  1. 校验群组参数
  2. 借用 MySQL 连接
  3. 执行有序查询
  4. 转换每行字段
  5. 填充 JSON 数组

工程说明

支持按时间戳增量拉取,减少重复传输。

关联接口

查看完整实现
db_manager.cpp
bool DBManager::getGroupMessages(uint64_t groupId,
                                 int64_t sinceTs,
                                 int limit,
                                 json& history) {
    history = json::array();

    if (connection_pools_ready) {
        if (groupId == 0) {
            return false;
        }

        if (limit <= 0 || limit > 200) {
            limit = 50;
        }

        auto conn = mysql_pool.acquire();
        if (!conn) {
            std::cerr << "[DBPool] getGroupMessages acquire MySQL failed\n";
            return false;
        }

        MYSQL* mysql = conn.get();
        if (!ensureMySQLAlive(mysql, "getGroupMessages(pool)")) {
            return false;
        }

        std::string sql =
            "SELECT msg_id, group_id, sender_id, msg_type, content, created_at "
            "FROM group_messages "
            "WHERE group_id=" + std::to_string(groupId) + " "
            "AND created_at > " + std::to_string(sinceTs) + " "
            "ORDER BY created_at ASC "
            "LIMIT " + std::to_string(limit);

        if (mysql_query(mysql, sql.c_str()) != 0) {
            logMySQLError(mysql, "getGroupMessages(pool) query", sql);
            return false;
        }

        MYSQL_RES* res = mysql_store_result(mysql);
        if (!res) {
            logMySQLError(mysql, "getGroupMessages(pool) store result", sql);
            return false;
        }

        MYSQL_ROW row;

        while ((row = mysql_fetch_row(res))) {
            json item;

            item["msg_id"] = row[0] ? row[0] : "";
            item["group_id"] = row[1] ? std::stoull(row[1]) : 0;

            std::string sender = row[2] ? row[2] : "";
            item["sender"] = sender;
            item["sender_id"] = sender;

            item["msg_type"] = row[3] ? row[3] : "text";
            item["content"] = row[4] ? row[4] : "";
            item["created_at"] = row[5] ? std::stoll(row[5]) : 0;

            history.push_back(item);
        }

        mysql_free_result(res);
        return true;
    }

    std::lock_guard<std::recursive_mutex> lock(mysql_mutex);

    history = json::array();

    if (!mysql_conn) {
        return false;
    }

    if (groupId == 0) {
        return false;
    }

    if (limit <= 0 || limit > 200) {
        limit = 50;
    }

    std::string sql =
        "SELECT msg_id, group_id, sender_id, msg_type, content, created_at "
        "FROM group_messages "
        "WHERE group_id=" + std::to_string(groupId) + " "
        "AND created_at > " + std::to_string(sinceTs) + " "
        "ORDER BY created_at ASC "
        "LIMIT " + std::to_string(limit);

    if (mysql_query(mysql_conn, sql.c_str()) != 0) {
        std::cerr << "[DB] getGroupMessages failed: "
                  << mysql_error(mysql_conn)
                  << "\nSQL: " << sql << "\n";
        return false;
    }

    MYSQL_RES* res = mysql_store_result(mysql_conn);

    if (!res) {
        std::cerr << "[DB] getGroupMessages store result failed: "
                  << mysql_error(mysql_conn) << "\n";
        return false;
    }

    MYSQL_ROW row;

    while ((row = mysql_fetch_row(res))) {
        json item;

        item["msg_id"] = row[0] ? row[0] : "";
        item["group_id"] = row[1] ? std::stoull(row[1]) : 0;
        item["sender"] = row[2] ? row[2] : "";
        item["msg_type"] = row[3] ? row[3] : "text";
        item["content"] = row[4] ? row[4] : "";
        item["created_at"] = row[5] ? std::stoll(row[5]) : 0;

        history.push_back(item);
    }

    mysql_free_result(res);
    return true;

}