原型：`template <class F> bool tryEnqueue(F&& f)`
调用时机
NetworkServer 收到 UDP 包后调用。
执行流程
- 构造 Task
- 锁定队列
- 检查 accepting 状态
- 检查队列上限
- 入队并通知 worker
关联接口
`ThreadPool::workerLoop`、`NetworkServer::start`
查看完整实现
thread_pool.h
template <class F>
bool tryEnqueue(F&& f)
{
Task task(std::forward<F>(f));
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_accepting) {
++m_rejected;
return false;
}
if (m_maxQueueSize != 0 && m_tasks.size() >= m_maxQueueSize) {
++m_rejected;
return false;
}
m_tasks.emplace_back(std::move(task));
}
m_taskCv.notify_one();
return true;
}
原型：`void workerLoop()`
执行流程
- 等待条件变量
- 读取队首任务
- 更新活跃计数
- 捕获任务异常
- 更新完成计数
关联接口
`ThreadPool::tryEnqueue`、`ThreadPool::shutdown`
查看完整实现
thread_pool.h
void workerLoop()
{
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_taskCv.wait(lock, [this]() {
return m_stopping || !m_tasks.empty();
});
if (m_stopping && m_tasks.empty()) {
return;
}
task = std::move(m_tasks.front());
m_tasks.pop_front();
++m_active;
}
m_spaceCv.notify_one();
try {
task();
} catch (const std::exception& e) {
std::cerr << "⚠️ [" << m_name << "] task exception: "
<< e.what() << "\n";
} catch (...) {
std::cerr << "⚠️ [" << m_name << "] task unknown exception\n";
}
--m_active;
++m_completed;
}
}
原型：`bool MySQLConnectionPool::init(const MySQLPoolConfig& config)`
执行流程
- 校验连接数范围
- 写入配置
- 创建最小连接
- 加入空闲队列
- 设置初始化状态
关联接口
`MySQLConnectionPool::acquire`、`MySQLConnectionPool::release`
查看完整实现
mysql_connection_pool.cpp
bool MySQLConnectionPool::init(const MySQLPoolConfig& config)
{
shutdown();
MySQLPoolConfig cfg = config;
if (cfg.maxConnections == 0) {
cfg.maxConnections = 1;
}
if (cfg.minConnections > cfg.maxConnections) {
cfg.minConnections = cfg.maxConnections;
}
{
std::lock_guard<std::mutex> lock(m_mutex);
m_config = cfg;
m_shutdown = false;
m_initialized = true;
}
for (std::size_t i = 0; i < cfg.minConnections; ++i) {
Node* node = createNode();
if (!node) {
std::cerr << "⚠️ [MySQLPool] 预创建连接失败 index=" << i << "\n";
continue;
}
std::lock_guard<std::mutex> lock(m_mutex);
m_idle.push_back(node);
++m_total;
}
return true;
}
原型：`MySQLConnectionPool::Connection MySQLConnectionPool::acquire(int timeoutMs)`
调用时机
DBManager 每次访问 MySQL 前调用。
参数
| 参数 | 说明 |
|---|---|
| timeoutMs | 等待超时;负数使用默认值 |
执行流程
- 计算截止时间
- 优先获取空闲连接
- 按上限创建连接
- 执行健康检查
- 返回 RAII 句柄
工程说明
Connection 析构时自动归还连接。
关联接口
`MySQLConnectionPool::release`、`MySQLConnectionPool::isAlive`
查看完整实现
mysql_connection_pool.cpp
/// Obtains a connection handle from the pool.
///
/// Each attempt takes the front idle node if there is one, otherwise
/// reserves a slot and creates a new node while m_total is below
/// maxConnections, otherwise waits on m_cv until notified or the deadline
/// passes. An idle node that fails isAlive() is closed, its slot is given
/// back, and the attempt is repeated.
///
/// @param timeoutMs Maximum wait in milliseconds; a negative value selects
///                  m_config.acquireTimeoutMs.
/// @return A Connection wrapping the node, or a default-constructed
///         Connection when the pool is uninitialized or shut down, the wait
///         times out, or creating a new node fails.
MySQLConnectionPool::Connection
MySQLConnectionPool::acquire(int timeoutMs)
{
    if (timeoutMs < 0) {
        timeoutMs = m_config.acquireTimeoutMs;
    }
    const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);

    for (;;) {
        Node* candidate = nullptr;
        bool openNew = false;
        {
            std::unique_lock<std::mutex> guard(m_mutex);
            if (m_shutdown || !m_initialized) {
                return {};
            }

            const bool idleAvailable = !m_idle.empty();
            const bool canGrow = m_total < m_config.maxConnections;

            if (!idleAvailable && !canGrow) {
                // Pool exhausted: block until notified or the deadline hits.
                if (m_cv.wait_until(guard, deadline) == std::cv_status::timeout) {
                    return {};
                }
                continue;  // re-evaluate pool state from the top
            }

            if (idleAvailable) {
                candidate = m_idle.front();
                m_idle.pop_front();
            } else {
                // Reserve the slot now; the node itself is created outside
                // the lock.
                ++m_total;
                openNew = true;
            }
        }

        if (openNew) {
            candidate = createNode();
            if (candidate) {
                return Connection(this, candidate);
            }
            // Creation failed: give the reserved slot back and wake a waiter.
            std::lock_guard<std::mutex> guard(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
            return {};
        }

        if (candidate == nullptr) {
            continue;
        }

        if (!isAlive(candidate)) {
            // Stale idle connection: discard it and try again.
            closeNode(candidate);
            std::lock_guard<std::mutex> guard(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
            continue;
        }

        return Connection(this, candidate);
    }
}
原型：`bool RedisConnectionPool::init(const RedisPoolConfig& config)`
调用时机
DBManager 初始化 Redis 缓存服务时调用。
执行流程
- 校验配置
- 创建连接节点
- 执行认证与选库
- 加入空闲队列
- 标记可用
工程说明
初始化阶段即验证 Redis 服务可达性。
关联接口
`RedisConnectionPool::acquire`、`RedisConnectionPool::release`
查看完整实现
redis_connection_pool.cpp
bool RedisConnectionPool::init(const RedisPoolConfig& config)
{
shutdown();
RedisPoolConfig cfg = config;
if (cfg.maxConnections == 0) {
cfg.maxConnections = 1;
}
if (cfg.minConnections > cfg.maxConnections) {
cfg.minConnections = cfg.maxConnections;
}
{
std::lock_guard<std::mutex> lock(m_mutex);
m_config = cfg;
m_shutdown = false;
m_initialized = true;
}
for (std::size_t i = 0; i < cfg.minConnections; ++i) {
Node* node = createNode();
if (!node) {
std::cerr << "⚠️ [RedisPool] 预创建连接失败 index=" << i << "\n";
continue;
}
std::lock_guard<std::mutex> lock(m_mutex);
m_idle.push_back(node);
++m_total;
}
return true;
}
原型：`RedisConnectionPool::Connection RedisConnectionPool::acquire(int timeoutMs)`
执行流程
- 等待可用节点
- 按上限扩容
- 检查连接存活
- 重建失效连接
- 返回句柄
关联接口
`RedisConnectionPool::isAlive`、`RedisConnectionPool::release`
查看完整实现
redis_connection_pool.cpp
/// Obtains a Redis connection handle from the pool.
///
/// On every pass the pool state is inspected under the mutex: an idle node
/// is taken from the front of the idle list if present; otherwise a new
/// node is created while m_total is below maxConnections; otherwise the
/// caller waits on m_cv until notified or the deadline expires. Idle nodes
/// that fail isAlive() are closed and their slot released before retrying.
///
/// @param timeoutMs Maximum wait in milliseconds; a negative value selects
///                  m_config.acquireTimeoutMs.
/// @return A Connection wrapping the node, or a default-constructed
///         Connection when the pool is uninitialized or shut down, the wait
///         times out, or a new node cannot be created.
RedisConnectionPool::Connection
RedisConnectionPool::acquire(int timeoutMs)
{
    if (timeoutMs < 0) {
        timeoutMs = m_config.acquireTimeoutMs;
    }
    const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);

    for (;;) {
        Node* picked = nullptr;
        bool mustCreate = false;
        {
            std::unique_lock<std::mutex> guard(m_mutex);
            if (m_shutdown || !m_initialized) {
                return {};
            }

            const bool hasIdle = !m_idle.empty();
            const bool belowLimit = m_total < m_config.maxConnections;

            if (!hasIdle && !belowLimit) {
                // Nothing idle and no room to grow: wait for a notification.
                if (m_cv.wait_until(guard, deadline) == std::cv_status::timeout) {
                    return {};
                }
                continue;  // state may have changed; start over
            }

            if (hasIdle) {
                picked = m_idle.front();
                m_idle.pop_front();
            } else {
                // Count the connection first so concurrent callers cannot
                // exceed the limit while it is being created unlocked.
                ++m_total;
                mustCreate = true;
            }
        }

        if (mustCreate) {
            picked = createNode();
            if (picked) {
                return Connection(this, picked);
            }
            // Undo the reservation and let a waiting caller retry.
            std::lock_guard<std::mutex> guard(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
            return {};
        }

        if (picked == nullptr) {
            continue;
        }

        if (!isAlive(picked)) {
            // Dead idle connection: close it, release its slot, retry.
            closeNode(picked);
            std::lock_guard<std::mutex> guard(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
            continue;
        }

        return Connection(this, picked);
    }
}