原型：bool DBManager::init(const std::string& host, const std::string& user, const std::string& pwd, const std::string& dbname)
参数
| 参数 | 说明 |
|---|---|
| host | MySQL 地址 |
| user | 数据库账号 |
| pwd | 数据库凭据 |
| dbname | 数据库名称 |
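执行流程
- 建立旧单连接路径使用的 MySQL 连接并设置 utf8mb4 字符集(失败则返回 false)
- 建立旧单连接路径使用的 Redis 连接(失败则关闭 MySQL 连接并返回 false)
- 保存连接参数到连接池配置
- 初始化 MySQL 连接池
- 初始化 Redis 连接池
- 验证关键连接
- 输出初始化状态(连接池初始化不完整时回退到旧单连接路径,函数仍返回 true)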
关联接口
MySQLConnectionPool::init、RedisConnectionPool::init
查看完整实现
db_manager.cpp
/// @brief Opens the legacy single MySQL/Redis connections, then brings up the
///        MySQL and Redis connection pools.
///
/// The legacy connections are mandatory: if either one cannot be established
/// the function releases whatever it already opened and returns false.
/// The pools are optional: when a pool fails to initialize or cannot hand out
/// a test connection, connection_pools_ready stays false, a warning is printed
/// and the function still returns true so callers use the legacy path.
///
/// @param host   MySQL server address.
/// @param user   MySQL account name.
/// @param pwd    MySQL account credential.
/// @param dbname Name of the database to select.
/// @return false if the legacy MySQL or Redis connection could not be opened,
///         true otherwise (regardless of the pool state).
bool DBManager::init(const std::string& host,
                     const std::string& user,
                     const std::string& pwd,
                     const std::string& dbname) {
    // Single source of truth for the Redis endpoint so the legacy connection
    // and the pool configuration cannot drift apart.
    constexpr const char* kRedisHost = "127.0.0.1";
    constexpr int kRedisPort = 6379;

    connection_pools_ready = false;

    // --- Legacy single MySQL connection ---------------------------------
    {
        std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
        mysql_conn = mysql_init(nullptr);
        if (!mysql_conn) {
            std::cerr << "MySQL Init Failed: mysql_init returned null\n";
            return false;
        }
        if (!mysql_real_connect(mysql_conn,
                                host.c_str(),
                                user.c_str(),
                                pwd.c_str(),
                                dbname.c_str(),
                                0,
                                nullptr,
                                0)) {
            std::cerr << "MySQL Init Failed: " << mysql_error(mysql_conn) << "\n";
            mysql_close(mysql_conn);
            mysql_conn = nullptr;
            return false;
        }
        // The escaping helpers depend on the connection character set, so a
        // failure here must at least be visible. It stays non-fatal to keep the
        // original best-effort behavior.
        if (mysql_set_character_set(mysql_conn, "utf8mb4") != 0) {
            std::cerr << "MySQL Init Warning: failed to set utf8mb4: "
                      << mysql_error(mysql_conn) << "\n";
        }
    }

    // --- Legacy single Redis connection ---------------------------------
    bool redisOk = false;
    {
        std::lock_guard<std::mutex> r_lock(redis_mutex);
        redis_conn = redisConnect(kRedisHost, kRedisPort);
        if (redis_conn == nullptr) {
            std::cerr << "Redis Init Failed: could not allocate redis context\n";
        } else if (redis_conn->err) {
            std::cerr << "Redis Init Failed: " << redis_conn->errstr << "\n";
            redisFree(redis_conn);
            redis_conn = nullptr;
        } else {
            redisOk = true;
        }
    }
    if (!redisOk) {
        // redis_mutex is released before mysql_mutex is taken so this path
        // never holds both locks at once.
        std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
        if (mysql_conn) {
            mysql_close(mysql_conn);
            mysql_conn = nullptr;
        }
        return false;
    }

    // --- Connection pools -----------------------------------------------
    MySQLPoolConfig mysqlCfg;
    mysqlCfg.host = host;
    mysqlCfg.user = user;
    mysqlCfg.password = pwd;
    mysqlCfg.database = dbname;
    mysqlCfg.minConnections = 2;
    mysqlCfg.maxConnections = 16;
    mysqlCfg.acquireTimeoutMs = 3000;
    mysqlCfg.connectTimeoutSeconds = 5;
    mysqlCfg.charset = "utf8mb4";
    mysqlCfg.reconnect = false;

    RedisPoolConfig redisCfg;
    redisCfg.host = kRedisHost;
    redisCfg.port = kRedisPort;
    redisCfg.minConnections = 1;
    redisCfg.maxConnections = 8;
    redisCfg.acquireTimeoutMs = 3000;

    bool mysqlPoolOk = mysql_pool.init(mysqlCfg);
    bool redisPoolOk = redis_pool.init(redisCfg);

    // A pool only counts as usable if it can actually hand out a connection.
    // NOTE(review): the argument to acquire() is presumably a timeout in
    // milliseconds - confirm against the pool implementation.
    if (mysqlPoolOk) {
        auto testConn = mysql_pool.acquire(1000);
        mysqlPoolOk = static_cast<bool>(testConn);
    }
    if (redisPoolOk) {
        auto testConn = redis_pool.acquire(1000);
        redisPoolOk = static_cast<bool>(testConn);
    }

    connection_pools_ready = mysqlPoolOk && redisPoolOk;
    if (connection_pools_ready) {
        std::cout << "✅ [DBPool] MySQL/Redis 连接池初始化成功\n";
    } else {
        std::cerr << "⚠️ [DBPool] 连接池初始化不完整,暂时回退到旧单连接路径\n";
    }
    // Pool failure is deliberately non-fatal: the legacy connections are up.
    return true;
}
原型：bool DBManager::verifyLogin(const std::string& username, const std::string& password)
调用时机
BusinessHandler::handleLogin 调用。
参数
| 参数 | 说明 |
|---|---|
| username | 账号 |
| password | 登录凭据 |
执行流程
- 借用 MySQL 连接
- 转义输入
- 执行查询
- 读取结果
- 释放结果集
工程说明
当前访问封装为 DBManager 方法,便于后续替换为预编译语句。
关联接口
BusinessHandler::handleLogin、MySQLConnectionPool::acquire
查看完整实现
db_manager.cpp
bool DBManager::verifyLogin(const std::string& username,
const std::string& password) {
if (connection_pools_ready) {
auto conn = mysql_pool.acquire();
if (!conn) {
std::cerr << "[DBPool] verifyLogin acquire MySQL failed\n";
return false;
}
MYSQL* mysql = conn.get();
if (!ensureMySQLAlive(mysql, "verifyLogin(pool)")) {
return false;
}
std::string safeUser = escapeStringWithConn(mysql, username);
std::string safePwd = escapeStringWithConn(mysql, password);
std::string sql =
"SELECT username FROM users "
"WHERE username = '" + safeUser + "' "
"AND password_hash = '" + safePwd + "'";
if (mysql_query(mysql, sql.c_str()) != 0) {
logMySQLError(mysql, "verifyLogin(pool) query", sql);
return false;
}
MYSQL_RES* res = mysql_store_result(mysql);
if (!res) {
logMySQLError(mysql, "verifyLogin(pool) store result", sql);
return false;
}
bool ok = mysql_num_rows(res) > 0;
mysql_free_result(res);
return ok;
}
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
if (!ensureMySQLAlive(mysql_conn, "verifyLogin")) {
return false;
}
std::string safeUser = escapeString(username);
std::string safePwd = escapeString(password);
std::string sql =
"SELECT username FROM users "
"WHERE username = '" + safeUser + "' "
"AND password_hash = '" + safePwd + "'";
if (mysql_query(mysql_conn, sql.c_str()) != 0) {
logMySQLError(mysql_conn, "verifyLogin query", sql);
return false;
}
MYSQL_RES* res = mysql_store_result(mysql_conn);
if (!res) {
logMySQLError(mysql_conn, "verifyLogin store result", sql);
return false;
}
bool ok = mysql_num_rows(res) > 0;
mysql_free_result(res);
return ok;
}
原型：void DBManager::saveChatMessage(const std::string& room_id, const std::string& json_msg)
参数
| 参数 | 说明 |
|---|---|
| room_id | 私聊房间键 |
| json_msg | 序列化消息 |
执行流程
- 借用 Redis 连接
- RPUSH 写入消息
- LTRIM 限制长度
- 归还连接
工程说明
高频最近记录使用 Redis,降低关系库访问压力。
关联接口
DBManager::getChatHistory、BusinessHandler::handleChatRelay
查看完整实现
db_manager.cpp
void DBManager::saveChatMessage(const std::string& room_id,
const std::string& json_msg) {
if (connection_pools_ready) {
auto conn = redis_pool.acquire();
if (!conn || !ensureRedisAlive(conn.get(), "saveChatMessage(pool)")) {
return;
}
redisContext* redis = conn.get();
redisReply* reply = static_cast<redisReply*>(
redisCommand(redis, "RPUSH %s %s", room_id.c_str(), json_msg.c_str())
);
if (reply) {
freeReplyObject(reply);
}
reply = static_cast<redisReply*>(
redisCommand(redis, "LTRIM %s -500 -1", room_id.c_str())
);
if (reply) {
freeReplyObject(reply);
}
return;
}
std::lock_guard<std::mutex> lock(redis_mutex);
if (!redis_conn) {
return;
}
redisReply* reply = static_cast<redisReply*>(
redisCommand(redis_conn, "RPUSH %s %s", room_id.c_str(), json_msg.c_str())
);
if (reply) {
freeReplyObject(reply);
}
reply = static_cast<redisReply*>(
redisCommand(redis_conn, "LTRIM %s -500 -1", room_id.c_str())
);
if (reply) {
freeReplyObject(reply);
}
}
原型：std::vector<std::string> DBManager::getChatHistory(const std::string& room_id, int limit)
参数
| 参数 | 说明 |
|---|---|
| room_id | 私聊房间键 |
| limit | 最大返回条数 |
执行流程
- 借用 Redis 连接
- 计算列表范围
- 执行 LRANGE
- 遍历回复元素
- 返回消息集合
关联接口
DBManager::saveChatMessage、BusinessHandler::handleSyncChat
查看完整实现
db_manager.cpp
std::vector<std::string> DBManager::getChatHistory(const std::string& room_id,
int limit) {
std::vector<std::string> history;
if (limit <= 0 || limit > 500) {
limit = 50;
}
if (connection_pools_ready) {
auto conn = redis_pool.acquire();
if (!conn || !ensureRedisAlive(conn.get(), "getChatHistory(pool)")) {
return history;
}
redisReply* reply = static_cast<redisReply*>(
redisCommand(conn.get(), "LRANGE %s -%d -1", room_id.c_str(), limit)
);
if (!reply) {
return history;
}
if (reply->type == REDIS_REPLY_ARRAY) {
for (size_t i = 0; i < reply->elements; ++i) {
redisReply* item = reply->element[i];
if (item && item->type == REDIS_REPLY_STRING && item->str) {
history.emplace_back(item->str, item->len);
}
}
}
freeReplyObject(reply);
return history;
}
std::lock_guard<std::mutex> lock(redis_mutex);
if (!redis_conn) {
return history;
}
redisReply* reply = static_cast<redisReply*>(
redisCommand(redis_conn, "LRANGE %s -%d -1", room_id.c_str(), limit)
);
if (!reply) {
return history;
}
if (reply->type == REDIS_REPLY_ARRAY) {
for (size_t i = 0; i < reply->elements; ++i) {
redisReply* item = reply->element[i];
if (item && item->type == REDIS_REPLY_STRING && item->str) {
history.emplace_back(item->str, item->len);
}
}
}
freeReplyObject(reply);
return history;
}
原型：bool DBManager::saveGroupMessage(const std::string& msgId, uint64_t groupId, const std::string& senderId, const std::string& msgType, const std::string& content, int64_t createdAt, std::string& outMsg)
调用时机
BusinessHandler::handleGroupChat 完成权限校验后调用。
参数
| 参数 | 说明 |
|---|---|
| msgId | 消息唯一标识 |
| groupId | 群组 ID |
| senderId | 发送者 |
| msgType | 消息类型 |
| content | 消息内容 |
| createdAt | 消息创建时间(写入 created_at 字段) |
| outMsg | 输出参数,返回处理结果或错误说明 |
执行流程
- 借用 MySQL 连接
- 转义消息字段
- 构造插入语句
- 执行写入
- 检查结果
关联接口
DBManager::getGroupMessages、BusinessHandler::handleGroupChat
查看完整实现
db_manager.cpp
bool DBManager::saveGroupMessage(const std::string& msgId,
uint64_t groupId,
const std::string& senderId,
const std::string& msgType,
const std::string& content,
int64_t createdAt,
std::string& outMsg) {
outMsg.clear();
if (connection_pools_ready) {
if (msgId.empty() || groupId == 0 || senderId.empty() || content.empty()) {
outMsg = "群消息参数不完整";
return false;
}
auto conn = mysql_pool.acquire();
if (!conn) {
outMsg = "获取数据库连接失败";
std::cerr << "[DBPool] saveGroupMessage acquire MySQL failed\n";
return false;
}
MYSQL* mysql = conn.get();
if (!ensureMySQLAlive(mysql, "saveGroupMessage(pool)")) {
outMsg = "数据库连接异常";
return false;
}
std::string safeSender = escapeStringWithConn(mysql, senderId);
std::string memberSql =
"SELECT id FROM group_members "
"WHERE group_id=" + std::to_string(groupId) + " "
"AND user_id='" + safeSender + "' "
"AND status='normal' "
"LIMIT 1";
if (mysql_query(mysql, memberSql.c_str()) != 0) {
outMsg = std::string("检查群成员失败: ") + mysql_error(mysql);
return false;
}
MYSQL_RES* memberRes = mysql_store_result(mysql);
if (!memberRes) {
outMsg = "检查群成员结果失败";
return false;
}
bool isMember = mysql_num_rows(memberRes) > 0;
mysql_free_result(memberRes);
if (!isMember) {
outMsg = "你不是该群成员";
return false;
}
std::string safeMsgId = escapeStringWithConn(mysql, msgId);
std::string safeType = escapeStringWithConn(mysql, msgType);
std::string safeContent = escapeStringWithConn(mysql, content);
std::string sql =
"INSERT INTO group_messages(msg_id, group_id, sender_id, msg_type, content, created_at) VALUES('"
+ safeMsgId + "', "
+ std::to_string(groupId) + ", '"
+ safeSender + "', '"
+ safeType + "', '"
+ safeContent + "', "
+ std::to_string(createdAt) + ")";
if (mysql_query(mysql, sql.c_str()) != 0) {
if (mysql_errno(mysql) == 1062) {
outMsg = "群消息已存在";
return true;
}
logMySQLError(mysql, "saveGroupMessage(pool) insert", sql);
outMsg = std::string("保存群消息失败: ") + mysql_error(mysql);
return false;
}
outMsg = "群消息已保存";
return true;
}
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
outMsg.clear();
if (!mysql_conn) {
outMsg = "数据库未连接";
return false;
}
if (msgId.empty() || groupId == 0 || senderId.empty() || content.empty()) {
outMsg = "群消息参数不完整";
return false;
}
if (!isGroupMember(groupId, senderId)) {
outMsg = "你不是该群成员";
return false;
}
std::string safeMsgId = escapeString(msgId);
std::string safeSender = escapeString(senderId);
std::string safeType = escapeString(msgType);
std::string safeContent = escapeString(content);
std::string sql =
"INSERT INTO group_messages(msg_id, group_id, sender_id, msg_type, content, created_at) VALUES('"
+ safeMsgId + "', "
+ std::to_string(groupId) + ", '"
+ safeSender + "', '"
+ safeType + "', '"
+ safeContent + "', "
+ std::to_string(createdAt) + ")";
if (mysql_query(mysql_conn, sql.c_str()) != 0) {
outMsg = std::string("保存群消息失败: ") + mysql_error(mysql_conn);
return false;
}
outMsg = "群消息已保存";
return true;
}
原型：bool DBManager::getGroupMessages(uint64_t groupId, int64_t sinceTs, int limit, json& history)
参数
| 参数 | 说明 |
|---|---|
| groupId | 群组 ID |
| sinceTs | 增量同步起点 |
| limit | 最大返回条数(不在 1~200 范围内时按 50 处理) |
| history | 输出消息数组 |
执行流程
- 校验群组参数
- 借用 MySQL 连接
- 执行有序查询
- 转换每行字段
- 填充 JSON 数组
关联接口
GroupChatWindow::requestHistory、BusinessHandler::handleSyncGroupChat
查看完整实现
db_manager.cpp
bool DBManager::getGroupMessages(uint64_t groupId,
int64_t sinceTs,
int limit,
json& history) {
history = json::array();
if (connection_pools_ready) {
if (groupId == 0) {
return false;
}
if (limit <= 0 || limit > 200) {
limit = 50;
}
auto conn = mysql_pool.acquire();
if (!conn) {
std::cerr << "[DBPool] getGroupMessages acquire MySQL failed\n";
return false;
}
MYSQL* mysql = conn.get();
if (!ensureMySQLAlive(mysql, "getGroupMessages(pool)")) {
return false;
}
std::string sql =
"SELECT msg_id, group_id, sender_id, msg_type, content, created_at "
"FROM group_messages "
"WHERE group_id=" + std::to_string(groupId) + " "
"AND created_at > " + std::to_string(sinceTs) + " "
"ORDER BY created_at ASC "
"LIMIT " + std::to_string(limit);
if (mysql_query(mysql, sql.c_str()) != 0) {
logMySQLError(mysql, "getGroupMessages(pool) query", sql);
return false;
}
MYSQL_RES* res = mysql_store_result(mysql);
if (!res) {
logMySQLError(mysql, "getGroupMessages(pool) store result", sql);
return false;
}
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
json item;
item["msg_id"] = row[0] ? row[0] : "";
item["group_id"] = row[1] ? std::stoull(row[1]) : 0;
std::string sender = row[2] ? row[2] : "";
item["sender"] = sender;
item["sender_id"] = sender;
item["msg_type"] = row[3] ? row[3] : "text";
item["content"] = row[4] ? row[4] : "";
item["created_at"] = row[5] ? std::stoll(row[5]) : 0;
history.push_back(item);
}
mysql_free_result(res);
return true;
}
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
history = json::array();
if (!mysql_conn) {
return false;
}
if (groupId == 0) {
return false;
}
if (limit <= 0 || limit > 200) {
limit = 50;
}
std::string sql =
"SELECT msg_id, group_id, sender_id, msg_type, content, created_at "
"FROM group_messages "
"WHERE group_id=" + std::to_string(groupId) + " "
"AND created_at > " + std::to_string(sinceTs) + " "
"ORDER BY created_at ASC "
"LIMIT " + std::to_string(limit);
if (mysql_query(mysql_conn, sql.c_str()) != 0) {
std::cerr << "[DB] getGroupMessages failed: "
<< mysql_error(mysql_conn)
<< "\nSQL: " << sql << "\n";
return false;
}
MYSQL_RES* res = mysql_store_result(mysql_conn);
if (!res) {
std::cerr << "[DB] getGroupMessages store result failed: "
<< mysql_error(mysql_conn) << "\n";
return false;
}
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
json item;
item["msg_id"] = row[0] ? row[0] : "";
item["group_id"] = row[1] ? std::stoull(row[1]) : 0;
item["sender"] = row[2] ? row[2] : "";
item["msg_type"] = row[3] ? row[3] : "text";
item["content"] = row[4] ? row[4] : "";
item["created_at"] = row[5] ? std::stoll(row[5]) : 0;
history.push_back(item);
}
mysql_free_result(res);
return true;
}