Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.
Loading...
Searching...
No Matches
connection_base.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
8#define BOOST_REDIS_CONNECTION_BASE_HPP
9
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/detail/helper.hpp>
12#include <boost/redis/error.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/request.hpp>
15#include <boost/redis/resp3/type.hpp>
16#include <boost/redis/config.hpp>
17#include <boost/redis/detail/runner.hpp>
18#include <boost/redis/usage.hpp>
19
20#include <boost/system.hpp>
21#include <boost/asio/basic_stream_socket.hpp>
22#include <boost/asio/bind_executor.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/asio/write.hpp>
27#include <boost/assert.hpp>
28#include <boost/core/ignore_unused.hpp>
29#include <boost/asio/ssl/stream.hpp>
30#include <boost/asio/read_until.hpp>
31#include <boost/asio/buffer.hpp>
32#include <boost/asio/experimental/channel.hpp>
33
34#include <algorithm>
35#include <array>
36#include <chrono>
37#include <deque>
38#include <memory>
39#include <string_view>
40#include <type_traits>
41#include <functional>
42
43namespace boost::redis::detail
44{
45
/* Returns a read-only view over the full contents of a dynamic buffer.
 * Assumes the buffer's storage is contiguous, which holds for
 * asio::dynamic_string_buffer as used by this file.
 */
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   auto const region = buf.data(0, buf.size());
   auto const* first = static_cast<char const*>(region.data());
   return std::string_view{first, buf.size()};
}
52
/* Composed operation that grows a dynamic buffer by `size` bytes and
 * reads some data from the stream into the newly grown region, then
 * shrinks the buffer back so it contains only the bytes actually read.
 * Completes with (error_code, bytes_read).
 */
template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
   AsyncReadStream& stream_;
   DynamicBuffer buf_;
   std::size_t size_ = 0;   // How many bytes to grow the buffer by.
   std::size_t tmp_ = 0;    // Buffer size before growing (read offset).
   asio::coroutine coro_{};

public:
   append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
   : stream_ {stream}
   , buf_ {std::move(buf)}
   , size_{size}
   { }

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro_)
      {
         // Remember the old size so we know where the new region starts.
         tmp_ = buf_.size();
         buf_.grow(size_);

         BOOST_ASIO_CORO_YIELD
         stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
         if (ec) {
            self.complete(ec, 0);
            return;
         }

         // Drop the part of the grown region that was not filled by the read.
         buf_.shrink(buf_.size() - tmp_ - n);
         self.complete({}, n);
      }
   }
};
91
92template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
93auto
94async_append_some(
95 AsyncReadStream& stream,
96 DynamicBuffer buffer,
97 std::size_t size,
98 CompletionToken&& token)
99{
100 return asio::async_compose
101 < CompletionToken
102 , void(system::error_code, std::size_t)
103 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
104}
105
/* Composed operation behind connection_base::async_exec. Queues the
 * request on the connection and suspends on the request's notifier
 * channel until the response has been read (or the operation is
 * cancelled). Completes with (error_code, bytes_read).
 */
template <class Conn>
struct exec_op {
   using req_info_type = typename Conn::req_info;
   using adapter_type = typename Conn::adapter_type;

   Conn* conn_ = nullptr;
   std::shared_ptr<req_info_type> info_ = nullptr;
   asio::coroutine coro{};

   template <class Self>
   void operator()(Self& self , system::error_code = {}, std::size_t = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         // Check whether the user wants to wait for the connection to
         // be established.
         if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
            BOOST_ASIO_CORO_YIELD
            asio::post(std::move(self));
            return self.complete(error::not_connected, 0);
         }

         conn_->add_request_info(info_);

EXEC_OP_WAIT:
         BOOST_ASIO_CORO_YIELD
         info_->async_wait(std::move(self));

         // A parse error was recorded for this request by on_read.
         if (info_->ec_) {
            self.complete(info_->ec_, 0);
            return;
         }

         if (info_->stop_requested()) {
            // Don't have to call remove_request as it has already
            // been removed by cancel(exec).
            return self.complete(asio::error::operation_aborted, 0);
         }

