Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

boost/asio/experimental/parallel_group.hpp

//
// experimental/parallel_group.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP
#define BOOST_ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>
#include <vector>
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/array.hpp>
#include <boost/asio/detail/memory.hpp>
#include <boost/asio/detail/type_traits.hpp>
#include <boost/asio/detail/utility.hpp>
#include <boost/asio/experimental/cancellation_condition.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace experimental {
namespace detail {

// Helper trait that maps a completion signature R(Args...) onto the
// std::tuple type holding one element per argument, with decay_t applied to
// each argument type. The return type R does not take part in the result.

template <typename Signature>
struct parallel_op_signature_as_tuple;

template <typename R, typename... Args>
struct parallel_op_signature_as_tuple<R(Args...)>
{
  using type = std::tuple<decay_t<Args>...>;
};

// Helper trait for concatenating completion signatures.

template <std::size_t N, typename Offsets, typename... Signatures>
struct parallel_group_signature;

template <std::size_t N, typename R0, typename... Args0>
struct parallel_group_signature<N, R0(Args0...)>
{
  typedef boost::asio::detail::array<std::size_t, N> order_type;
  typedef R0 raw_type(Args0...);
  typedef R0 type(order_type, Args0...);
};

template <std::size_t N,
    typename R0, typename... Args0,
    typename R1, typename... Args1>
struct parallel_group_signature<N, R0(Args0...), R1(Args1...)>
{
  typedef boost::asio::detail::array<std::size_t, N> order_type;
  typedef R0 raw_type(Args0..., Args1...);
  typedef R0 type(order_type, Args0..., Args1...);
};

template <std::size_t N, typename Sig0,
    typename Sig1, typename... SigN>
struct parallel_group_signature<N, Sig0, Sig1, SigN...>
{
  typedef boost::asio::detail::array<std::size_t, N> order_type;
  typedef typename parallel_group_signature<N,
    typename parallel_group_signature<N, Sig0, Sig1>::raw_type,
      SigN...>::raw_type raw_type;
  typedef typename parallel_group_signature<N,
    typename parallel_group_signature<N, Sig0, Sig1>::raw_type,
      SigN...>::type type;
};

// Forward declaration of the launcher invoked by
// parallel_group::initiate_async_wait. It receives the cancellation condition
// and the completion handler by value, the tuple of operations by non-const
// lvalue reference, and an index sequence with one index per operation.
// Only the declaration is visible in this header; the definition is presumably
// provided by the impl header included at the end of this file (TODO confirm).
template <typename Condition, typename Handler,
    typename... Ops, std::size_t... I>
void parallel_group_launch(Condition cancellation_condition, Handler handler,
    std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>);

// Helper trait for determining ranged parallel group completion signatures.

template <typename Signature, typename Allocator>
struct ranged_parallel_group_signature;

template <typename R, typename... Args, typename Allocator>
struct ranged_parallel_group_signature<R(Args...), Allocator>
{
  typedef std::vector<std::size_t,
    BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)> order_type;
  typedef R raw_type(
      std::vector<Args, BOOST_ASIO_REBIND_ALLOC(Allocator, Args)>...);
  typedef R type(order_type,
      std::vector<Args, BOOST_ASIO_REBIND_ALLOC(Allocator, Args)>...);
};

// Forward declaration of the launcher invoked by
// ranged_parallel_group::initiate_async_wait. It receives the cancellation
// condition and the completion handler by value, the range of operations by
// forwarding reference, and the allocator by const reference.
// Only the declaration is visible in this header; the definition is presumably
// provided by the impl header included at the end of this file (TODO confirm).
template <typename Condition, typename Handler,
    typename Range, typename Allocator>
void ranged_parallel_group_launch(Condition cancellation_condition,
    Handler handler, Range&& range, const Allocator& allocator);

// Overload-resolution probe used to detect a nested T::iterator type. Neither
// overload is ever defined; both are only named inside sizeof.

