/*
VIM: let b:lcppflags="-std=c++14 -O2 -pthread -I."
VIM: let b:wcppflags="/O2 /EHsc /DWIN32"
*/

#include "stdafx.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <exception>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

/* Queue. */

/* Identifier of the current worker thread; used only to tag log output. */
thread_local std::size_t this_tid;

/*
 * Bounded multi-producer / multi-consumer queue.
 *
 * push() blocks while the queue holds max_queue elements, pop() blocks while
 * it is empty. stop() wakes every waiter; afterwards both push() and pop()
 * return false.
 */
template <typename M>
class MtQueue {
    std::queue<M> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_push_cond;  // signalled when space becomes available
    std::condition_variable m_pop_cond;   // signalled when an element becomes available
    // Explicitly initialized: a default-constructed std::atomic holds an
    // indeterminate value before C++20.
    std::atomic<bool> m_stopped{false};
    static const std::size_t max_queue = 100;

public:
    /* Stop Queue: mark the queue as stopped and wake all blocked threads. */
    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_stopped.store(true);
        m_pop_cond.notify_all();
        m_push_cond.notify_all();
    }

    /*
     * Blocks while the queue is full.
     * Returns true if the element was enqueued, false if the queue was stopped.
     */
    bool push(M m)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (m_queue.size() >= max_queue && !m_stopped.load()) {
            std::printf(" %zu: is full.\n", this_tid);
            m_push_cond.wait(lock);
        }
        if (m_stopped.load()) {
            return false;
        }
        m_queue.push(std::move(m));
        std::printf("%zu: size %zu.\n", this_tid, m_queue.size());
        m_pop_cond.notify_one();
        return true;
    }

    /*
     * Blocks when empty.
     * Returns true and stores the front element in m on success. Returns false
     * once the queue has been stopped, even if elements are still queued; m is
     * left untouched in that case.
     */
    bool pop(M& m)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (m_queue.empty() && !m_stopped.load()) {
            std::printf(" %zu: is empty.\n", this_tid);
            m_pop_cond.wait(lock);
        }
        if (m_stopped.load()) {
            return false;
        }
        m = std::move(m_queue.front());
        m_queue.pop();
        std::printf("%zu: size %zu.\n", this_tid, m_queue.size());
        m_push_cond.notify_one();
        return true;
    }
};

/*
 * One stress round: a single producer and five consumers share a queue for
 * 500 ms, then the queue is stopped and all threads are joined.
 */
void test()
{
    MtQueue<int> q;

    // Producer: pushes the value 2 until the queue is stopped.
    auto push_proc = [&q](std::size_t tid) {
        this_tid = tid;
        int i = 0;
        while (q.push(2)) {
            if (++i % 101 == 0) {
                // Optional producer throttle, currently disabled:
                //auto m = std::chrono::milliseconds(100);
                //printf("%zu: pusher waiting for %lldms.\n", this_tid, static_cast<long long>(m.count()));
                //std::this_thread::sleep_for(m);
            }
        }
    };

    // Consumer: pops a value and sleeps for that many milliseconds.
    auto pop_proc = [&q](std::size_t tid) {
        this_tid = tid;
        int m = 0;
        while (q.pop(m)) {
            //printf("%zu: popper waiting for %dms.\n", this_tid, m);
            std::this_thread::sleep_for(std::chrono::milliseconds(m));
        }
    };

    std::vector<std::thread> v;
    v.reserve(16);

    auto join_all = [&v] {
        for (auto& t : v) {
            if (t.joinable()) {
                t.join();
            }
        }
    };

    try {
        // Each thread receives its index in v as its id.
        v.emplace_back(push_proc, v.size());
        for (int k = 0; k < 5; ++k) {
            v.emplace_back(pop_proc, v.size());
        }
    } catch (...) {
        // Destroying a joinable std::thread calls std::terminate, so shut the
        // already-started workers down before propagating the failure.
        q.stop();
        join_all();
        throw;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    q.stop();
    std::printf(" STOPPING.\n");
    join_all();
}

int main()
{
    try {
        auto begin = std::chrono::high_resolution_clock::now();
        // Stress loop: runs test() rounds until the process is terminated
        // or an exception escapes.
        while (true) {
            test();
        }
        // NOTE: unreachable while the loop above is unbounded; kept so that
        // timing is reported if the loop is ever given an exit condition.
        auto end = std::chrono::high_resolution_clock::now();
        std::chrono::duration<double> seconds = end - begin;
        std::cout << "Time: " << seconds.count() << std::endl;
        return 0;
    } catch (const std::exception& e) {
        std::cerr << std::endl
                  << "std::exception(\"" << e.what() << "\")." << std::endl;
        return 2;
    } catch (...) {
        std::cerr << std::endl << "unknown exception." << std::endl;
        return 1;
    }
}