         if (is_cancelled(self)) {
            if (!info_->is_waiting()) {
               // Request already written (or staged): honoring the
               // cancellation depends on its type.
               using c_t = asio::cancellation_type;
               auto const c = self.get_cancellation_state().cancelled();
               if ((c & c_t::terminal) != c_t::none) {
                  // Cancellation requires closing the connection
                  // otherwise it stays in inconsistent state.
                  conn_->cancel(operation::run);
                  return self.complete(asio::error::operation_aborted, 0);
               } else {
                  // Can't implement other cancelation types, ignoring.
                  self.get_cancellation_state().clear();

                  // TODO: Find out a better way to ignore
                  // cancelation.
                  goto EXEC_OP_WAIT;
               }
            } else {
               // Cancelation can be honored: the request was never
               // written, so simply drop it from the queue.
               conn_->remove_request(info_);
               self.complete(asio::error::operation_aborted, 0);
               return;
            }
         }

         self.complete(info_->ec_, info_->read_size_);
      }
   }
};
174
/* Composed operation behind connection_base::async_run_lean: resets the
 * connection state and runs the reader and writer operations in
 * parallel, completing when the first of the two finishes. The
 * completion error is taken from whichever op finished first.
 */
template <class Conn, class Logger>
struct run_op {
   Conn* conn = nullptr;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , std::array<std::size_t, 2> order = {}
                  , system::error_code ec0 = {}
                  , system::error_code ec1 = {})
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         conn->reset();

         // wait_for_one: the group completes (and cancels the other op)
         // as soon as either reader or writer completes.
         BOOST_ASIO_CORO_YIELD
         asio::experimental::make_parallel_group(
            [this](auto token) { return conn->reader(logger_, token);},
            [this](auto token) { return conn->writer(logger_, token);}
         ).async_wait(
            asio::experimental::wait_for_one(),
            std::move(self));

         if (is_cancelled(self)) {
            logger_.trace("run-op: canceled. Exiting ...");
            self.complete(asio::error::operation_aborted);
            return;
         }

         logger_.on_run(ec0, ec1);

         // order[0] tells which op completed first; report its error.
         switch (order[0]) {
            case 0: self.complete(ec0); break;
            case 1: self.complete(ec1); break;
            default: BOOST_ASSERT(false);
         }
      }
   }
};
215
/* Composed operation that writes coalesced requests to the socket in a
 * loop. When there is nothing to write it suspends on writer_timer_,
 * which connection_base uses as a condition variable (cancelling the
 * timer wakes the writer).
 */
template <class Conn, class Logger>
struct writer_op {
   Conn* conn_;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         // coalesce_requests() stages waiting requests into the write
         // buffer and returns false once there is nothing left to write.
         while (conn_->coalesce_requests()) {
            if (conn_->use_ssl())
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
            else
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));

            logger_.on_write(ec, conn_->write_buffer_);

            if (ec) {
               logger_.trace("writer-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (is_cancelled(self)) {
               logger_.trace("writer-op: canceled. Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

            conn_->on_write();

            // A socket.close() may have been called while a
            // successful write might had already been queued, so we
            // have to check here before proceeding.
            if (!conn_->is_open()) {
               logger_.trace("writer-op: canceled (2). Exiting ...");
               self.complete({});
               return;
            }
         }

         // Nothing to write: park on the timer until add_request_info
         // cancels it to signal new work.
         BOOST_ASIO_CORO_YIELD
         conn_->writer_timer_.async_wait(std::move(self));
         if (!conn_->is_open() || is_cancelled(self)) {
            logger_.trace("writer-op: canceled (3). Exiting ...");
            // Notice this is not an error of the op, stopping was
            // requested from the outside, so we complete with
            // success.
            self.complete({});
            return;
         }
      }
   }
};
277
/* Composed operation that reads from the socket in a loop, feeding the
 * RESP3 parser. Parsed server pushes are delivered through the receive
 * channel; responses to commands wake the corresponding exec_op.
 */
template <class Conn, class Logger>
struct reader_op {
   using parse_result = typename Conn::parse_result;
   using parse_ret_type = typename Conn::parse_ret_type;
   Conn* conn_;
   Logger logger_;
   parse_ret_type res_{parse_result::resp, 0};
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         // Appends some data to the buffer if necessary.
         if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
            if (conn_->use_ssl()) {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            } else {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer().next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            }

            logger_.on_read(ec, n);

            // EOF is not treated as error.
            if (ec == asio::error::eof) {
               logger_.trace("reader-op: EOF received. Exiting ...");
               conn_->cancel(operation::run);
               return self.complete({}); // EOFINAE: EOF is not an error.
            }

            // The connection is not viable after an error.
            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            // Somebody might have canceled implicitly or explicitly
            // while we were suspended and after queueing so we have to
            // check.
            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled. Exiting ...");
               self.complete(ec);
               return;
            }
         }

         // Feed the buffered bytes to the parser; res_.first tells
         // whether a push, a response or nothing complete was parsed.
         res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
         if (ec) {
            logger_.trace("reader-op: parse error. Exiting ...");
            conn_->cancel(operation::run);
            self.complete(ec);
            return;
         }

         if (res_.first == parse_result::push) {
            // Prefer the non-suspending try_send; fall back to a real
            // async send when the channel is full.
            if (!conn_->receive_channel_.try_send(ec, res_.second)) {
               BOOST_ASIO_CORO_YIELD
               conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
            }

            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled (2). Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

         }
      }
   }
};
372
379template <class Executor>
381public:
383 using executor_type = Executor;
384
386 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
387
388 using clock_type = std::chrono::steady_clock;
389 using clock_traits_type = asio::wait_traits<clock_type>;
390 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
391
393
396 executor_type ex,
397 asio::ssl::context ctx,
398 std::size_t max_read_size)
399 : ctx_{std::move(ctx)}
400 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
401 , writer_timer_{ex}
402 , receive_channel_{ex, 256}
403 , runner_{ex, {}}
404 , dbuf_{read_buffer_, max_read_size}
405 {
406 set_receive_response(ignore);
407 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
408 }
409
   /// Returns the SSL context.
   auto const& get_ssl_context() const noexcept
   { return ctx_;}
413
416 {
417 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
418 }
419
   /// Returns a reference to the next layer.
   auto& next_layer() noexcept { return *stream_; }

   /// Returns a const reference to the next layer.
   auto const& next_layer() const noexcept { return *stream_; }

   /// Returns the associated executor.
   auto get_executor() {return writer_timer_.get_executor();}
428
431 {
432 runner_.cancel(op);
433 if (op == operation::all) {
434 cancel_impl(operation::run);
435 cancel_impl(operation::receive);
436 cancel_impl(operation::exec);
437 return;
438 }
439
440 cancel_impl(op);
441 }
442
   /// Executes a request asynchronously, placing the result in `resp`.
   /// Completion signature: void(error_code, std::size_t bytes_read).
   template <class Response, class CompletionToken>
   auto async_exec(request const& req, Response& resp, CompletionToken token)
   {
      using namespace boost::redis::adapter;
      auto f = boost_redis_adapt(resp);
      BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");

      // req_info owns the per-request state (adapter, notifier channel,
      // status) shared between this op and the reader.
      auto info = std::make_shared<req_info>(req, f, get_executor());

      return asio::async_compose
         < CompletionToken
         , void(system::error_code, std::size_t)
         >(exec_op<this_type>{this, info}, token, writer_timer_);
   }
457
   /// Receives the next server push into `response`. Deprecated: set
   /// the response once with set_receive_response instead.
   template <class Response, class CompletionToken>
   [[deprecated("Set the response with set_receive_response and use the other overload.")]]
   auto async_receive(Response& response, CompletionToken token)
   {
      set_receive_response(response);
      return receive_channel_.async_receive(std::move(token));
   }

   /// Waits for the next server push (stored in the response previously
   /// set with set_receive_response).
   template <class CompletionToken>
   auto async_receive(CompletionToken token)
   { return receive_channel_.async_receive(std::move(token)); }
469
470 std::size_t receive(system::error_code& ec)
471 {
472 std::size_t size = 0;
473
474 auto f = [&](system::error_code const& ec2, std::size_t n)
475 {
476 ec = ec2;
477 size = n;
478 };
479
480 auto const res = receive_channel_.try_receive(f);
481 if (ec)
482 return 0;
483
484 if (!res)
486
487 return size;
488 }
489
   /// Starts the connection: resolve/connect/handshake plus the
   /// read/write loops, as driven by the runner. Completion signature:
   /// void(error_code).
   template <class Logger, class CompletionToken>
   auto async_run(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return runner_.async_run(*this, l, std::move(token));
   }
497
498 template <class Response>
499 void set_receive_response(Response& response)
500 {
501 using namespace boost::redis::adapter;
502 auto g = boost_redis_adapt(response);
503 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
504 }
505
   /// Returns the usage counters accumulated so far.
   usage get_usage() const noexcept
   { return usage_; }

   /// Whether cancel(operation::run) has already been issued for this run.
   auto run_is_canceled() const noexcept
   { return cancel_run_called_; }
511
private:
   using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
   using runner_type = runner<executor_type>;
   // Adapter taking (request index, node, ec) — used per request.
   using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
   // Adapter taking (node, ec) — used for server pushes.
   using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
   using exec_notifier_type = receive_channel_type;

   // Whether the current configuration requests TLS.
   auto use_ssl() const noexcept
   { return runner_.get_config().use_ssl;}
521
   // Called when the connection is lost: stops and removes the requests
   // whose config says they should not survive a reconnection, and
   // marks the survivors as waiting so they are re-sent on the next
   // run. Returns the number of requests removed.
   auto cancel_on_conn_lost() -> std::size_t
   {
      // Must return false if the request should be removed.
      auto cond = [](auto const& ptr)
      {
         BOOST_ASSERT(ptr != nullptr);

         if (ptr->is_waiting()) {
            return !ptr->req_->get_config().cancel_on_connection_lost;
         } else {
            return !ptr->req_->get_config().cancel_if_unresponded;
         }
      };

      // Keep survivors in front, requests to cancel at the back.
      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);

      auto const ret = std::distance(point, std::end(reqs_));

      // Wake (with a closed notifier) every request being dropped.
      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->stop();
      });

      reqs_.erase(point, std::end(reqs_));

      // Surviving requests go back to the waiting state so the next
      // writer run sends them again.
      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         return ptr->mark_waiting();
      });

      return ret;
   }
552
553 auto cancel_unwritten_requests() -> std::size_t
554 {
555 auto f = [](auto const& ptr)
556 {
557 BOOST_ASSERT(ptr != nullptr);
558 return !ptr->is_waiting();
559 };
560
561 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
562
563 auto const ret = std::distance(point, std::end(reqs_));
564
565 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
566 ptr->stop();
567 });
568
569 reqs_.erase(point, std::end(reqs_));
570 return ret;
571 }
572
573 void cancel_impl(operation op)
574 {
575 switch (op) {
576 case operation::exec:
577 {
578 cancel_unwritten_requests();
579 } break;
580 case operation::run:
581 {
582 // Protects the code below from being called more than
583 // once, see https://github.com/boostorg/redis/issues/181
584 if (std::exchange(cancel_run_called_, true)) {
585 return;
586 }
587
588 close();
589 writer_timer_.cancel();
590 receive_channel_.cancel();
591 cancel_on_conn_lost();
592 } break;
594 {
595 receive_channel_.cancel();
596 } break;
597 default: /* ignore */;
598 }
599 }
600
   // Called by the writer op after a successful write: clears the write
   // buffer, completes push-only requests and promotes staged requests
   // to written.
   void on_write()
   {
      // We have to clear the payload right after writing it to use it
      // as a flag that informs there is no ongoing write.
      write_buffer_.clear();

      // Notice this must come before the for-each below.
      cancel_push_requests();

      // There is small optimization possible here: traverse only the
      // partition of unwritten requests instead of them all.
      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
         if (ptr->is_staged()) {
            ptr->mark_written();
         }
      });
   }
619
   // Per-request state shared between async_exec (exec_op) and the
   // reader/writer ops: a notifier channel used as a one-shot signal,
   // the response adapter, the request status and the read result.
   struct req_info {
   public:
      using node_type = resp3::basic_node<std::string_view>;
      using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;

      // @param req     Request to execute; only a pointer is stored, the
      //                caller must keep it alive.
      // @param adapter Response adapter taking (index, node, ec).
      // @param ex      Executor for the notifier channel.
      explicit req_info(request const& req, adapter_type adapter, executor_type ex)
      : notifier_{ex, 1}
      , req_{&req}
      , adapter_{}
      , expected_responses_{req.get_expected_responses()}
      , status_{status::waiting}
      , ec_{{}}
      , read_size_{0}
      {
         // Wrap the adapter so it receives the index of the command
         // whose response is currently being parsed.
         adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
         {
            auto const i = req_->get_expected_responses() - expected_responses_;
            adapter(i, nd, ec);
         };
      }

      // Wakes the exec_op waiting on this request (non-suspending).
      auto proceed()
      {
         notifier_.try_send(std::error_code{}, 0);
      }

      // Closing the channel signals the exec_op that the request was
      // stopped (see stop_requested()).
      void stop()
      {
         notifier_.close();
      }

      [[nodiscard]] auto is_waiting() const noexcept
      { return status_ == status::waiting; }

      [[nodiscard]] auto is_written() const noexcept
      { return status_ == status::written; }

      [[nodiscard]] auto is_staged() const noexcept
      { return status_ == status::staged; }

      void mark_written() noexcept
      { status_ = status::written; }

      void mark_staged() noexcept
      { status_ = status::staged; }

      void mark_waiting() noexcept
      { status_ = status::waiting; }

      [[nodiscard]] auto stop_requested() const noexcept
      { return !notifier_.is_open();}

      // Suspends until proceed() or stop() is called.
      template <class CompletionToken>
      auto async_wait(CompletionToken token)
      {
         return notifier_.async_receive(std::move(token));
      }

   //private:
      // Request lifecycle: waiting -> staged (copied into the write
      // buffer) -> written (write completed).
      enum class status
      { waiting
      , staged
      , written
      };

      exec_notifier_type notifier_;
      request const* req_;
      wrapped_adapter_type adapter_;

      // Contains the number of commands that haven't been read yet.
      std::size_t expected_responses_;
      status status_;

      system::error_code ec_;
      std::size_t read_size_;
   };
696
697 void remove_request(std::shared_ptr<req_info> const& info)
698 {
699 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
700 }
701
702 using reqs_type = std::deque<std::shared_ptr<req_info>>;
703
704 template <class, class> friend struct reader_op;
705 template <class, class> friend struct writer_op;
706 template <class, class> friend struct run_op;
707 template <class> friend struct exec_op;
708 template <class, class, class> friend struct run_all_op;
709
   // Completes and removes staged requests that expect no response
   // (e.g. pure SUBSCRIBE payloads): once written there is nothing to
   // read for them, so they are finished right after the write.
   void cancel_push_requests()
   {
      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
      });

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->proceed();
      });

      reqs_.erase(point, std::end(reqs_));
   }
722
   // True while a write is in flight: on_write() clears the write
   // buffer only after a successful write, so a non-empty buffer means
   // the writer is busy.
   [[nodiscard]] bool is_writing() const noexcept
   {
      return !write_buffer_.empty();
   }
727
   // Queues a request and wakes the writer if it is idle. Requests with
   // hello priority (e.g. the RESP3 HELLO handshake) are moved to the
   // front of the waiting partition.
   void add_request_info(std::shared_ptr<req_info> const& info)
   {
      reqs_.push_back(info);

      if (info->req_->has_hello_priority()) {
         // Find, from the back, the end of the run of waiting requests,
         // then rotate the new element to the front of that run.
         auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
            return e->is_waiting();
         });

         std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
      }

      // Cancelling the timer is the wakeup signal for an idle writer.
      if (is_open() && !is_writing())
         writer_timer_.cancel();
   }
743
   // Launches the reader composed operation (see reader_op).
   template <class CompletionToken, class Logger>
   auto reader(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   // Launches the writer composed operation (see writer_op).
   template <class CompletionToken, class Logger>
   auto writer(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
   }
761
   // Runs only the read/write loops (no resolve/connect/handshake);
   // used by the runner once the connection is established.
   template <class Logger, class CompletionToken>
   auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
   }
772
   // Concatenates the payloads of all waiting requests into the write
   // buffer and marks them staged. Returns true if anything was staged.
   [[nodiscard]] bool coalesce_requests()
   {
      // Coalesces the requests and marks them staged. After a
      // successful write staged requests will be marked as written.
      // Waiting requests form a contiguous run at the back of the queue.
      auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
         return !ri->is_waiting();
      });

      std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
         // Stage the request.
         write_buffer_ += ri->req_->payload();
         ri->mark_staged();
         usage_.commands_sent += ri->expected_responses_;
      });

      usage_.bytes_sent += std::size(write_buffer_);

      return point != std::cend(reqs_);
   }
792
   // Whether the front request has been sent (staged or written) and is
   // therefore awaiting a response from the server.
   bool is_waiting_response() const noexcept
   {
      if (std::empty(reqs_))
         return false;

      // Under load and on low-latency networks we might start
      // receiving responses before the write operation completed and
      // the request is still marked as staged and not written. See
      // https://github.com/boostorg/redis/issues/170
      return !reqs_.front()->is_waiting();
   }
804
805 void close()
806 {
807 if (stream_->next_layer().is_open()) {
808 system::error_code ec;
809 stream_->next_layer().close(ec);
810 }
811 }
812
   // True while the underlying TCP socket is open.
   auto is_open() const noexcept { return stream_->next_layer().is_open(); }
   auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
815
   // Heuristically decides whether the next message in the read buffer
   // is a server push (as opposed to a response to a queued command).
   auto is_next_push()
   {
      BOOST_ASSERT(!read_buffer_.empty());

      // Useful links to understand the heuristics below.
      //
      // - https://github.com/redis/redis/issues/11784
      // - https://github.com/redis/redis/issues/6426
      // - https://github.com/boostorg/redis/issues/170

      // The message's resp3 type is a push.
      if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
         return true;

      // This is non-push type and the requests queue is empty. I have
      // noticed this is possible, for example with -MISCONF. I don't
      // know why they are not sent with a push type so we can
      // distinguish them from responses to commands. If we are lucky
      // enough to receive them when the command queue is empty they
      // can be treated as server pushes, otherwise it is impossible
      // to handle them properly
      if (reqs_.empty())
         return true;

      // The request does not expect any response but we got one. This
      // may happen if for example, subscribe with wrong syntax.
      if (reqs_.front()->expected_responses_ == 0)
         return true;

      // Added to deal with MONITOR and also to fix PR170 which
      // happens under load and on low-latency networks, where we
      // might start receiving responses before the write operation
      // completed and the request is still marked as staged and not
      // written.
      return reqs_.front()->is_waiting();
   }
852
   // Number of bytes the reader should grow the buffer by, as suggested
   // by the RESP3 parser (4096 is the hint passed to the parser).
   auto get_suggested_buffer_growth() const noexcept
   {
      return parser_.get_suggested_buffer_growth(4096);
   }

   // Outcome of one parse step: needs_more (message incomplete), push
   // (a server push was parsed) or resp (a command response was parsed).
   enum class parse_result { needs_more, push, resp };

   // (result, number of bytes consumed by the parser).
   using parse_ret_type = std::pair<parse_result, std::size_t>;
861
   // Bookkeeping after a complete message has been parsed: updates the
   // usage counters, consumes the parsed bytes from the dynamic buffer
   // and resets the parser for the next message.
   parse_ret_type on_finish_parsing(parse_result t)
   {
      if (t == parse_result::push) {
         usage_.pushes_received += 1;
         usage_.push_bytes_received += parser_.get_consumed();
      } else {
         usage_.responses_received += 1;
         usage_.response_bytes_received += parser_.get_consumed();
      }

      on_push_ = false;
      dbuf_.consume(parser_.get_consumed());
      // Capture the consumed count before reset() clears it.
      auto const res = std::make_pair(t, parser_.get_consumed());
      parser_.reset();
      return res;
   }
878
   // Feeds buffered data to the RESP3 parser, routing the message either
   // to the push receiver adapter or to the front request's adapter.
   parse_ret_type on_read(std::string_view data, system::error_code& ec)
   {
      // We arrive here in two states:
      //
      //    1. While we are parsing a message. In this case we
      //       don't want to determine the type of the message in the
      //       buffer (i.e. response vs push) but leave it untouched
      //       until the parsing of a complete message ends.
      //
      //    2. On a new message, in which case we have to determine
      //       whether the next message is a push or a response.
      //
      if (!on_push_) // Prepare for new message.
         on_push_ = is_next_push();

      if (on_push_) {
         if (!resp3::parse(parser_, data, receive_adapter_, ec))
            return std::make_pair(parse_result::needs_more, 0);

         if (ec)
            return std::make_pair(parse_result::push, 0);

         return on_finish_parsing(parse_result::push);
      }

      BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
      BOOST_ASSERT(!reqs_.empty());
      BOOST_ASSERT(reqs_.front() != nullptr);
      BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);

      if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
         return std::make_pair(parse_result::needs_more, 0);

      if (ec) {
         // Record the error on the request and wake its exec_op.
         reqs_.front()->ec_ = ec;
         reqs_.front()->proceed();
         return std::make_pair(parse_result::resp, 0);
      }

      reqs_.front()->read_size_ += parser_.get_consumed();

      if (--reqs_.front()->expected_responses_ == 0) {
         // Done with this request.
         reqs_.front()->proceed();
         reqs_.pop_front();
      }

      return on_finish_parsing(parse_result::resp);
   }
928
   // Returns the connection state to a pristine condition before a new
   // run (called by run_op before starting the reader/writer pair).
   void reset()
   {
      write_buffer_.clear();
      read_buffer_.clear();
      parser_.reset();
      on_push_ = false;
      cancel_run_called_ = false;
   }
937
   asio::ssl::context ctx_;
   std::unique_ptr<next_layer_type> stream_;

   // Notice we use a timer to simulate a condition-variable. It is
   // also more suitable than a channel and the notify operation does
   // not suspend.
   timer_type writer_timer_;
   // Channel delivering server pushes to async_receive/receive.
   receive_channel_type receive_channel_;
   runner_type runner_;
   // Adapter that receives server pushes (set by set_receive_response).
   receiver_adapter_type receive_adapter_;

   using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

   std::string read_buffer_;
   // Dynamic-buffer view over read_buffer_, capped at max_read_size.
   dyn_buffer_type dbuf_;
   // Non-empty while a write is in flight (see on_write/is_writing).
   std::string write_buffer_;
   reqs_type reqs_;
   resp3::parser parser_{};
   // True while in the middle of parsing a push message.
   bool on_push_ = false;
   // Guards cancel_impl(run) against double execution (issue #181).
   bool cancel_run_called_ = false;

   usage usage_;
960};
961
962} // boost::redis::detail
963
964#endif // BOOST_REDIS_CONNECTION_BASE_HPP
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
void reset_stream()
Resets the underlying stream.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void cancel(operation op)
Cancels specific operations.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
connection_base(executor_type ex, asio::ssl::context ctx, std::size_t max_read_size)
Constructs from an executor.
Creates Redis requests.
Definition: request.hpp:46
ignore_t ignore
Global ignore object.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition: response.hpp:25
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ not_connected
There is no established connection.
@ exec
Refers to connection::async_exec operations.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition: config.hpp:30
Connection usage information.
Definition: usage.hpp:21
A node in the response tree.
Definition: node.hpp:28