// Viable only when T::iterator names a type; yields a one-byte result.
template <typename T>
char parallel_group_has_iterator_helper(T*, typename T::iterator* = 0);

// Catch-all fallback; yields a two-byte result.
char (&parallel_group_has_iterator_helper(...))[2];

// value is non-zero exactly when T declares a nested iterator type.
template <typename T>
struct parallel_group_has_iterator_typedef
{
  enum
  {
    // The function name is parenthesised so that argument-dependent lookup
    // cannot pull in unrelated overloads.
    value = (sizeof((parallel_group_has_iterator_helper)(
        static_cast<T*>(0))) == sizeof(char))
  };
};

} // namespace detail

/// Type trait used to determine whether a type is a range of asynchronous
/// operations that can be used with @c make_parallel_group.
template <typename T>
struct is_async_operation_range
{
#if defined(GENERATING_DOCUMENTATION)
  /// The value member is true if the type may be used as a range of
  /// asynchronous operations.
  static const bool value;
#else
  // A type is treated as a range when it declares a nested iterator type.
  enum { value = detail::parallel_group_has_iterator_typedef<T>::value };
#endif
};

/// A group of asynchronous operations that may be launched in parallel.
/**
 * See the documentation for boost::asio::experimental::make_parallel_group for
 * a usage example.
 */
template <typename... Ops>
class parallel_group
{
private:
  struct initiate_async_wait
  {
    template <typename Handler, typename Condition>
    void operator()(Handler&& h, Condition&& c, std::tuple<Ops...>&& ops) const
    {
      detail::parallel_group_launch(
          std::forward<Condition>(c), std::forward<Handler>(h),
          ops, boost::asio::detail::index_sequence_for<Ops...>());
    }
  };

  std::tuple<Ops...> ops_;

public:
  /// Constructor.
  explicit parallel_group(Ops... ops)
    : ops_(std::move(ops)...)
  {
  }

  /// The completion signature for the group of operations.
  typedef typename detail::parallel_group_signature<sizeof...(Ops),
      completion_signature_of_t<Ops>...>::type signature;

  /// Initiate an asynchronous wait for the group of operations.
  /**
   * Launches the group and asynchronously waits for completion.
   *
   * @param cancellation_condition A function object, called on completion of
   * an operation within the group, that is used to determine whether to cancel
   * the remaining operations. The function object is passed the arguments of
   * the completed operation's handler. To trigger cancellation of the remaining
   * operations, it must return a boost::asio::cancellation_type value other
   * than <tt>boost::asio::cancellation_type::none</tt>.
   *
   * @param token A @ref completion_token whose signature is comprised of
   * a @c std::array<std::size_t, N> indicating the completion order of the
   * operations, followed by all operations' completion handler arguments.
   *
   * The library provides the following @c cancellation_condition types:
   *
   * @li boost::asio::experimental::wait_for_all
   * @li boost::asio::experimental::wait_for_one
   * @li boost::asio::experimental::wait_for_one_error
   * @li boost::asio::experimental::wait_for_one_success
   */
  template <typename CancellationCondition,
      BOOST_ASIO_COMPLETION_TOKEN_FOR(signature) CompletionToken>
  auto async_wait(CancellationCondition cancellation_condition,
      CompletionToken&& token)
    -> decltype(
      boost::asio::async_initiate<CompletionToken, signature>(
        declval<initiate_async_wait>(), token,
        std::move(cancellation_condition), std::move(ops_)))
  {
    return boost::asio::async_initiate<CompletionToken, signature>(
        initiate_async_wait(), token,
        std::move(cancellation_condition), std::move(ops_));
  }
};

