boost/fiber/detail/context_spmc_queue.hpp
// Copyright Oliver Kowalke 2013. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H #define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H #include <atomic> #include <cstddef> #include <cstdint> #include <memory> #include <type_traits> #include <utility> #include <boost/assert.hpp> #include <boost/config.hpp> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/context.hpp> // David Chase and Yossi Lev. Dynamic circular work-stealing deque. // In SPAA ’05: Proceedings of the seventeenth annual ACM symposium // on Parallelism in algorithms and architectures, pages 21–28, // New York, NY, USA, 2005. ACM. // // Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013. // Correct and efficient work-stealing for weak memory models. // In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice // of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80. namespace boost { namespace fibers { namespace detail { class context_spmc_queue { private: class array { private: typedef std::atomic< context * > atomic_type; typedef std::aligned_storage< sizeof( atomic_type), cache_alignment >::type storage_type; std::size_t size_; storage_type * storage_; public: array( std::size_t size) : size_{ size }, storage_{ new storage_type[size_] } { for ( std::size_t i = 0; i < size_; ++i) { ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr }; } } ~array() { for ( std::size_t i = 0; i < size_; ++i) { reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type(); } delete [] storage_; } std::size_t size() const noexcept { return size_; } void push( std::size_t bottom, context * ctx) noexcept { reinterpret_cast< atomic_type * >( std::addressof( storage_[bottom % size_]) ) ->store( ctx, std::memory_order_relaxed); } context * pop( std::size_t top) noexcept { return reinterpret_cast< atomic_type * >( std::addressof( storage_[top % size_]) ) ->load( std::memory_order_relaxed); } array * resize( std::size_t bottom, std::size_t top) { std::unique_ptr< array > tmp{ new array{ 2 * size_ } }; for ( std::size_t i = top; i != bottom; ++i) { tmp->push( i, pop( i) ); } return tmp.release(); } }; alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; alignas(cache_alignment) std::atomic< array * > array_; std::vector< array * > old_arrays_{}; char padding_[cacheline_length]; public: context_spmc_queue() : array_{ new array{ 1024 } } { old_arrays_.reserve( 32); } ~context_spmc_queue() { for ( array * a : old_arrays_) { delete a; } delete array_.load(); } context_spmc_queue( context_spmc_queue const&) = delete; context_spmc_queue & operator=( context_spmc_queue const&) = delete; bool empty() const noexcept { std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; std::size_t top{ top_.load( std::memory_order_relaxed) }; return bottom <= top; } void push( context * ctx) { std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; std::size_t top{ top_.load( std::memory_order_acquire) }; array * a{ array_.load( std::memory_order_relaxed) }; if ( (a->size() - 1) < (bottom - top) ) { // queue is full // resize array * tmp{ a->resize( bottom, top) }; old_arrays_.push_back( a); std::swap( a, tmp); array_.store( a, std::memory_order_relaxed); } a->push( bottom, ctx); std::atomic_thread_fence( std::memory_order_release); bottom_.store( bottom + 1, std::memory_order_relaxed); } context * pop() { std::size_t top{ top_.load( std::memory_order_acquire) }; std::atomic_thread_fence( std::memory_order_seq_cst); std::size_t bottom{ bottom_.load( std::memory_order_acquire) }; context * ctx{ nullptr }; if ( top < bottom) { // queue is not empty array * a{ array_.load( std::memory_order_consume) }; ctx = a->pop( top); if ( ctx->is_context( type::pinned_context) || ! top_.compare_exchange_strong( top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed) ) { // lose the race return nullptr; } } return ctx; } }; }}} #endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H