interviews/training->training
This commit is contained in:
148
puzzles/training/multithreaded_queue.cpp
Normal file
148
puzzles/training/multithreaded_queue.cpp
Normal file
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
VIM: let b:lcppflags="-std=c++14 -O2 -pthread -I."
|
||||
VIM: let b:wcppflags="/O2 /EHsc /DWIN32"
|
||||
*/
|
||||
#include "stdafx.h"
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <atomic>
|
||||
|
||||
/*
|
||||
Queue.
|
||||
*/
|
||||
|
||||
thread_local size_t this_tid;
|
||||
|
||||
template <typename M>
|
||||
class MtQueue {
|
||||
|
||||
std::queue<M> m_queue;
|
||||
std::mutex m_mutex;
|
||||
std::condition_variable m_push_cond;
|
||||
std::condition_variable m_pop_cond;
|
||||
std::atomic<bool> m_stopped;
|
||||
|
||||
static const size_t max_queue = 100;
|
||||
|
||||
|
||||
public:
|
||||
|
||||
/* Stop Queue */
|
||||
void stop() {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
m_stopped.store(true);
|
||||
m_pop_cond.notify_all();
|
||||
m_push_cond.notify_all();
|
||||
}
|
||||
|
||||
/* Blocks when */
|
||||
bool push(M m) {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (m_queue.size() >= max_queue && !m_stopped.load()) {
|
||||
printf(" %lld: is full.\n", this_tid);
|
||||
m_push_cond.wait(lock);
|
||||
}
|
||||
if (m_stopped.load()) {
|
||||
return false;
|
||||
}
|
||||
m_queue.push(m);
|
||||
printf("%lld: size %lld.\n", this_tid, m_queue.size());
|
||||
m_pop_cond.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Blocks when empty.*/
|
||||
bool pop(M& m) {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (m_queue.empty() && !m_stopped.load()) {
|
||||
printf(" %lld: is empty.\n", this_tid);
|
||||
m_pop_cond.wait(lock);
|
||||
}
|
||||
if (m_stopped.load()) {
|
||||
return false;
|
||||
}
|
||||
m = m_queue.front();
|
||||
m_queue.pop();
|
||||
printf("%lld: size %lld.\n", this_tid, m_queue.size());
|
||||
m_push_cond.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
void test() {
|
||||
MtQueue<int> q;
|
||||
|
||||
auto push_proc = [&q](size_t tid) {
|
||||
this_tid = tid;
|
||||
int i = 0;
|
||||
while (q.push(2)) {
|
||||
if (++i % 101 == 0) {
|
||||
auto m = std::chrono::milliseconds(100);
|
||||
//printf("%lld: pusher waiting for %lldms.\n", this_tid, m.count());
|
||||
//std::this_thread::sleep_for(m);
|
||||
}
|
||||
}
|
||||
};
|
||||
auto pop_proc = [&q](size_t tid) {
|
||||
this_tid = tid;
|
||||
int m;
|
||||
while (q.pop(m)) {
|
||||
|
||||
//printf("%lld: popper waiting for %dms.\n", this_tid, m);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(m));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
std::vector<std::thread> v;
|
||||
v.reserve(16);
|
||||
v.emplace_back(push_proc, v.size());
|
||||
v.emplace_back(pop_proc, v.size());
|
||||
v.emplace_back(pop_proc, v.size());
|
||||
v.emplace_back(pop_proc, v.size());
|
||||
v.emplace_back(pop_proc, v.size());
|
||||
v.emplace_back(pop_proc, v.size());
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
q.stop();
|
||||
printf(" STOPPING.\n");
|
||||
for (auto& t : v) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
try {
|
||||
auto begin = std::chrono::high_resolution_clock::now();
|
||||
|
||||
while (true) {
|
||||
test();
|
||||
}
|
||||
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<double> seconds = end - begin;
|
||||
std::cout << "Time: " << seconds.count() << std::endl;
|
||||
return 0;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
std::cerr << std::endl
|
||||
<< "std::exception(\"" << e.what() << "\")." << std::endl;
|
||||
return 2;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << std::endl
|
||||
<< "unknown exception." << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user