/// Create a group of operations that may be launched in parallel.
/**
 * For example:
 * @code boost::asio::experimental::make_parallel_group(
 *    [&](auto token)
 *    {
 *      return in.async_read_some(boost::asio::buffer(data), token);
 *    },
 *    [&](auto token)
 *    {
 *      return timer.async_wait(token);
 *    }
 *  ).async_wait(
 *    boost::asio::experimental::wait_for_all(),
 *    [](
 *        std::array<std::size_t, 2> completion_order,
 *        boost::system::error_code ec1, std::size_t n1,
 *        boost::system::error_code ec2
 *    )
 *    {
 *      switch (completion_order[0])
 *      {
 *      case 0:
 *        {
 *          std::cout << "descriptor finished: " << ec1 << ", " << n1 << "\n";
 *        }
 *        break;
 *      case 1:
 *        {
 *          std::cout << "timer finished: " << ec2 << "\n";
 *        }
 *        break;
 *      }
 *    }
 *  );
 * @endcode
 */
template <typename... Ops>
BOOST_ASIO_NODISCARD inline parallel_group<Ops...>
make_parallel_group(Ops... ops)
{
  // Each operation is received by value and then moved into the new group.
  typedef parallel_group<Ops...> group_type;
  return group_type(std::move(ops)...);
}

/// A range-based group of asynchronous operations that may be launched in
/// parallel.
/**
 * See the documentation for boost::asio::experimental::make_parallel_group for
 * a usage example.
 */
template <typename Range, typename Allocator = std::allocator<void>>
class ranged_parallel_group
{
private:
  // Initiation function object passed to async_initiate by async_wait. It
  // hands the handler, the cancellation condition, the stored range and the
  // allocator over to detail::ranged_parallel_group_launch.
  struct initiate_async_wait
  {
    template <typename Handler, typename Condition>
    void operator()(Handler&& h, Condition&& c,
        Range&& range, const Allocator& allocator) const
    {
      // h and c are forwarding references, so they are forwarded rather than
      // unconditionally moved: an lvalue argument is copied, not stolen from.
      // This matches parallel_group::initiate_async_wait. Range is the class
      // template parameter, so range is a true rvalue reference here.
      detail::ranged_parallel_group_launch(std::forward<Condition>(c),
          std::forward<Handler>(h), std::forward<Range>(range), allocator);
    }
  };

  // The operations making up the group.
  Range range_;

  // Allocator passed to the launcher along with the range.
  Allocator allocator_;

public:
  /// Constructor.
  explicit ranged_parallel_group(Range range,
      const Allocator& allocator = Allocator())
    : range_(std::move(range)),
      allocator_(allocator)
  {
  }

  /// The completion signature for the group of operations.
  typedef typename detail::ranged_parallel_group_signature<
      completion_signature_of_t<
        decay_t<decltype(*std::declval<typename Range::iterator>())>>,
      Allocator>::type signature;

  /// Initiate an asynchronous wait for the group of operations.
  /**
   * Launches the group and asynchronously waits for completion.
   *
   * @param cancellation_condition A function object, called on completion of
   * an operation within the group, that is used to determine whether to cancel
   * the remaining operations. The function object is passed the arguments of
   * the completed operation's handler. To trigger cancellation of the remaining
   * operations, it must return a boost::asio::cancellation_type value other
   * than <tt>boost::asio::cancellation_type::none</tt>.
   *
   * @param token A @ref completion_token whose signature is comprised of
   * a @c std::vector<std::size_t, Allocator> indicating the completion order of
   * the operations, followed by a vector for each of the completion signature's
   * arguments.
   *
   * The library provides the following @c cancellation_condition types:
   *
   * @li boost::asio::experimental::wait_for_all
   * @li boost::asio::experimental::wait_for_one
   * @li boost::asio::experimental::wait_for_one_error
   * @li boost::asio::experimental::wait_for_one_success
   */
  template <typename CancellationCondition,
      BOOST_ASIO_COMPLETION_TOKEN_FOR(signature) CompletionToken>
  auto async_wait(CancellationCondition cancellation_condition,
      CompletionToken&& token)
    -> decltype(
      boost::asio::async_initiate<CompletionToken, signature>(
        declval<initiate_async_wait>(), token,
        std::move(cancellation_condition),
        std::move(range_), allocator_))
  {
    // The stored range is handed to the initiation as an rvalue, while the
    // allocator is passed as an lvalue.
    return boost::asio::async_initiate<CompletionToken, signature>(
        initiate_async_wait(), token,
        std::move(cancellation_condition),
        std::move(range_), allocator_);
  }
};

