1888 字
9 分钟
多消费者多生产者无锁队列

多消费者多生产者无锁队列#

#pragma once
#include <atomic>
#include <thread>
#include <vector>
#include <cassert>
#include <cstdint>
#include <functional>
#include <algorithm>
#include <memory>
// -------- Hazard Pointer 简单实现 --------
// 目标:每个线程可以“声明”它正在访问的指针(hazard pointers)。
// 当节点被 retire(延迟删除)时,我们把它放入线程本地的 retire-list。
// 当 retire-list 达到阈值时,扫描全局 hazard pointer 表,
// 对不在任意 hazard pointer 的节点执行 delete。
// 这是最常见的 hazard pointer 回收策略的精简实现。
namespace lf
{
static const unsigned MAX_HAZARD_POINTERS = 256; // 最大线程数(每个线程一个 hazard slot),(线程数 * 保护指针数)
static const unsigned RETIRE_THRESHOLD = 40; // 每个线程延迟回收阈值
// 全局 hazard 指针表(简单数组)
struct HazardRecord
{
std::atomic<std::uintptr_t> ptr; // 存放裸指针地址(uintptr_t 便于 atomic)
HazardRecord() : ptr(0) {}
};
static HazardRecord g_hazard_table[MAX_HAZARD_POINTERS]; // 全局 hazard 指针表
// 为线程分配一个 hazard slot id,(或多个 slot 需要时可扩展)。
// 这里只分配一个 slot 给每个线程用于保护节点指针(Michael-Scott 需要保护 head/next 等,我们会在需要时临时占用)。
inline unsigned acquire_hazard_slot_for_current_thread()
{
thread_local int slot = -1;
if (slot != -1)
return (unsigned)slot;
for (unsigned i = 0; i < MAX_HAZARD_POINTERS; ++i)
{
uintptr_t expected = 0;
// 试图把 slot 标记为特殊非零值 (例如地址 of &g_hazard_table[i]),表示被占用。
// 这里我们用 occupant marker: 1 (非 0) 表示已占用。为了简单,先用 CAS 将 0->1 再实际使用 ptr 位置存放 0 表示可用。
// 实际设计可以用更复杂的注册逻辑。下面采用更保守的操作:先尝试将 ptr 从 0 改为 1 (占位)
if (g_hazard_table[i].ptr.compare_exchange_strong(expected, 1,
std::memory_order_acq_rel, std::memory_order_relaxed))
{
// 成功占位,初始化为 0(表示当前无保护指针)
g_hazard_table[i].ptr.store(0, std::memory_order_release);
slot = (int)i;
return (unsigned)slot;
}
}
// 如果没有可用 slot,程序利益下会失败。为了安全,抛出或断言。
// 在生产里应支持更多 slot 或动态注册。
assert(false && "No hazard pointer slots available: increase MAX_HAZARD_POINTERS");
return 0;
}
// 设置当前线程 slot 的 hazard pointer 值为 p(可以为 nullptr)
inline void set_hazard_pointer(unsigned slot, void *p)
{
assert(slot < MAX_HAZARD_POINTERS);
g_hazard_table[slot].ptr.store(reinterpret_cast<std::uintptr_t>(p), std::memory_order_release);
}
// 清除 slot(设为 nullptr)
inline void clear_hazard_pointer(unsigned slot)
{
assert(slot < MAX_HAZARD_POINTERS);
g_hazard_table[slot].ptr.store(0, std::memory_order_release);
}
// 收集当前所有活跃的 hazard 指针到 vector(用于回收判断)
inline void collect_hazard_pointers(std::vector<std::uintptr_t> &out)
{
out.clear();
out.reserve(MAX_HAZARD_POINTERS);
for (unsigned i = 0; i < MAX_HAZARD_POINTERS; ++i)
{
uintptr_t p = g_hazard_table[i].ptr.load(std::memory_order_acquire);
if (p != 0)
out.push_back(p);
}
}
// 线程本地的 retire list,保存待删除节点指针
template <typename Node>
struct RetireList
{
std::vector<Node *> nodes;
RetireList() { nodes.reserve(RETIRE_THRESHOLD * 2); }
void add(Node *n)
{
nodes.push_back(n);
if (nodes.size() >= RETIRE_THRESHOLD)
{
scan_and_reclaim();
}
}
void scan_and_reclaim()
{
// 收集所有 hazard 指针
std::vector<std::uintptr_t> hazards;
collect_hazard_pointers(hazards);
// 对于每个待回收节点,若不在 hazards 中则 delete
auto it = nodes.begin();
while (it != nodes.end())
{
uintptr_t np = reinterpret_cast<std::uintptr_t>(*it);
bool protected_by_hazard = std::binary_search(hazards.begin(), hazards.end(), np) || (std::find(hazards.begin(), hazards.end(), np) != hazards.end());
if (!protected_by_hazard)
{
delete *it;
it = nodes.erase(it);
}
else
{
++it;
}
}
// 如果仍然很多,可以选择再次尝试或扩大阈值;这里保守退出
}
~RetireList()
{
// 程序退出或线程结束时尽量回收残余节点(简单处理)
for (Node *n : nodes)
delete n;
nodes.clear();
}
};
} // namespace lf
template <typename T>
class LockFreeQueue
{
private:
struct Node
{
std::atomic<Node *> next; // 指向下一个节点
T data; // 数据
bool has_data; // 是否包含数据(用于判断是否为 dummy 节点)
Node() : next(nullptr), data(), has_data(false) {}
explicit Node(const T &v) : next(nullptr), data(v), has_data(true) {}
explicit Node(T &&v) : next(nullptr), data(std::move(v)), has_data(true) {}
};
std::atomic<Node *> head_; // 队头(指向 dummy 节点)
std::atomic<Node *> tail_; // 队尾(指向最后一个节点)
// 每线程的 retire list
static lf::RetireList<Node> &thread_retire_list()
{
thread_local lf::RetireList<Node> rl;
return rl;
}
// 获取当前线程的 hazard slot id(只分配一次)
static unsigned get_hazard_slot()
{
static thread_local unsigned slot = lf::acquire_hazard_slot_for_current_thread();
return slot;
}
public:
LockFreeQueue()
{
Node *dummy = new Node();
head_.store(dummy, std::memory_order_relaxed);
tail_.store(dummy, std::memory_order_relaxed);
}
~LockFreeQueue()
{
// 删除链上所有节点(此时假设没有并发线程访问)
Node *p = head_.load(std::memory_order_relaxed);
while (p)
{
Node *n = p->next.load(std::memory_order_relaxed);
delete p;
p = n;
}
}
// 禁止拷贝
LockFreeQueue(const LockFreeQueue &) = delete;
LockFreeQueue &operator=(const LockFreeQueue &) = delete;
/**
* @brief 入队
*
* @param value 要加入队列的元素值
*/
void Enqueue(const T &value)
{
Node *newNode = new Node(value); // 创建新节点
newNode->next.store(nullptr, std::memory_order_relaxed); // 将新节点的next指针设置为nullptr,使用内存序relaxed
while (true)
{
Node *last = tail_.load(std::memory_order_acquire); // 获取当前 tail
Node *next = last->next.load(std::memory_order_acquire); // 获取 tail 的 next
if (last == tail_.load(std::memory_order_acquire)) // 尝试获取 tail 的独占权
{
if (next == nullptr) // 如果 next 为空,说明 tail 是链表的最后一个节点
{
// tail 后没有节点,尝试插入新节点
if (last->next.compare_exchange_weak(next, newNode, // 使用比较交换操作尝试将新节点插入到 last 之后
std::memory_order_release, std::memory_order_relaxed))
{
// 插入成功,尝试推进 tail 指针到新节点
tail_.compare_exchange_strong(last, newNode, // 使用比较交换操作更新 tail 指针
std::memory_order_release, std::memory_order_relaxed);
return;
}
}
else
{
// tail 后有节点,尝试推进 tail
tail_.compare_exchange_strong(last, next,
std::memory_order_release, std::memory_order_relaxed); // 使用内存顺序为release和relaxed的比较交换操作
}
}
// else retry
}
}
// Enqueue move version
void Enqueue(T &&value)
{
Node *newNode = new Node(std::move(value)); // 创建新节点,使用移动语义初始化节点数据
newNode->next.store(nullptr, std::memory_order_relaxed); // 将新节点的next指针初始化为nullptr,使用内存序relaxed
while (true)
{
Node *last = tail_.load(std::memory_order_acquire); // 使用memory_order_acquire获取尾节点指针
Node *next = last->next.load(std::memory_order_acquire); // 获取尾节点的下一个节点指针
if (last == tail_.load(std::memory_order_acquire)) // 确保尾节点没有被其他线程修改过
{
if (next == nullptr)
{
if (last->next.compare_exchange_weak(next, newNode, // 尝试将尾节点的next指针更新为新节点 使用compare_exchange_weak处理并发情况,成功则使用memory_order_release
std::memory_order_release, std::memory_order_relaxed))
{
tail_.compare_exchange_strong(last, newNode, // 更新队列的尾指针为新节点 使用compare_exchange_strong确保原子性
std::memory_order_release, std::memory_order_relaxed);
return; // 入队成功,退出循环
}
}
else
{
tail_.compare_exchange_strong(last, next,
std::memory_order_release, std::memory_order_relaxed);
}
}
}
}
// Dequeue (pop). 返回 true 并将 value 设置为弹出值;空队列返回 false
bool Dequeue(T &value)
{
unsigned hp_slot = get_hazard_slot(); // 获取一个危险指针槽位,用于保护正在访问的节点
while (true)
{
Node *first = head_.load(std::memory_order_acquire); // 加载头节点指针,使用memory_order_acquire保证内存顺序
// 保护 first
lf::set_hazard_pointer(hp_slot, first); // 设置危险指针,防止节点被其他线程删除
// 重新读取以保证一致性
if (head_.load(std::memory_order_acquire) != first) // 检查头节点是否被其他线程修改
{
// 被修改,重试
lf::clear_hazard_pointer(hp_slot); // 清除危险指针,防止内存泄漏
continue;
}
Node *last = tail_.load(std::memory_order_acquire); // 获取尾节点的指针,使用内存获取顺序
Node *next = first->next.load(std::memory_order_acquire); // 获取首节点的下一个节点指针,使用内存获取顺序
if (first == head_.load(std::memory_order_acquire)) // 检查当前首节点是否仍然等于head_指针,使用内存获取顺序
{
if (next == nullptr) // 检查下一个节点是否为空
{
// 队列空 ,清理并返回
lf::clear_hazard_pointer(hp_slot);
return false;
}
// 保护 next(因为我们要读取 next->data)
// 为防止同一线程只用一个 slot 导致覆盖,临时我们 reuse same slot:
// Set hazard to next before CAS head to next (some implementations require two slots).
// 为简化:先 set next into same slot, 然后 re-check head hasn't changed.
lf::set_hazard_pointer(hp_slot, next);
if (head_.load(std::memory_order_acquire) != first)
{
// head changed, retry
lf::clear_hazard_pointer(hp_slot);
continue;
}
// 如果 tail == first(尾落后),尝试推进 tail(帮助)
if (first == last)
{
tail_.compare_exchange_strong(last, next,
std::memory_order_release, std::memory_order_relaxed);
lf::clear_hazard_pointer(hp_slot);
continue;
}
// 现在尝试将 head 从 first 推到 next
if (head_.compare_exchange_strong(first, next,
std::memory_order_acq_rel, std::memory_order_relaxed))
{
// 成功,取得数据
assert(next->has_data); // 除非逻辑出错
value = std::move(next->data);
// 在把旧节点 retire(延迟删除)前,清除 hazard pointer
lf::clear_hazard_pointer(hp_slot);
// 把 first 放入 retire list(延迟 delete)
thread_retire_list().add(first);
return true;
}
else
{
// CAS 失败,重试
lf::clear_hazard_pointer(hp_slot);
continue;
}
}
// else head changed;重试
}
}
/**
* @brief 检查队列是否为空
*
* @return true
* @return false
*/
bool Empty() const
{
Node *h = head_.load(std::memory_order_acquire);
Node *n = h->next.load(std::memory_order_acquire);
return n == nullptr;
}
};
多消费者多生产者无锁队列
https://mizuki.mysqil.com/posts/concurrent_container/queue_mpmc/
作者
Mirai
发布于
2025-10-04
许可协议
CC BY-NC-SA 4.0