我不熟悉在C ++中使用condition_variable
和unique_lock
s。我正在创建一个事件循环,该事件循环轮询两个自定义事件队列和一个“布尔值”(请参阅充当布尔值的整数),可以由多个来源对其进行作用。
我有一个演示(如下),该演示可以正常工作,如果您可以查看并确认它是否遵循使用unique_lock
和condition_variable
的最佳做法以及您遇到的任何问题,我将不胜感激。预见发生的情况(竞赛条件,线程阻塞等)。
-
在
ThreadSafeQueue::enqueue(...):
中,我们是否通过调用notify并使unique_lock超出范围来进行两次解锁? -
在方法
TheadSafeQueue::dequeueAll():
中,我们假定它已由已通知(cond.notify)的方法调用,因此已被锁定。有没有更好的方法可以封装它以保持调用方的干净? -
我们是否需要使班级成员具有可变性similar to this?
-
是否有更好的方法来模拟我们的情况,从而允许我们测试是否正确实现了锁?也许没有sleep语句和自动化的检查过程?
ThreadSafeQueue.h:
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <vector>
template <class T>
class ThreadSafeQueue {
public:
ThreadSafeQueue(std::condition_variable* cond,std::mutex* unvrsl_m)
: ThreadSafeQueue(cond,unvrsl_m,1) {}
ThreadSafeQueue(std::condition_variable* cond,std::mutex* unvrsl_m,uint32_t capacity)
: cond(cond),m(unvrsl_m),head(0),tail(0),capacity(capacity),buffer((T*)malloc(get_size() * sizeof(T))),scratch_space((T*)malloc(get_size() * sizeof(T))) {}
std::condition_variable* cond;
~ThreadSafeQueue() {
free(scratch_space);
free(buffer);
}
void resize(uint32_t new_cap) {
std::unique_lock<std::mutex> lock(*m);
check_params_resize(new_cap);
free(scratch_space);
scratch_space = buffer;
buffer = (T*)malloc(sizeof(T) * new_cap);
copy_cyclical_queue();
free(scratch_space);
scratch_space = (T*)malloc(new_cap * sizeof(T));
tail = get_size();
head = 0;
capacity = new_cap;
}
void enqueue(const T& value) {
std::unique_lock<std::mutex> lock(*m);
resize();
buffer[tail++] = value;
if (tail == get_capacity()) {
tail = 0;
} else if (tail > get_capacity())
throw("Something went horribly wrong TSQ: 75");
cond->notify_one();
}
// Assuming m has already been locked by the caller...
void dequeueAll(std::vector<T>* vOut) {
if (get_size() == 0) return;
scratch_space = buffer;
copy_cyclical_queue();
vOut->insert(vOut->end(),buffer,buffer + get_size());
head = tail = 0;
}
// Const functions because they shouldn't be modifying the internal variables
// of the object
bool is_empty() const { return get_size() == 0; }
uint32_t get_size() const {
if (head == tail)
return 0;
else if (head < tail) {
// 1 2 3
// 0 1 2
// 1
// 0
return tail - head;
} else {
// 3 _ 1 2
// 0 1 2 3
// capacity-head + tail+1 = 4-2+0+1 = 2 + 1
return get_capacity() - head + tail + 1;
}
}
uint32_t get_capacity() const { return capacity; }
//---------------------------------------------------------------------------
private:
std::mutex* m;
uint32_t head;
uint32_t tail;
uint32_t capacity;
T* buffer;
T* scratch_space;
uint32_t get_next_empty_spot();
void copy_cyclical_queue() {
uint32_t size = get_size();
uint32_t cap = get_capacity();
if (size == 0) {
return; // because we have nothing to copy
}
if (head + size <= cap) {
// _ 1 2 3 ... index = 1,size = 3,1+3 = 4 = capacity... only need 1 copy
memcpy(buffer,scratch_space + head,sizeof(T) * size);
} else {
// 5 1 2 3 4 ... index = 1,size = 5,1+5 = 6 = capacity... need to copy
// 1-4 then 0-1
// copy number of bytes: front = 1,to (5-1 = 4 elements)
memcpy(buffer,sizeof(T) * (cap - head));
// just copy the bytes from the front up to the first element in the old
// array
memcpy(buffer + (cap - head),scratch_space,sizeof(T) * tail);
}
}
void check_params_resize(uint32_t new_cap) {
if (new_cap < get_size()) {
std::cerr << "ThreadSafeQueue: check_params_resize: size(" << get_size()
<< ") > new_cap(" << new_cap
<< ")... data "
"loss will occur if this happens. Prevented."
<< std::endl;
}
}
void resize() {
uint32_t new_cap;
uint32_t size = get_size();
uint32_t cap = get_capacity();
if (size + 1 >= cap - 1) {
std::cout << "RESIZE CALLED --- BAD" << std::endl;
new_cap = 2 * cap;
check_params_resize(new_cap);
free(scratch_space); // free existing (too small) scratch space
scratch_space = buffer; // transfer pointer over
buffer = (T*)malloc(sizeof(T) * new_cap); // allocate a bigger buffer
copy_cyclical_queue();
// move over everything with memcpy from scratch_space to buffer
free(scratch_space); // free what used to be the too-small buffer
scratch_space =
(T*)malloc(sizeof(T) * new_cap); // recreate scratch space
tail = size;
head = 0;
// since we're done with the old array... delete for memory management->
capacity = new_cap;
}
}
};
// Event Types
// keyboard/mouse
// network
// dirty flag
Main.cpp:
#include <unistd.h>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <thread>
#include "ThreadSafeQueue.h"
using namespace std;
void write_to_threadsafe_queue(ThreadSafeQueue<uint32_t> *q,uint32_t startVal) {
uint32_t count = startVal;
while (true) {
q->enqueue(count);
cout << "Successfully enqueued: " << count << endl;
count += 2;
sleep(count);
}
}
void sleep_and_set_redraw(int *redraw,condition_variable *cond) {
while (true) {
sleep(3);
__sync_fetch_and_or(redraw,1);
cond->notify_one();
}
}
void process_events(vector<uint32_t> *qOut,condition_variable *cond,ThreadSafeQueue<uint32_t> *q1,ThreadSafeQueue<uint32_t> *q2,int *redraw,mutex *m) {
while (true) {
unique_lock<mutex> lck(*m);
cond->wait(lck);
q1->dequeueAll(qOut);
q2->dequeueAll(qOut);
if (__sync_fetch_and_and(redraw,0)) {
cout << "flaG SET" << endl;
qOut->push_back(0);
}
for (auto a : *qOut) cout << a << "\t";
cout << endl;
cout << "PROCESSING: " << qOut->size() << endl;
qOut->clear();
}
}
void test_2_queues_and_bool() {
try {
condition_variable cond;
mutex m;
ThreadSafeQueue<uint32_t> q1(&cond,&m,1024);
ThreadSafeQueue<uint32_t> q2(&cond,1024);
int redraw = 0;
vector<uint32_t> qOut;
thread t1(write_to_threadsafe_queue,&q1,2);
thread t2(write_to_threadsafe_queue,&q2,1);
thread t3(sleep_and_set_redraw,&redraw,&cond);
thread t4(process_events,&qOut,&cond,&m);
t1.join();
t2.join();
t3.join();
t4.join();
} catch (system_error &e) {
cout << "MAIN TEST CRASHED" << e.what();
}
}
int main() { test_2_queues_and_bool(); }