/// Create a group of operations that may be launched in parallel.
/**
 * @param range A range containing the operations to be launched.
 *
 * For example:
 * @code
 * using op_type = decltype(
 *     socket1.async_read_some(
 *       boost::asio::buffer(data1),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * std::vector<op_type> ops;
 *
 * ops.push_back(
 *     socket1.async_read_some(
 *       boost::asio::buffer(data1),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * ops.push_back(
 *     socket2.async_read_some(
 *       boost::asio::buffer(data2),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * boost::asio::experimental::make_parallel_group(ops).async_wait(
 *     boost::asio::experimental::wait_for_all(),
 *     [](
 *         std::vector<std::size_t> completion_order,
 *         std::vector<boost::system::error_code> e,
 *         std::vector<std::size_t> n
 *       )
 *     {
 *       for (std::size_t i = 0; i < completion_order.size(); ++i)
 *       {
 *         std::size_t idx = completion_order[i];
 *         std::cout << "socket " << idx << " finished: ";
 *         std::cout << e[idx] << ", " << n[idx] << "\n";
 *       }
 *     }
 *   );
 * @endcode
 */
template <typename Range>
BOOST_ASIO_NODISCARD inline ranged_parallel_group<decay_t<Range>>
make_parallel_group(Range&& range,
    constraint_t<
      is_async_operation_range<decay_t<Range>>::value
    > = 0)
{
  // The group is parameterised on the decayed range type and is constructed
  // from the forwarded argument, using the group's default allocator.
  typedef ranged_parallel_group<decay_t<Range>> group_type;
  return group_type(std::forward<Range>(range));
}

/// Create a group of operations that may be launched in parallel.
/**
 * @param allocator Specifies the allocator to be used with the result vectors.
 *
 * @param range A range containing the operations to be launched.
 *
 * For example:
 * @code
 * using op_type = decltype(
 *     socket1.async_read_some(
 *       boost::asio::buffer(data1),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * std::vector<op_type> ops;
 *
 * ops.push_back(
 *     socket1.async_read_some(
 *       boost::asio::buffer(data1),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * ops.push_back(
 *     socket2.async_read_some(
 *       boost::asio::buffer(data2),
 *       boost::asio::deferred
 *     )
 *   );
 *
 * boost::asio::experimental::make_parallel_group(
 *     std::allocator_arg,
 *     my_allocator,
 *     ops
 *   ).async_wait(
 *     boost::asio::experimental::wait_for_all(),
 *     [](
 *         std::vector<std::size_t> completion_order,
 *         std::vector<boost::system::error_code> e,
 *         std::vector<std::size_t> n
 *       )
 *     {
 *       for (std::size_t i = 0; i < completion_order.size(); ++i)
 *       {
 *         std::size_t idx = completion_order[i];
 *         std::cout << "socket " << idx << " finished: ";
 *         std::cout << e[idx] << ", " << n[idx] << "\n";
 *       }
 *     }
 *   );
 * @endcode
 */
template <typename Allocator, typename Range>
BOOST_ASIO_NODISCARD inline ranged_parallel_group<decay_t<Range>, Allocator>
make_parallel_group(allocator_arg_t, const Allocator& allocator, Range&& range,
    constraint_t<
      is_async_operation_range<decay_t<Range>>::value
    > = 0)
{
  // The leading allocator_arg_t parameter is an unnamed tag that only selects
  // this overload. The group is parameterised on the decayed range type and
  // constructed from the forwarded range plus the supplied allocator.
  typedef ranged_parallel_group<decay_t<Range>, Allocator> group_type;
  return group_type(std::forward<Range>(range), allocator);
}

} // namespace experimental
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#include <boost/asio/experimental/impl/parallel_group.hpp>

#endif // BOOST_ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP