// boost/beast/core/impl/basic_stream.hpp
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
#define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
#include <boost/beast/core/async_base.hpp>
#include <boost/beast/core/buffer_traits.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/websocket/teardown.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/assert.hpp>
#include <boost/make_shared.hpp>
#include <boost/core/exchange.hpp>
#include <cstdlib>
#include <type_traits>
#include <utility>
namespace boost {
namespace beast {
//------------------------------------------------------------------------------
// Constructs the shared implementation object when no rate policy
// argument is supplied (overload selected by the std::false_type tag).
//
// All of `args` are forwarded to the socket constructor. The read and
// write operation states and the rate timer are each constructed from
// ex(). reset() is then called, which sets the read and write timers
// to never().
template<class Protocol, class Executor, class RatePolicy>
template<class... Args>
basic_stream<Protocol, Executor, RatePolicy>::
impl_type::
impl_type(std::false_type, Args&&... args)
: socket(std::forward<Args>(args)...)
, read(ex())
, write(ex())
, timer(ex())
{
reset();
}
// Constructs the shared implementation object with an explicit rate
// policy (overload selected by the std::true_type tag).
//
// `policy` is forwarded into the boost::empty_value<RatePolicy> base,
// and the remaining `args` are forwarded to the socket constructor.
// The read and write operation states and the rate timer are each
// constructed from ex(). reset() is then called, which sets the read
// and write timers to never().
template<class Protocol, class Executor, class RatePolicy>
template<class RatePolicy_, class... Args>
basic_stream<Protocol, Executor, RatePolicy>::
impl_type::
impl_type(std::true_type,
RatePolicy_&& policy, Args&&... args)
: boost::empty_value<RatePolicy>(
boost::empty_init_t{},
std::forward<RatePolicy_>(policy))
, socket(std::forward<Args>(args)...)
, read(ex())
, write(ex())
, timer(ex())
{
reset();
}
// Invoked once per waiter after the rate timer completes.
//
// Each call decrements `waiting`. Only the call that brings the count
// to zero moves the timer one second into the future, notifies the
// rate policy, and starts a fresh wait whose completion handler is
// bound to executor `ex2` and re-enters on_timer().
template<class Protocol, class Executor, class RatePolicy>
template<class Executor2>
void
basic_stream<Protocol, Executor, RatePolicy>::
impl_type::
on_timer(Executor2 const& ex2)
{
    // Completion handler for the re-armed wait. It refers to the
    // implementation through a weak pointer, so the pending wait does
    // not extend the lifetime of the impl_type object.
    struct rearm_handler : boost::empty_value<Executor2>
    {
        using executor_type = Executor2;

        boost::weak_ptr<impl_type> wp;

        rearm_handler(
            Executor2 const& ex2,
            boost::shared_ptr<impl_type> const& sp)
            : boost::empty_value<Executor2>(
                boost::empty_init_t{}, ex2)
            , wp(sp)
        {
        }

        executor_type
        get_executor() const noexcept
        {
            return this->get();
        }

        void
        operator()(error_code ec)
        {
            auto const owner = wp.lock();

            // the implementation object no longer exists
            if(! owner)
                return;

            // the wait was canceled
            if(ec == net::error::operation_aborted)
                return;

            // no other error is expected here; if one shows up in a
            // build without assertions, the timer is not re-armed
            BOOST_ASSERT(! ec);
            if(! ec)
                owner->on_timer(this->get());
        }
    };

    BOOST_ASSERT(waiting > 0);

    // every waiter checks in; all but the final one stop here
    if(--waiting > 0)
        return;

    // the final waiter opens the next one-second slice
    auto const canceled =
        timer.expires_after(std::chrono::seconds(1));
    BOOST_VERIFY(canceled == 0);

    rate_policy_access::on_timer(policy());

    // count the wait started below as a waiter of its own
    ++waiting;
    timer.async_wait(
        rearm_handler(ex2, this->shared_from_this()));
}
// Sets the expiry of the read and write timers to never(), skipping
// whichever side currently has an operation pending.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
impl_type::
reset()
{
    // The assert fires only when the read side and the write side
    // are both pending: in that case neither timer can be changed,
    // so the call would have nothing to act on.
    BOOST_ASSERT(! read.pending || ! write.pending);

    if(! read.pending)
    {
        auto const canceled = read.timer.expires_at(never());
        BOOST_VERIFY(canceled == 0);
    }

    if(! write.pending)
    {
        auto const canceled = write.timer.expires_at(never());
        BOOST_VERIFY(canceled == 0);
    }
}
// Closes the socket and cancels the rate timer without letting any
// failure escape: the socket error code is discarded, and when
// exceptions are enabled anything thrown by timer.cancel() is
// swallowed so the noexcept contract holds.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
impl_type::
close() noexcept
{
    error_code ignored;
    socket.close(ignored);

#if defined(BOOST_NO_EXCEPTIONS)
    timer.cancel();
#else
    try
    {
        timer.cancel();
    }
    catch(...)
    {
        // deliberately ignored
    }
#endif
}
//------------------------------------------------------------------------------
// Completion handler for a wait on a read or write timeout timer.
//
// The members are filled in positionally by aggregate initialization
// at the call sites, so their declaration order must not change:
//   state - operation state (read or write) being timed
//   wp    - weak reference to the stream implementation
//   tick  - value of state.tick captured when the wait was started
//   ex    - executor returned by get_executor()
template<class Protocol, class Executor, class RatePolicy>
template<class Executor2>
struct basic_stream<Protocol, Executor, RatePolicy>::
    timeout_handler
{
    using executor_type = Executor2;

    op_state& state;
    boost::weak_ptr<impl_type> wp;
    tick_type tick;
    executor_type ex;

    executor_type
    get_executor() const noexcept
    {
        return ex;
    }

    void
    operator()(error_code ec)
    {
        // a canceled wait means no timeout occurred
        if(ec == net::error::operation_aborted)
            return;
        BOOST_ASSERT(! ec);

        auto const owner = wp.lock();

        // Nothing to do when the implementation is gone, or when the
        // operation this wait belonged to has already advanced the
        // tick (stale timer). `state` is only read once `owner` is
        // known to be alive.
        if(! owner || tick < state.tick)
            return;
        BOOST_ASSERT(tick == state.tick);

        // genuine timeout: close the stream and flag the state so
        // the interrupted operation reports error::timeout
        BOOST_ASSERT(! state.timeout);
        owner->close();
        state.timeout = true;
    }
};
//------------------------------------------------------------------------------
template<class Protocol, class Executor, class RatePolicy>
struct basic_stream<Protocol, Executor, RatePolicy>::ops
{
// Composed operation implementing async_read_some (isRead == true)
// and async_write_some (isRead == false) as a stackless coroutine.
//
// It keeps the implementation alive through a shared_ptr, marks the
// matching operation state as pending through a pending_guard, applies
// the configured timeout and the rate policy, and finally forwards the
// transfer to the underlying socket.
template<bool isRead, class Buffers, class Handler>
class transfer_op
: public async_base<Handler, Executor>
, public boost::asio::coroutine
{
boost::shared_ptr<impl_type> impl_;
pending_guard pg_;
Buffers b_;
// tag type used to select the matching async_perform overload
using is_read = std::integral_constant<bool, isRead>;
// Returns the read or the write operation state, depending on isRead.
op_state&
state()
{
if (isRead)
return impl_->read;
else
return impl_->write;
}
// Asks the rate policy how many bytes may be transferred right now.
std::size_t
available_bytes()
{
if (isRead)
return rate_policy_access::
available_read_bytes(impl_->policy());
else
return rate_policy_access::
available_write_bytes(impl_->policy());
}
// Reports `n` transferred bytes to the rate policy.
void
transfer_bytes(std::size_t n)
{
if (isRead)
rate_policy_access::
transfer_read_bytes(impl_->policy(), n);
else
rate_policy_access::
transfer_write_bytes(impl_->policy(), n);
}
// Read flavor: starts async_read_some on the socket, limited to the
// first `amount` bytes of the buffers. *this is moved into the
// initiation and must not be touched afterwards.
void
async_perform(
std::size_t amount, std::true_type)
{
impl_->socket.async_read_some(
beast::buffers_prefix(amount, b_),
std::move(*this));
}
// Write flavor: starts async_write_some on the socket, limited to the
// first `amount` bytes of the buffers. *this is moved into the
// initiation and must not be touched afterwards.
void
async_perform(
std::size_t amount, std::false_type)
{
impl_->socket.async_write_some(
beast::buffers_prefix(amount, b_),
std::move(*this));
}
// NOTE(review): declared but not referenced anywhere in this view.
static bool never_pending_;
public:
// Constructs the operation and immediately starts it.
//
// h - completion handler, forwarded to async_base
// s - the stream; its executor and implementation are captured
// b - the buffer sequence, copied into the operation
template<class Handler_>
transfer_op(
Handler_&& h,
basic_stream& s,
Buffers const& b)
: async_base<Handler, Executor>(
std::forward<Handler_>(h), s.get_executor())
, impl_(s.impl_)
, pg_()
, b_(b)
{
this->set_allowed_cancellation(net::cancellation_type::all);
if (buffer_bytes(b_) == 0 && state().pending)
{
// Workaround:
// Corner case discovered in https://github.com/boostorg/beast/issues/2065
// Enclosing SSL stream wishes to complete a 0-length write early by
// executing a 0-length read against the underlying stream.
// This can occur even if an existing async_read is in progress.
// In this specific case, we will complete the async op with no error
// in order to prevent assertions and/or internal corruption of the basic_stream
this->complete(false, error_code(), std::size_t{0});
}
else
{
// claim the pending flag, then run the coroutine's first step
pg_.assign(state().pending);
(*this)({});
}
}
// Coroutine body. Called once by the constructor to start, and again
// each time an intermediate asynchronous step completes.
//
// ec                - result of the step that just completed
// bytes_transferred - bytes moved by the socket operation, or 0
void
operator()(
error_code ec,
std::size_t bytes_transferred = 0)
{
BOOST_ASIO_CORO_REENTER(*this)
{
// apply the timeout manually, otherwise
// behavior varies across platforms.
if(state().timer.expiry() <= now())
{
// The expiry has already passed: resume this coroutine through
// net::dispatch on the immediate executor (passing ec and 0 bytes),
// then close the stream and complete with error::timeout.
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
(isRead ? "basic_stream::async_read_some"
: "basic_stream::async_write_some")));
const auto ex = this->get_immediate_executor();
net::dispatch(ex, net::append(std::move(*this), ec, 0));
}
impl_->close();
BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
goto upcall;
}
// handle empty buffers
if(detail::buffers_empty(b_))
{
// make sure we perform the no-op
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
(isRead ? "basic_stream::async_read_some"
: "basic_stream::async_write_some")));
async_perform(0, is_read{});
}
goto upcall;
}
// if a timeout is active, wait on the timer
if(state().timer.expiry() != never())
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
(isRead ? "basic_stream::async_read_some"
: "basic_stream::async_write_some")));
// The timeout handler records the current tick so that it can
// recognize a wait belonging to an operation that already finished.
state().timer.async_wait(
timeout_handler<decltype(this->get_executor())>{
state(),
impl_,
state().tick,
this->get_executor()});
}
// check rate limit, maybe wait
std::size_t amount;
amount = available_bytes();
if(amount == 0)
{
// no budget right now: register as a waiter on the rate timer
++impl_->waiting;
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
(isRead ? "basic_stream::async_read_some"
: "basic_stream::async_write_some")));
impl_->timer.async_wait(std::move(*this));
}
if(ec)
{
// socket was closed, or a timeout
BOOST_ASSERT(ec ==
net::error::operation_aborted);
// timeout handler invoked?
if(state().timeout)
{
// yes, socket already closed
BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
state().timeout = false;
}
goto upcall;
}
// rate timer fired normally: check in as a waiter
impl_->on_timer(this->get_executor());
// Allow at least one byte, otherwise
// bytes_transferred could be 0.
amount = std::max<std::size_t>(
available_bytes(), 1);
}
// perform the actual socket transfer, limited to `amount` bytes
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
(isRead ? "basic_stream::async_read_some"
: "basic_stream::async_write_some")));
async_perform(amount, is_read{});
}
if(state().timer.expiry() != never())
{
// advancing the tick makes any timeout wait still in flight stale
++state().tick;
// try cancelling timer
auto const n =
state().timer.cancel();
if(n == 0)
{
// timeout handler invoked?
if(state().timeout)
{
// yes, socket already closed
BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
state().timeout = false;
}
}
else
{
BOOST_ASSERT(n == 1);
BOOST_ASSERT(! state().timeout);
}
}
upcall:
// release the pending flag, account for the transferred bytes with
// the rate policy, then invoke the final completion handler
pg_.reset();
transfer_bytes(bytes_transferred);
this->complete_now(ec, bytes_transferred);
}
}
};
// Composed operation behind every async_connect overload.
//
// While it is outstanding, both the read and the write operation
// states are marked pending (pg0_ / pg1_). If an expiry is set, a
// timeout wait is started on the write timer before the connect is
// initiated; the completion handler then reconciles the outcome of
// the connect with the outcome of that wait.
template<class Handler>
class connect_op
    : public async_base<Handler, Executor>
{
    boost::shared_ptr<impl_type> impl_;
    pending_guard pg0_;
    pending_guard pg1_;

    // The connect timeout is tracked in the write operation state.
    op_state&
    state() noexcept
    {
        return impl_->write;
    }

    // Starts the timeout wait on the write timer unless the expiry is
    // never(). Shared by all constructors; it must be called before
    // *this is moved into the connect initiation.
    void
    arm_timeout()
    {
        if(state().timer.expiry() == stream_base::never())
            return;

        BOOST_ASIO_HANDLER_LOCATION((
            __FILE__, __LINE__,
            "basic_stream::async_connect"));

        impl_->write.timer.async_wait(
            timeout_handler<decltype(this->get_executor())>{
                state(),
                impl_,
                state().tick,
                this->get_executor()});
    }

public:
    // Connects to a single endpoint.
    //
    // h  - completion handler, forwarded to async_base
    // s  - the stream; its executor and implementation are captured
    // ep - the endpoint to connect to
    template<class Handler_>
    connect_op(
        Handler_&& h,
        basic_stream& s,
        endpoint_type ep)
        : async_base<Handler, Executor>(
            std::forward<Handler_>(h), s.get_executor())
        , impl_(s.impl_)
        , pg0_(impl_->read.pending)
        , pg1_(impl_->write.pending)
    {
        this->set_allowed_cancellation(net::cancellation_type::all);
        arm_timeout();

        BOOST_ASIO_HANDLER_LOCATION((
            __FILE__, __LINE__,
            "basic_stream::async_connect"));

        impl_->socket.async_connect(
            ep, std::move(*this));
        // *this is now moved-from
    }

    // Connects to the first endpoint in `eps` accepted by `cond`,
    // using net::async_connect.
    template<
        class Endpoints, class Condition,
        class Handler_>
    connect_op(
        Handler_&& h,
        basic_stream& s,
        Endpoints const& eps,
        Condition const& cond)
        : async_base<Handler, Executor>(
            std::forward<Handler_>(h), s.get_executor())
        , impl_(s.impl_)
        , pg0_(impl_->read.pending)
        , pg1_(impl_->write.pending)
    {
        this->set_allowed_cancellation(net::cancellation_type::all);
        arm_timeout();

        BOOST_ASIO_HANDLER_LOCATION((
            __FILE__, __LINE__,
            "basic_stream::async_connect"));

        net::async_connect(impl_->socket,
            eps, cond, std::move(*this));
        // *this is now moved-from
    }

    // Connects to the first endpoint in [begin, end) accepted by
    // `cond`, using net::async_connect.
    template<
        class Iterator, class Condition,
        class Handler_>
    connect_op(
        Handler_&& h,
        basic_stream& s,
        Iterator begin, Iterator end,
        Condition const& cond)
        : async_base<Handler, Executor>(
            std::forward<Handler_>(h), s.get_executor())
        , impl_(s.impl_)
        , pg0_(impl_->read.pending)
        , pg1_(impl_->write.pending)
    {
        this->set_allowed_cancellation(net::cancellation_type::all);
        arm_timeout();

        BOOST_ASIO_HANDLER_LOCATION((
            __FILE__, __LINE__,
            "basic_stream::async_connect"));

        net::async_connect(impl_->socket,
            begin, end, cond, std::move(*this));
        // *this is now moved-from
    }

    // Completion handler for the connect. `args` carries the extra
    // result (endpoint or iterator) produced by the range and
    // iterator flavors; it is forwarded unchanged to the caller.
    template<class... Args>
    void
    operator()(error_code ec, Args&&... args)
    {
        if(state().timer.expiry() != stream_base::never())
        {
            // advancing the tick makes a timeout wait that is still
            // in flight stale
            ++state().tick;

            // try cancelling timer
            auto const n =
                impl_->write.timer.cancel();
            if(n == 0)
            {
                // timeout handler invoked?
                if(state().timeout)
                {
                    // yes, socket already closed
                    BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
                    state().timeout = false;
                }
            }
            else
            {
                BOOST_ASSERT(n == 1);
                BOOST_ASSERT(! state().timeout);
            }
        }

        // release both pending flags before invoking the handler
        pg0_.reset();
        pg1_.reset();
        this->complete_now(ec, std::forward<Args>(args)...);
    }
};
struct run_read_op
{
basic_stream* self;
using executor_type = typename basic_stream::executor_type;
executor_type
get_executor() const noexcept
{
return self->get_executor();
}
template<class ReadHandler, class Buffers>
void
operator()(
ReadHandler&& h,
Buffers const& b)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for the handler.
static_assert(
detail::is_invocable<ReadHandler,
void(error_code, std::size_t)>::value,
"ReadHandler type requirements not met");
transfer_op<
true,
Buffers,
typename std::decay<ReadHandler>::type>(
std::forward<ReadHandler>(h), *self, b);
}
};
struct run_write_op
{
basic_stream* self;
using executor_type = typename basic_stream::executor_type;
executor_type
get_executor() const noexcept
{
return self->get_executor();
}
template<class WriteHandler, class Buffers>
void
operator()(
WriteHandler&& h,
Buffers const& b)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for the handler.
static_assert(
detail::is_invocable<WriteHandler,
void(error_code, std::size_t)>::value,
"WriteHandler type requirements not met");
transfer_op<
false,
Buffers,
typename std::decay<WriteHandler>::type>(
std::forward<WriteHandler>(h), *self, b);
}
};
struct run_connect_op
{
basic_stream* self;
using executor_type = typename basic_stream::executor_type;
executor_type
get_executor() const noexcept
{
return self->get_executor();
}
template<class ConnectHandler>
void
operator()(
ConnectHandler&& h,
endpoint_type const& ep)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for the handler.
static_assert(
detail::is_invocable<ConnectHandler,
void(error_code)>::value,
"ConnectHandler type requirements not met");
connect_op<typename std::decay<ConnectHandler>::type>(
std::forward<ConnectHandler>(h), *self, ep);
}
};
struct run_connect_range_op
{
basic_stream* self;
using executor_type = typename basic_stream::executor_type;
executor_type
get_executor() const noexcept
{
return self->get_executor();
}
template<
class RangeConnectHandler,
class EndpointSequence,
class Condition>
void
operator()(
RangeConnectHandler&& h,
EndpointSequence const& eps,
Condition const& cond)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for the handler.
static_assert(
detail::is_invocable<RangeConnectHandler,
void(error_code, typename Protocol::endpoint)>::value,
"RangeConnectHandler type requirements not met");
connect_op<typename std::decay<RangeConnectHandler>::type>(
std::forward<RangeConnectHandler>(h), *self, eps, cond);
}
};
struct run_connect_iter_op
{
basic_stream* self;
using executor_type = typename basic_stream::executor_type;
executor_type
get_executor() const noexcept
{
return self->get_executor();
}
template<
class IteratorConnectHandler,
class Iterator,
class Condition>
void
operator()(
IteratorConnectHandler&& h,
Iterator begin, Iterator end,
Condition const& cond)
{
// If you get an error on the following line it means
// that your handler does not meet the documented type
// requirements for the handler.
static_assert(
detail::is_invocable<IteratorConnectHandler,
void(error_code, Iterator)>::value,
"IteratorConnectHandler type requirements not met");
connect_op<typename std::decay<IteratorConnectHandler>::type>(
std::forward<IteratorConnectHandler>(h), *self, begin, end, cond);
}
};
};
//------------------------------------------------------------------------------
// Destructor: closes the shared implementation object.
template<class Protocol, class Executor, class RatePolicy>
basic_stream<Protocol, Executor, RatePolicy>::
~basic_stream()
{
// The shared implementation object can outlive *this, because
// outstanding operations and timer handlers also refer to it.
// Closing it here (socket closed, rate timer canceled) cancels
// those operations so the shared object is destroyed as soon
// as possible.
impl_->close();
}
// Constructs the stream without an explicit rate policy.
//
// Allocates the shared implementation with boost::make_shared, using
// the std::false_type overload of impl_type's constructor; arg0 and
// args are forwarded to it (and from there to the socket constructor).
// NOTE(review): the unnamed trailing template parameter is presumably
// a constraint declared with the class definition - confirm there.
template<class Protocol, class Executor, class RatePolicy>
template<class Arg0, class... Args, class>
basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(Arg0&& arg0, Args&&... args)
: impl_(boost::make_shared<impl_type>(
std::false_type{},
std::forward<Arg0>(arg0),
std::forward<Args>(args)...))
{
}
// Constructs the stream with an explicit rate policy.
//
// Allocates the shared implementation with boost::make_shared, using
// the std::true_type overload of impl_type's constructor; `policy` is
// forwarded as the rate policy, and arg0 and args are forwarded on
// (and from there to the socket constructor).
// NOTE(review): the unnamed trailing template parameter is presumably
// a constraint declared with the class definition - confirm there.
template<class Protocol, class Executor, class RatePolicy>
template<class RatePolicy_, class Arg0, class... Args, class>
basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(
RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
: impl_(boost::make_shared<impl_type>(
std::true_type{},
std::forward<RatePolicy_>(policy),
std::forward<Arg0>(arg0),
std::forward<Args>(args)...))
{
}
// Move constructor.
//
// A new implementation object is allocated and move-constructed from
// the object `other` points to; `other` keeps its own (moved-from)
// implementation object rather than being left with a null pointer.
template<class Protocol, class Executor, class RatePolicy>
basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(basic_stream&& other)
: impl_(boost::make_shared<impl_type>(
std::move(*other.impl_)))
{
// Explainer: Asio's sockets provide the guarantee that a moved-from socket
// will be in a state as-if newly created. i.e.:
// * having the same (valid) executor
// * the socket shall not be open
// We provide the same guarantee by moving the impl rather than the pointer
// controlling its lifetime.
}
// Converting move constructor from a stream with a different
// executor type.
//
// Only the socket is moved out of `other`. The new implementation is
// built with the std::false_type overload of impl_type's constructor,
// so the rate policy of `other` is not carried over.
template<class Protocol, class Executor, class RatePolicy>
template<class Executor_>
basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(basic_stream<Protocol, Executor_, RatePolicy> && other)
: impl_(boost::make_shared<impl_type>(std::false_type{}, std::move(other.impl_->socket)))
{
}
//------------------------------------------------------------------------------
// Cancels outstanding work on the stream, then hands the underlying
// socket to the caller by moving it out of the implementation object.
template<class Protocol, class Executor, class RatePolicy>
auto
basic_stream<Protocol, Executor, RatePolicy>::
release_socket() ->
    socket_type
{
    cancel();

    // The socket is a member of the implementation, not a local,
    // so the move has to be spelled out explicitly.
    socket_type& owned = impl_->socket;
    return std::move(owned);
}
// Sets the read and write timers to expire `expiry_time` from now,
// skipping whichever side currently has an operation pending.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
expires_after(net::steady_timer::duration expiry_time)
{
    // The assert fires only when the read side and the write side
    // are both pending: in that case neither timer can be changed,
    // so there is nothing to apply the expiration time to.
    BOOST_ASSERT(
        ! impl_->read.pending ||
        ! impl_->write.pending);

    auto& rd = impl_->read;
    auto& wr = impl_->write;

    if(! rd.pending)
    {
        auto const canceled =
            rd.timer.expires_after(expiry_time);
        BOOST_VERIFY(canceled == 0);
    }

    if(! wr.pending)
    {
        auto const canceled =
            wr.timer.expires_after(expiry_time);
        BOOST_VERIFY(canceled == 0);
    }
}
// Sets the read and write timers to expire at the absolute time
// `expiry_time`, skipping whichever side currently has an operation
// pending.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
expires_at(
    net::steady_timer::time_point expiry_time)
{
    // The assert fires only when the read side and the write side
    // are both pending: in that case neither timer can be changed,
    // so there is nothing to apply the expiration time to.
    BOOST_ASSERT(
        ! impl_->read.pending ||
        ! impl_->write.pending);

    auto& rd = impl_->read;
    auto& wr = impl_->write;

    if(! rd.pending)
    {
        auto const canceled =
            rd.timer.expires_at(expiry_time);
        BOOST_VERIFY(canceled == 0);
    }

    if(! wr.pending)
    {
        auto const canceled =
            wr.timer.expires_at(expiry_time);
        BOOST_VERIFY(canceled == 0);
    }
}
// Disables the timeout by delegating to impl_type::reset(), which
// sets the read and write timers to never() for whichever side has
// no operation pending.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
expires_never()
{
impl_->reset();
}
// Cancels outstanding operations on the socket and the wait on the
// rate timer. The error code reported by the socket is discarded.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
cancel()
{
    auto& impl = *impl_;

    error_code ignored;
    impl.socket.cancel(ignored);

    impl.timer.cancel();
}
// Closes the stream by delegating to impl_type::close(), which closes
// the socket and cancels the rate timer without reporting errors.
template<class Protocol, class Executor, class RatePolicy>
void
basic_stream<Protocol, Executor, RatePolicy>::
close()
{
impl_->close();
}
//------------------------------------------------------------------------------
// Starts an asynchronous connect to the single endpoint `ep`.
//
// The completion token `handler` is passed to net::async_initiate
// together with a run_connect_op initiation bound to this stream.
// The completion signature is void(error_code).
template<class Protocol, class Executor, class RatePolicy>
template<BOOST_BEAST_ASYNC_TPARAM1 ConnectHandler>
BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
basic_stream<Protocol, Executor, RatePolicy>::
async_connect(
    endpoint_type const& ep,
    ConnectHandler&& handler)
{
    using signature = void(error_code);

    typename ops::run_connect_op initiation{this};
    return net::async_initiate<ConnectHandler, signature>(
        std::move(initiation),
        handler,
        ep);
}
// Starts an asynchronous connect over the endpoint sequence
// `endpoints`, passing detail::any_endpoint as the connect condition.
//
// The completion signature is
// void(error_code, typename Protocol::endpoint).
template<class Protocol, class Executor, class RatePolicy>
template<
    class EndpointSequence,
    BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
    class,
    class>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
basic_stream<Protocol, Executor, RatePolicy>::
async_connect(
    EndpointSequence const& endpoints,
    RangeConnectHandler&& handler)
{
    using signature =
        void(error_code, typename Protocol::endpoint);

    typename ops::run_connect_range_op initiation{this};
    return net::async_initiate<RangeConnectHandler, signature>(
        std::move(initiation),
        handler,
        endpoints,
        detail::any_endpoint{});
}
// Starts an asynchronous connect over the endpoint sequence
// `endpoints`, using the caller-supplied `connect_condition`.
//
// The completion signature is
// void(error_code, typename Protocol::endpoint).
template<class Protocol, class Executor, class RatePolicy>
template<
    class EndpointSequence,
    class ConnectCondition,
    BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
    class,
    class>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
basic_stream<Protocol, Executor, RatePolicy>::
async_connect(
    EndpointSequence const& endpoints,
    ConnectCondition connect_condition,
    RangeConnectHandler&& handler)
{
    using signature =
        void(error_code, typename Protocol::endpoint);

    typename ops::run_connect_range_op initiation{this};
    return net::async_initiate<RangeConnectHandler, signature>(
        std::move(initiation),
        handler,
        endpoints,
        connect_condition);
}
// Starts an asynchronous connect over the endpoints in [begin, end),
// passing detail::any_endpoint as the connect condition.
//
// The completion signature is void(error_code, Iterator).
template<class Protocol, class Executor, class RatePolicy>
template<
    class Iterator,
    BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler,
    class>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
basic_stream<Protocol, Executor, RatePolicy>::
async_connect(
    Iterator begin, Iterator end,
    IteratorConnectHandler&& handler)
{
    using signature = void(error_code, Iterator);

    typename ops::run_connect_iter_op initiation{this};
    return net::async_initiate<IteratorConnectHandler, signature>(
        std::move(initiation),
        handler,
        begin, end,
        detail::any_endpoint{});
}
// Starts an asynchronous connect over the endpoints in [begin, end),
// using the caller-supplied `connect_condition`.
//
// The completion signature is void(error_code, Iterator).
template<class Protocol, class Executor, class RatePolicy>
template<
    class Iterator,
    class ConnectCondition,
    BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler,
    class>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
basic_stream<Protocol, Executor, RatePolicy>::
async_connect(
    Iterator begin, Iterator end,
    ConnectCondition connect_condition,
    IteratorConnectHandler&& handler)
{
    using signature = void(error_code, Iterator);

    typename ops::run_connect_iter_op initiation{this};
    return net::async_initiate<IteratorConnectHandler, signature>(
        std::move(initiation),
        handler,
        begin, end,
        connect_condition);
}
//------------------------------------------------------------------------------
// Starts an asynchronous read into `buffers`.
//
// The completion token `handler` is passed to net::async_initiate
// together with a run_read_op initiation bound to this stream.
// The completion signature is void(error_code, std::size_t).
template<class Protocol, class Executor, class RatePolicy>
template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
basic_stream<Protocol, Executor, RatePolicy>::
async_read_some(
    MutableBufferSequence const& buffers,
    ReadHandler&& handler)
{
    using signature = void(error_code, std::size_t);

    static_assert(net::is_mutable_buffer_sequence<
        MutableBufferSequence>::value,
        "MutableBufferSequence type requirements not met");

    typename ops::run_read_op initiation{this};
    return net::async_initiate<ReadHandler, signature>(
        std::move(initiation),
        handler,
        buffers);
}
// Starts an asynchronous write from `buffers`.
//
// The completion token `handler` is passed to net::async_initiate
// together with a run_write_op initiation bound to this stream.
// The completion signature is void(error_code, std::size_t).
template<class Protocol, class Executor, class RatePolicy>
template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
basic_stream<Protocol, Executor, RatePolicy>::
async_write_some(
    ConstBufferSequence const& buffers,
    WriteHandler&& handler)
{
    using signature = void(error_code, std::size_t);

    static_assert(net::is_const_buffer_sequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence type requirements not met");

    typename ops::run_write_op initiation{this};
    return net::async_initiate<WriteHandler, signature>(
        std::move(initiation),
        handler,
        buffers);
}
//------------------------------------------------------------------------------
//
// Customization points
//
#if ! BOOST_BEAST_DOXYGEN
// Customization point: closes the socket underlying `stream`.
// The error code reported by close() is discarded.
template<
    class Protocol, class Executor, class RatePolicy>
void
beast_close_socket(
    basic_stream<Protocol, Executor, RatePolicy>& stream)
{
    auto& sock = stream.socket();

    error_code ignored;
    sock.close(ignored);
}
// Customization point: synchronous teardown for basic_stream.
//
// Forwards to the teardown overload found for the underlying socket.
//
// role   - the role passed through to the socket teardown
// stream - the stream whose socket is torn down
// ec     - receives the result of the socket teardown
template<
class Protocol, class Executor, class RatePolicy>
void
teardown(
role_type role,
basic_stream<Protocol, Executor, RatePolicy>& stream,
error_code& ec)
{
// The using-declaration makes beast::websocket::teardown visible to
// the unqualified call below, alongside any overloads found through
// argument-dependent lookup on the socket type.
using beast::websocket::teardown;
teardown(role, stream.socket(), ec);
}
// Customization point: asynchronous teardown for basic_stream.
//
// Forwards to the async_teardown overload found for the underlying
// socket.
//
// role    - the role passed through to the socket teardown
// stream  - the stream whose socket is torn down
// handler - completion handler, forwarded unchanged
template<
class Protocol, class Executor, class RatePolicy,
class TeardownHandler>
void
async_teardown(
role_type role,
basic_stream<Protocol, Executor, RatePolicy>& stream,
TeardownHandler&& handler)
{
// The using-declaration makes beast::websocket::async_teardown visible
// to the unqualified call below, alongside any overloads found through
// argument-dependent lookup on the socket type.
using beast::websocket::async_teardown;
async_teardown(role, stream.socket(),
std::forward<TeardownHandler>(handler));
}
#endif
} // beast
} // boost
#endif