boost/thread/win32/condition_variable.hpp
#ifndef BOOST_THREAD_CONDITION_VARIABLE_WIN32_HPP #define BOOST_THREAD_CONDITION_VARIABLE_WIN32_HPP // Distributed under the Boost Software License, Version 1.0. (See // accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // (C) Copyright 2007 Anthony Williams #include <boost/thread/mutex.hpp> #include "thread_primitives.hpp" #include <limits.h> #include <boost/assert.hpp> #include <algorithm> #include <boost/thread/thread.hpp> #include <boost/thread/thread_time.hpp> #include "interlocked_read.hpp" #include <boost/thread/xtime.hpp> namespace boost { namespace detail { class basic_condition_variable { boost::mutex internal_mutex; long total_count; unsigned active_generation_count; struct list_entry { detail::win32::handle semaphore; long count; bool notified; list_entry(): semaphore(0),count(0),notified(0) {} }; BOOST_STATIC_CONSTANT(unsigned,generation_count=3); list_entry generations[generation_count]; detail::win32::handle wake_sem; static bool no_waiters(list_entry const& entry) { return entry.count==0; } void shift_generations_down() { list_entry* const last_active_entry=std::remove_if(generations,generations+generation_count,no_waiters); if(last_active_entry==generations+generation_count) { broadcast_entry(generations[generation_count-1],false); } else { active_generation_count=unsigned(last_active_entry-generations)+1; } #ifdef BOOST_MSVC #pragma warning(push) #pragma warning(disable:4996) #endif std::copy_backward(generations,generations+active_generation_count-1,generations+active_generation_count); #ifdef BOOST_MSVC #pragma warning(pop) #endif generations[0]=list_entry(); } void broadcast_entry(list_entry& entry,bool wake) { long const count_to_wake=entry.count; detail::interlocked_write_release(&total_count,total_count-count_to_wake); if(wake) { detail::win32::ReleaseSemaphore(wake_sem,count_to_wake,0); } detail::win32::ReleaseSemaphore(entry.semaphore,count_to_wake,0); entry.count=0; dispose_entry(entry); } void dispose_entry(list_entry& entry) { if(entry.semaphore) { BOOST_VERIFY(detail::win32::CloseHandle(entry.semaphore)); entry.semaphore=0; } entry.notified=false; } template<typename lock_type> struct relocker { lock_type& lock; bool unlocked; relocker(lock_type& lock_): lock(lock_),unlocked(false) {} void unlock() { lock.unlock(); unlocked=true; } ~relocker() { if(unlocked) { lock.lock(); } } private: void operator=(relocker&); }; template<typename lock_type> void start_wait_loop_first_time(relocker<lock_type>& locker, detail::win32::handle_manager& local_wake_sem) { locker.unlock(); if(!wake_sem) { wake_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX); BOOST_ASSERT(wake_sem); } local_wake_sem=detail::win32::duplicate_handle(wake_sem); if(generations[0].notified) { shift_generations_down(); } else if(!active_generation_count) { active_generation_count=1; } } template<typename lock_type> void start_wait_loop(relocker<lock_type>& locker, detail::win32::handle_manager& local_wake_sem, detail::win32::handle_manager& sem) { boost::mutex::scoped_lock internal_lock(internal_mutex); detail::interlocked_write_release(&total_count,total_count+1); if(!local_wake_sem) { start_wait_loop_first_time(locker,local_wake_sem); } if(!generations[0].semaphore) { generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX); BOOST_ASSERT(generations[0].semaphore); } ++generations[0].count; sem=detail::win32::duplicate_handle(generations[0].semaphore); } protected: template<typename lock_type> bool do_wait(lock_type& lock,timeout wait_until) { detail::win32::handle_manager local_wake_sem; detail::win32::handle_manager sem; bool woken=false; relocker<lock_type> locker(lock); while(!woken) { start_wait_loop(locker,local_wake_sem,sem); if(!this_thread::interruptible_wait(sem,wait_until)) { return false; } unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0); BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0); woken=(woken_result==0); } return woken; } template<typename lock_type,typename predicate_type> bool do_wait(lock_type& m,timeout const& wait_until,predicate_type pred) { while (!pred()) { if(!do_wait(m, wait_until)) return pred(); } return true; } basic_condition_variable(const basic_condition_variable& other); basic_condition_variable& operator=(const basic_condition_variable& other); public: basic_condition_variable(): total_count(0),active_generation_count(0),wake_sem(0) {} ~basic_condition_variable() { for(unsigned i=0;i<generation_count;++i) { dispose_entry(generations[i]); } detail::win32::CloseHandle(wake_sem); } void notify_one() { if(detail::interlocked_read_acquire(&total_count)) { boost::mutex::scoped_lock internal_lock(internal_mutex); detail::win32::ReleaseSemaphore(wake_sem,1,0); for(unsigned generation=active_generation_count;generation!=0;--generation) { list_entry& entry=generations[generation-1]; if(entry.count) { detail::interlocked_write_release(&total_count,total_count-1); entry.notified=true; detail::win32::ReleaseSemaphore(entry.semaphore,1,0); if(!--entry.count) { dispose_entry(entry); if(generation==active_generation_count) { --active_generation_count; } } } } } } void notify_all() { if(detail::interlocked_read_acquire(&total_count)) { boost::mutex::scoped_lock internal_lock(internal_mutex); for(unsigned generation=active_generation_count;generation!=0;--generation) { list_entry& entry=generations[generation-1]; if(entry.count) { broadcast_entry(entry,true); } } active_generation_count=0; } } }; } class condition_variable: public detail::basic_condition_variable { public: void wait(unique_lock<mutex>& m) { do_wait(m,detail::timeout::sentinel()); } template<typename predicate_type> void wait(unique_lock<mutex>& m,predicate_type pred) { while(!pred()) wait(m); } bool timed_wait(unique_lock<mutex>& m,boost::system_time const& wait_until) { return do_wait(m,wait_until); } bool timed_wait(unique_lock<mutex>& m,boost::xtime const& wait_until) { return do_wait(m,system_time(wait_until)); } template<typename duration_type> bool timed_wait(unique_lock<mutex>& m,duration_type const& wait_duration) { return do_wait(m,wait_duration.total_milliseconds()); } template<typename predicate_type> bool timed_wait(unique_lock<mutex>& m,boost::system_time const& wait_until,predicate_type pred) { return do_wait(m,wait_until,pred); } template<typename predicate_type> bool timed_wait(unique_lock<mutex>& m,boost::xtime const& wait_until,predicate_type pred) { return do_wait(m,system_time(wait_until),pred); } template<typename duration_type,typename predicate_type> bool timed_wait(unique_lock<mutex>& m,duration_type const& wait_duration,predicate_type pred) { return do_wait(m,wait_duration.total_milliseconds(),pred); } }; class condition_variable_any: public detail::basic_condition_variable { public: template<typename lock_type> void wait(lock_type& m) { do_wait(m,detail::timeout::sentinel()); } template<typename lock_type,typename predicate_type> void wait(lock_type& m,predicate_type pred) { while(!pred()) wait(m); } template<typename lock_type> bool timed_wait(lock_type& m,boost::system_time const& wait_until) { return do_wait(m,wait_until); } template<typename lock_type> bool timed_wait(lock_type& m,boost::xtime const& wait_until) { return do_wait(m,system_time(wait_until)); } template<typename lock_type,typename duration_type> bool timed_wait(lock_type& m,duration_type const& wait_duration) { return do_wait(m,wait_duration.total_milliseconds()); } template<typename lock_type,typename predicate_type> bool timed_wait(lock_type& m,boost::system_time const& wait_until,predicate_type pred) { return do_wait(m,wait_until,pred); } template<typename lock_type,typename predicate_type> bool timed_wait(lock_type& m,boost::xtime const& wait_until,predicate_type pred) { return do_wait(m,system_time(wait_until),pred); } template<typename lock_type,typename duration_type,typename predicate_type> bool timed_wait(lock_type& m,duration_type const& wait_duration,predicate_type pred) { return do_wait(m,wait_duration.total_milliseconds(),pred); } }; } #endif