7#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
8#define BOOST_REDIS_CONNECTION_BASE_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/detail/helper.hpp>
12#include <boost/redis/error.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/request.hpp>
15#include <boost/redis/resp3/type.hpp>
16#include <boost/redis/config.hpp>
17#include <boost/redis/detail/runner.hpp>
18#include <boost/redis/usage.hpp>
20#include <boost/system.hpp>
21#include <boost/asio/basic_stream_socket.hpp>
22#include <boost/asio/bind_executor.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/asio/write.hpp>
27#include <boost/assert.hpp>
28#include <boost/core/ignore_unused.hpp>
29#include <boost/asio/ssl/stream.hpp>
30#include <boost/asio/read_until.hpp>
31#include <boost/asio/buffer.hpp>
32#include <boost/asio/experimental/channel.hpp>
43namespace boost::redis::detail
/// Exposes the whole readable area of a dynamic buffer as a string view.
///
/// The returned view points at the storage reported by `buf.data(...)`;
/// no bytes are copied.
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   auto const length = std::size(buf);
   auto const region = buf.data(0, buf.size());
   return {static_cast<char const*>(region.data()), length};
}
// Composed-operation state that performs one `async_read_some` on `stream_`
// into a dynamic buffer.
// NOTE(review): this listing omits several lines of the definition (some
// members, part of the constructor, parts of the coroutine body); only the
// visible statements are described below.
53template <
class AsyncReadStream,
class DynamicBuffer>
56 AsyncReadStream& stream_;
58 std::size_t size_ = 0;
60 asio::coroutine coro_{};
// Stores the stream by reference and takes ownership of the buffer object.
63 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
65 , buf_ {std::move(buf)}
70 void operator()( Self& self
71 , system::error_code ec = {}
74 BOOST_ASIO_CORO_REENTER (coro_)
// Read into the `size_`-byte window that starts at offset `tmp_`.
80 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
// Drop everything past offset `tmp_ + n`.
// NOTE(review): `n` is presumably the number of bytes read — confirm.
86 buf_.shrink(buf_.size() - tmp_ - n);
// Initiating function that wraps `append_some_op` in `asio::async_compose`.
// The handler signature is `void(system::error_code, std::size_t)` and
// `stream` is passed as the associated I/O object.
// NOTE(review): the function name and some parameter lines are not visible
// in this listing.
92template <
class AsyncReadStream,
class DynamicBuffer,
class CompletionToken>
95 AsyncReadStream& stream,
98 CompletionToken&& token)
100 return asio::async_compose
102 , void(system::error_code, std::size_t)
103 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
// Coroutine state for executing one request on a connection `Conn`.
// NOTE(review): presumably the body of `exec_op` (its header is not visible);
// several control-flow lines are missing from this listing, so the comments
// below describe the visible statements only.
108 using req_info_type =
typename Conn::req_info;
109 using adapter_type =
typename Conn::adapter_type;
// Connection the request runs on and the shared bookkeeping record for it.
111 Conn* conn_ =
nullptr;
112 std::shared_ptr<req_info_type> info_ =
nullptr;
113 asio::coroutine coro{};
115 template <
class Self>
116 void operator()(Self& self , system::error_code = {}, std::size_t = 0)
118 BOOST_ASIO_CORO_REENTER (coro)
// The request opted into `cancel_if_not_connected` and the connection is
// closed: hop through asio::post before continuing.
122 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
123 BOOST_ASIO_CORO_YIELD
124 asio::post(std::move(self));
// Hand the request to the connection's queue.
128 conn_->add_request_info(info_);
// Suspend until the request's notifier delivers something.
131 BOOST_ASIO_CORO_YIELD
132 info_->async_wait(std::move(self));
135 self.complete(info_->ec_, 0);
// The notifier was closed: report the operation as aborted.
139 if (info_->stop_requested()) {
142 return self.complete(asio::error::operation_aborted, 0);
// Cancellation was requested on this composed operation.
145 if (is_cancelled(self)) {
// The request has already left the `waiting` state.
146 if (!info_->is_waiting()) {
147 using c_t = asio::cancellation_type;
148 auto const c = self.get_cancellation_state().cancelled();
// Terminal cancellation is honoured with operation_aborted.
149 if ((c & c_t::terminal) != c_t::none) {
153 return self.complete(asio::error::operation_aborted, 0);
// Reset the cancellation state.
156 self.get_cancellation_state().clear();
// Take the request out of the connection's queue and report the abort.
164 conn_->remove_request(info_);
165 self.complete(asio::error::operation_aborted, 0);
// Complete with the error code and byte count recorded on the request.
170 self.complete(info_->ec_, info_->read_size_);
// Coroutine state that runs the connection's reader and writer side by side.
// NOTE(review): some lines of this definition are missing from the listing.
175template <
class Conn,
class Logger>
177 Conn* conn =
nullptr;
179 asio::coroutine coro{};
181 template <
class Self>
// `order`, `ec0` and `ec1` have the shape of a two-operation parallel-group
// completion: completion order plus one error code per operation.
182 void operator()( Self& self
183 , std::array<std::size_t, 2> order = {}
184 , system::error_code ec0 = {}
185 , system::error_code ec1 = {})
187 BOOST_ASIO_CORO_REENTER (coro)
// Start reader (operation 0) and writer (operation 1) as a parallel group
// using the wait_for_one() condition.
191 BOOST_ASIO_CORO_YIELD
192 asio::experimental::make_parallel_group(
193 [
this](
auto token) {
return conn->reader(logger_, token);},
194 [
this](
auto token) {
return conn->writer(logger_, token);}
196 asio::experimental::wait_for_one(),
// The composed operation itself was cancelled.
199 if (is_cancelled(self)) {
200 logger_.trace(
"run-op: canceled. Exiting ...");
201 self.complete(asio::error::operation_aborted);
205 logger_.on_run(ec0, ec1);
// Case 0 completes with the reader's error code, case 1 with the writer's.
// NOTE(review): the switch expression is not visible; presumably `order[0]`.
208 case 0: self.complete(ec0);
break;
209 case 1: self.complete(ec1);
break;
210 default: BOOST_ASSERT(
false);
// Coroutine state for the connection's write loop.
// NOTE(review): members and several control-flow lines are missing from this
// listing; comments describe the visible statements only.
216template <
class Conn,
class Logger>
220 asio::coroutine coro{};
222 template <
class Self>
223 void operator()( Self& self
224 , system::error_code ec = {}
229 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
// Keep writing for as long as coalesce_requests() reports queued data.
231 while (conn_->coalesce_requests()) {
// With SSL the write goes through next_layer(); otherwise it goes through
// the layer underneath it.
232 if (conn_->use_ssl())
233 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
235 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
237 logger_.on_write(ec, conn_->write_buffer_);
240 logger_.trace(
"writer-op: error. Exiting ...");
246 if (is_cancelled(self)) {
247 logger_.trace(
"writer-op: canceled. Exiting ...");
248 self.complete(asio::error::operation_aborted);
// The connection was closed during the write.
257 if (!conn_->is_open()) {
258 logger_.trace(
"writer-op: canceled (2). Exiting ...");
// Nothing left to write: wait on the writer timer.
// (add_request_info() cancels this timer.)
264 BOOST_ASIO_CORO_YIELD
265 conn_->writer_timer_.async_wait(std::move(self));
266 if (!conn_->is_open() || is_cancelled(self)) {
267 logger_.trace(
"writer-op: canceled (3). Exiting ...");
// Coroutine state for the connection's read-and-parse loop.
// NOTE(review): members and several control-flow lines are missing from this
// listing; comments describe the visible statements only.
278template <
class Conn,
class Logger>
280 using parse_result =
typename Conn::parse_result;
281 using parse_ret_type =
typename Conn::parse_ret_type;
// Outcome of the latest on_read() call; starts as {resp, 0}.
284 parse_ret_type res_{parse_result::resp, 0};
285 asio::coroutine coro{};
287 template <
class Self>
288 void operator()( Self& self
289 , system::error_code ec = {}
294 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
// Read only when the parser asked for more data or the read buffer is empty.
297 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
// Both branches size the read with get_suggested_buffer_growth(); the non-SSL
// branch reads from next_layer().next_layer().
298 if (conn_->use_ssl()) {
299 BOOST_ASIO_CORO_YIELD
303 conn_->get_suggested_buffer_growth(),
306 BOOST_ASIO_CORO_YIELD
308 conn_->next_layer().next_layer(),
310 conn_->get_suggested_buffer_growth(),
314 logger_.on_read(ec, n);
// EOF completes the operation with a default-constructed error code.
317 if (ec == asio::error::eof) {
318 logger_.trace(
"reader-op: EOF received. Exiting ...");
320 return self.complete({});
325 logger_.trace(
"reader-op: error. Exiting ...");
334 if (!conn_->is_open() || is_cancelled(self)) {
335 logger_.trace(
"reader-op: canceled. Exiting ...");
// Parse what is currently in the dynamic buffer.
341 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
343 logger_.trace(
"reader-op: parse error. Exiting ...");
// A push was parsed: try a non-suspending send on the receive channel first
// and fall back to async_send if try_send() returns false.
349 if (res_.first == parse_result::push) {
350 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
351 BOOST_ASIO_CORO_YIELD
352 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
356 logger_.trace(
"reader-op: error. Exiting ...");
362 if (!conn_->is_open() || is_cancelled(self)) {
363 logger_.trace(
"reader-op: canceled (2). Exiting ...");
364 self.complete(asio::error::operation_aborted);
379template <
class Executor>
// Stream type: an SSL stream over a TCP stream socket bound to `Executor`.
386 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
// Timer type: a steady-clock waitable timer bound to `executor_type`.
388 using clock_type = std::chrono::steady_clock;
389 using clock_traits_type = asio::wait_traits<clock_type>;
390 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
// Constructor (first parameter line and some initialisers are not visible in
// this listing).
397 asio::ssl::context ctx,
398 std::size_t max_read_size)
// Takes ownership of the SSL context.
399 : ctx_{std::move(ctx)}
// NOTE(review): 256 is presumably the channel's buffer capacity — confirm.
402 , receive_channel_{ex, 256}
// The dynamic buffer is built over `read_buffer_` with `max_read_size`.
404 , dbuf_{read_buffer_, max_read_size}
// Start with `ignore` as the receive response.
406 set_receive_response(
ignore);
// Set the writer timer's expiry to the latest representable time point.
// NOTE(review): presumably so waits on it end only via cancel() — confirm.
407 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
417 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
// Starts an `exec_op` for `req`, with `resp` adapted as the response.
// The handler signature is `void(system::error_code, std::size_t)`.
// NOTE(review): a few lines of this function are not visible in this listing.
443 template <
class Response,
class CompletionToken>
444 auto async_exec(
request const& req, Response& resp, CompletionToken token)
446 using namespace boost::redis::adapter;
447 auto f = boost_redis_adapt(resp);
// The adapter must support at least as many responses as the request expects.
448 BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(),
"Request and response have incompatible sizes.");
// Shared bookkeeping record handed to the operation.
450 auto info = std::make_shared<req_info>(req, f, get_executor());
452 return asio::async_compose
454 , void(system::error_code, std::size_t)
455 >(exec_op<this_type>{
this, info}, token, writer_timer_);
// Deprecated overload that also takes the response object; the attribute
// message points callers at set_receive_response plus the other overload.
// NOTE(review): at least one body line is missing from this listing; the
// visible statement only forwards to the receive channel.
458 template <
class Response,
class CompletionToken>
459 [[deprecated(
"Set the response with set_receive_response and use the other overload.")]]
460 auto async_receive(Response&
response, CompletionToken token)
463 return receive_channel_.async_receive(std::move(token));
466 template <
class CompletionToken>
467 auto async_receive(CompletionToken token)
468 {
return receive_channel_.async_receive(std::move(token)); }
// Synchronous receive built on the channel's try_receive().
// NOTE(review): the lambda body, result handling and return statement are
// not visible in this listing.
470 std::size_t
receive(system::error_code& ec)
472 std::size_t size = 0;
// Handler passed to try_receive(); it captures by reference.
474 auto f = [&](system::error_code
const& ec2, std::size_t n)
480 auto const res = receive_channel_.try_receive(f);
490 template <
class Logger,
class CompletionToken>
491 auto async_run(
config const& cfg, Logger l, CompletionToken token)
493 runner_.set_config(cfg);
494 l.set_prefix(runner_.get_config().log_prefix);
495 return runner_.async_run(*
this, l, std::move(token));
498 template <
class Response>
499 void set_receive_response(Response&
response)
501 using namespace boost::redis::adapter;
502 auto g = boost_redis_adapt(
response);
503 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
506 usage get_usage() const noexcept
509 auto run_is_canceled() const noexcept
510 {
return cancel_run_called_; }
// Channel carrying (error_code, size) pairs on `executor_type`.
513 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
514 using runner_type = runner<executor_type>;
// Per-request notifiers use the same channel type as the receive channel.
517 using exec_notifier_type = receive_channel_type;
519 auto use_ssl() const noexcept
520 {
return runner_.get_config().use_ssl;}
// Partitions the queue into requests to keep and requests to drop, erases
// the dropped tail, and marks every remaining request as waiting.
// NOTE(review): the body of the first for_each and the return statement are
// not visible in this listing.
522 auto cancel_on_conn_lost() -> std::size_t
// Keep-predicate: a waiting request is kept unless it sets
// `cancel_on_connection_lost`; any other request is kept unless it sets
// `cancel_if_unresponded`.
525 auto cond = [](
auto const& ptr)
527 BOOST_ASSERT(ptr !=
nullptr);
529 if (ptr->is_waiting()) {
530 return !ptr->req_->get_config().cancel_on_connection_lost;
532 return !ptr->req_->get_config().cancel_if_unresponded;
// stable_partition keeps the relative order of the requests that stay.
536 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
// Number of requests about to be dropped.
538 auto const ret = std::distance(point, std::end(reqs_));
540 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
544 reqs_.erase(point, std::end(reqs_));
// Put every surviving request back into the waiting state.
546 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
547 return ptr->mark_waiting();
// Moves every request that is still waiting to the back of the queue and
// erases that tail.
// NOTE(review): the for_each body and the return statement are not visible
// in this listing.
553 auto cancel_unwritten_requests() -> std::size_t
// Keep-predicate: true for requests that are not in the waiting state.
555 auto f = [](
auto const& ptr)
557 BOOST_ASSERT(ptr !=
nullptr);
558 return !ptr->is_waiting();
561 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
// Number of requests about to be dropped.
563 auto const ret = std::distance(point, std::end(reqs_));
565 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
569 reqs_.erase(point, std::end(reqs_));
578 cancel_unwritten_requests();
584 if (std::exchange(cancel_run_called_,
true)) {
589 writer_timer_.cancel();
590 receive_channel_.cancel();
591 cancel_on_conn_lost();
595 receive_channel_.cancel();
605 write_buffer_.clear();
608 cancel_push_requests();
612 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
613 BOOST_ASSERT_MSG(ptr !=
nullptr,
"Expects non-null pointer.");
614 if (ptr->is_staged()) {
623 using wrapped_adapter_type = std::function<void(node_type
const&, system::error_code&)>;
// Builds the bookkeeping record for one request.
// NOTE(review): some initialisers and the rest of the lambda body are not
// visible in this listing.
625 explicit req_info(
request const& req, adapter_type adapter, executor_type ex)
// Remaining-response counter starts at the request's expected count.
629 , expected_responses_{req.get_expected_responses()}
630 , status_{status::waiting}
// Wrap the user adapter in a callable taking (node, error_code&).
634 adapter_ = [
this, adapter](node_type
const& nd, system::error_code& ec)
// `i` is the number of responses already counted off for this request.
636 auto const i = req_->get_expected_responses() - expected_responses_;
643 notifier_.try_send(std::error_code{}, 0);
651 [[nodiscard]]
auto is_waiting() const noexcept
652 {
return status_ == status::waiting; }
654 [[nodiscard]]
auto is_written() const noexcept
655 {
return status_ == status::written; }
657 [[nodiscard]]
auto is_staged() const noexcept
658 {
return status_ == status::staged; }
660 void mark_written() noexcept
661 { status_ = status::written; }
663 void mark_staged() noexcept
664 { status_ = status::staged; }
666 void mark_waiting() noexcept
667 { status_ = status::waiting; }
669 [[nodiscard]]
auto stop_requested() const noexcept
670 {
return !notifier_.is_open();}
672 template <
class CompletionToken>
673 auto async_wait(CompletionToken token)
675 return notifier_.async_receive(std::move(token));
// Channel the waiting operation receives on (see async_wait / stop_requested).
685 exec_notifier_type notifier_;
// Type-erased adapter invoked with each parsed node.
687 wrapped_adapter_type adapter_;
// Responses still outstanding for this request.
690 std::size_t expected_responses_;
// Error code and byte count reported when the operation completes.
693 system::error_code ec_;
694 std::size_t read_size_;
697 void remove_request(std::shared_ptr<req_info>
const& info)
699 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
// Queue of shared request records.
702 using reqs_type = std::deque<std::shared_ptr<req_info>>;
// The operation types below are granted access to the connection's members.
704 template <
class,
class>
friend struct reader_op;
705 template <
class,
class>
friend struct writer_op;
706 template <
class,
class>
friend struct run_op;
707 template <
class>
friend struct exec_op;
708 template <
class,
class,
class>
friend struct run_all_op;
// Moves staged requests that expect zero responses to the back of the queue
// and erases them.
// NOTE(review): the for_each body is not visible in this listing.
710 void cancel_push_requests()
// Keep-predicate: everything except "staged and expecting no response".
712 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
713 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
716 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
720 reqs_.erase(point, std::end(reqs_));
723 [[nodiscard]]
bool is_writing() const noexcept
725 return !write_buffer_.empty();
// Appends `info` to the request queue, repositions it when the request has
// hello priority, and cancels the writer timer when the connection is open
// and no write is in progress.
728 void add_request_info(std::shared_ptr<req_info>
const& info)
730 reqs_.push_back(info);
732 if (info->req_->has_hello_priority()) {
// Scanning from the back, find the end of the trailing run of waiting
// requests (as a reverse iterator).
733 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](
auto const& e) {
734 return e->is_waiting();
// Rotate the just-appended request to the front of that waiting run.
737 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
// writer_op waits on this timer between writes.
740 if (is_open() && !is_writing())
741 writer_timer_.cancel();
744 template <
class CompletionToken,
class Logger>
745 auto reader(Logger l, CompletionToken&& token)
747 return asio::async_compose
749 , void(system::error_code)
750 >(reader_op<this_type, Logger>{
this, l}, token, writer_timer_);
753 template <
class CompletionToken,
class Logger>
754 auto writer(Logger l, CompletionToken&& token)
756 return asio::async_compose
758 , void(system::error_code)
759 >(writer_op<this_type, Logger>{
this, l}, token, writer_timer_);
762 template <
class Logger,
class CompletionToken>
763 auto async_run_lean(
config const& cfg, Logger l, CompletionToken token)
765 runner_.set_config(cfg);
766 l.set_prefix(runner_.get_config().log_prefix);
767 return asio::async_compose
769 , void(system::error_code)
770 >(run_op<this_type, Logger>{
this, l}, token, writer_timer_);
// Appends the payload of every request from `point` onwards to
// `write_buffer_` and updates the usage counters. Returns true when at least
// one such request exists.
// NOTE(review): some statements inside the for_each lambda are not visible
// in this listing.
773 [[nodiscard]]
bool coalesce_requests()
// First request that is still waiting. std::partition_point requires the
// queue to hold all non-waiting requests before the waiting ones.
777 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](
auto const& ri) {
778 return !ri->is_waiting();
781 std::for_each(point, std::cend(reqs_), [
this](
auto const& ri) {
783 write_buffer_ += ri->req_->payload();
785 usage_.commands_sent += ri->expected_responses_;
// Adds the whole current buffer size to the byte counter.
788 usage_.bytes_sent += std::size(write_buffer_);
790 return point != std::cend(reqs_);
// For a non-empty queue: true when the front request is not in the waiting
// state.
// NOTE(review): the statement guarded by the empty-queue check is not
// visible in this listing.
793 bool is_waiting_response() const noexcept
795 if (std::empty(reqs_))
802 return !reqs_.front()->is_waiting();
807 if (stream_->next_layer().is_open()) {
808 system::error_code ec;
809 stream_->next_layer().close(ec);
813 auto is_open() const noexcept {
return stream_->next_layer().is_open(); }
814 auto& lowest_layer() noexcept {
return stream_->lowest_layer(); }
818 BOOST_ASSERT(!read_buffer_.empty());
827 if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
842 if (reqs_.front()->expected_responses_ == 0)
850 return reqs_.front()->is_waiting();
853 auto get_suggested_buffer_growth() const noexcept
855 return parser_.get_suggested_buffer_growth(4096);
// Outcome of one on_read() call: more input is needed, a push was parsed, or
// a response was parsed.
858 enum class parse_result { needs_more,
push, resp };
// Parse outcome paired with a size (on_finish_parsing stores the parser's
// consumed byte count there).
860 using parse_ret_type = std::pair<parse_result, std::size_t>;
// Bookkeeping after a complete message was parsed: updates the usage
// counters, consumes the parsed bytes from the dynamic buffer and builds the
// result pair.
// NOTE(review): some statements, including the return, are not visible in
// this listing.
862 parse_ret_type on_finish_parsing(parse_result t)
// Pushes and responses are counted separately.
864 if (t == parse_result::push) {
865 usage_.pushes_received += 1;
866 usage_.push_bytes_received += parser_.get_consumed();
868 usage_.responses_received += 1;
869 usage_.response_bytes_received += parser_.get_consumed();
// Drop the bytes the parser has consumed from the read buffer.
873 dbuf_.consume(parser_.get_consumed());
874 auto const res = std::make_pair(t, parser_.get_consumed());
// Parses `data` either as a server push (through `receive_adapter_`) or as a
// response to the request at the front of the queue (through that request's
// adapter).
// NOTE(review): the conditions that select between the branches are not
// visible in this listing.
879 parse_ret_type on_read(std::string_view data, system::error_code& ec)
892 on_push_ = is_next_push();
// Push path: an incomplete message yields needs_more.
895 if (!resp3::parse(parser_, data, receive_adapter_, ec))
896 return std::make_pair(parse_result::needs_more, 0);
899 return std::make_pair(parse_result::push, 0);
901 return on_finish_parsing(parse_result::push);
// Response path: there must be a front request that still expects responses.
904 BOOST_ASSERT_MSG(is_waiting_response(),
"Not waiting for a response (using MONITOR command perhaps?)");
905 BOOST_ASSERT(!reqs_.empty());
906 BOOST_ASSERT(reqs_.front() !=
nullptr);
907 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
909 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
910 return std::make_pair(parse_result::needs_more, 0);
// Record the error on the request and notify it.
913 reqs_.front()->ec_ = ec;
914 reqs_.front()->proceed();
915 return std::make_pair(parse_result::resp, 0);
// Accumulate the bytes consumed for this request.
918 reqs_.front()->read_size_ += parser_.get_consumed();
// Last expected response arrived: notify the request.
920 if (--reqs_.front()->expected_responses_ == 0) {
922 reqs_.front()->proceed();
926 return on_finish_parsing(parse_result::resp);
931 write_buffer_.clear();
932 read_buffer_.clear();
935 cancel_run_called_ =
false;
// SSL context and the stream created from it (see the make_unique call that
// passes `ctx_`).
938 asio::ssl::context ctx_;
939 std::unique_ptr<next_layer_type> stream_;
// Timer the writer waits on between writes; cancelled by add_request_info().
944 timer_type writer_timer_;
// Channel the reader sends parsed pushes to.
945 receive_channel_type receive_channel_;
// Adapter used when parsing pushes; set by set_receive_response().
947 receiver_adapter_type receive_adapter_;
949 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
// `dbuf_` is constructed over `read_buffer_` in the constructor.
951 std::string read_buffer_;
952 dyn_buffer_type dbuf_;
// Coalesced request payloads waiting to be written.
953 std::string write_buffer_;
955 resp3::parser parser_{};
// Set from is_next_push() in on_read().
956 bool on_push_ =
false;
957 bool cancel_run_called_ =
false;
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
void reset_stream()
Resets the underlying stream.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
auto & next_layer() noexcept
Returns a reference to the next layer.
Executor executor_type
Executor type.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void cancel(operation op)
Cancels specific operations.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
connection_base(executor_type ex, asio::ssl::context ctx, std::size_t max_read_size)
Constructs from an executor.
ignore_t ignore
Global ignore object.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
operation
Connection operations that can be cancelled.
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ not_connected
There is no established connection.
@ exec
Refers to connection::async_exec operations.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Connection usage information.
A node in the response tree.