Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.
Loading...
Searching...
No Matches
runner.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_RUNNER_HPP
8#define BOOST_REDIS_RUNNER_HPP
9
10#include <boost/redis/detail/health_checker.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/detail/helper.hpp>
14#include <boost/redis/error.hpp>
15#include <boost/redis/logger.hpp>
16#include <boost/redis/operation.hpp>
17#include <boost/redis/detail/connector.hpp>
18#include <boost/redis/detail/resolver.hpp>
19#include <boost/redis/detail/handshaker.hpp>
20#include <boost/asio/compose.hpp>
21#include <boost/asio/connect.hpp>
22#include <boost/asio/coroutine.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <string>
27#include <memory>
28#include <chrono>
29
30namespace boost::redis::detail
31{
32
33void push_hello(config const& cfg, request& req);
34
35template <class Runner, class Connection, class Logger>
36struct hello_op {
37 Runner* runner_ = nullptr;
38 Connection* conn_ = nullptr;
39 Logger logger_;
40 asio::coroutine coro_{};
41
42 template <class Self>
43 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
44 {
45 BOOST_ASIO_CORO_REENTER (coro_)
46 {
47 runner_->add_hello();
48
49 BOOST_ASIO_CORO_YIELD
50 conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
51 logger_.on_hello(ec, runner_->hello_resp_);
52
53 if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
54 logger_.trace("hello-op: error/canceled. Exiting ...");
55 conn_->cancel(operation::run);
56 self.complete(!!ec ? ec : asio::error::operation_aborted);
57 return;
58 }
59
60 self.complete({});
61 }
62 }
63};
64
65template <class Runner, class Connection, class Logger>
66class runner_op {
67private:
68 Runner* runner_ = nullptr;
69 Connection* conn_ = nullptr;
70 Logger logger_;
71 asio::coroutine coro_{};
72
73public:
74 runner_op(Runner* runner, Connection* conn, Logger l)
75 : runner_{runner}
76 , conn_{conn}
77 , logger_{l}
78 {}
79
80 template <class Self>
81 void operator()( Self& self
82 , std::array<std::size_t, 3> order = {}
83 , system::error_code ec0 = {}
84 , system::error_code ec1 = {}
85 , system::error_code ec2 = {}
86 , std::size_t = 0)
87 {
88 BOOST_ASIO_CORO_REENTER (coro_)
89 {
90 BOOST_ASIO_CORO_YIELD
91 asio::experimental::make_parallel_group(
92 [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
93 [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
94 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
95 ).async_wait(
96 asio::experimental::wait_for_all(),
97 std::move(self));
98
99 logger_.on_runner(ec0, ec1, ec2);
100
101 if (is_cancelled(self)) {
102 self.complete(asio::error::operation_aborted);
103 return;
104 }
105
106 if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
107 self.complete(ec0);
108 return;
109 }
110
111 if (order[0] == 2 && !!ec2) {
112 self.complete(ec2);
113 return;
114 }
115
116 if (order[0] == 1 && ec1 == error::pong_timeout) {
117 self.complete(ec1);
118 return;
119 }
120
121 self.complete(ec0);
122 }
123 }
124};
125
126template <class Runner, class Connection, class Logger>
127struct run_all_op {
128 Runner* runner_ = nullptr;
129 Connection* conn_ = nullptr;
130 Logger logger_;
131 asio::coroutine coro_{};
132
133 template <class Self>
134 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
135 {
136 BOOST_ASIO_CORO_REENTER (coro_)
137 {
138 BOOST_ASIO_CORO_YIELD
139 runner_->resv_.async_resolve(std::move(self));
140 logger_.on_resolve(ec, runner_->resv_.results());
141 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
142
143 BOOST_ASIO_CORO_YIELD
144 runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
145 logger_.on_connect(ec, runner_->ctor_.endpoint());
146 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
147
148 if (conn_->use_ssl()) {
149 BOOST_ASIO_CORO_YIELD
150 runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
151 logger_.on_ssl_handshake(ec);
152 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
153 }
154
155 BOOST_ASIO_CORO_YIELD
156 conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
157 BOOST_REDIS_CHECK_OP0(;)
158 self.complete(ec);
159 }
160 }
161};
162
163template <class Executor>
164class runner {
165public:
166 runner(Executor ex, config cfg)
167 : resv_{ex}
168 , ctor_{ex}
169 , hsher_{ex}
170 , health_checker_{ex}
171 , cfg_{cfg}
172 { }
173
174 std::size_t cancel(operation op)
175 {
176 resv_.cancel(op);
177 ctor_.cancel(op);
178 hsher_.cancel(op);
179 health_checker_.cancel(op);
180 return 0U;
181 }
182
183 void set_config(config const& cfg)
184 {
185 cfg_ = cfg;
186 resv_.set_config(cfg);
187 ctor_.set_config(cfg);
188 hsher_.set_config(cfg);
189 health_checker_.set_config(cfg);
190 }
191
192 template <class Connection, class Logger, class CompletionToken>
193 auto async_run(Connection& conn, Logger l, CompletionToken token)
194 {
195 return asio::async_compose
196 < CompletionToken
197 , void(system::error_code)
198 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
199 }
200
201 config const& get_config() const noexcept {return cfg_;}
202
203private:
204 using resolver_type = resolver<Executor>;
205 using connector_type = connector<Executor>;
206 using handshaker_type = detail::handshaker<Executor>;
207 using health_checker_type = health_checker<Executor>;
208 using timer_type = typename connector_type::timer_type;
209
210 template <class, class, class> friend struct run_all_op;
211 template <class, class, class> friend class runner_op;
212 template <class, class, class> friend struct hello_op;
213
214 template <class Connection, class Logger, class CompletionToken>
215 auto async_run_all(Connection& conn, Logger l, CompletionToken token)
216 {
217 return asio::async_compose
218 < CompletionToken
219 , void(system::error_code)
220 >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
221 }
222
223 template <class Connection, class Logger, class CompletionToken>
224 auto async_hello(Connection& conn, Logger l, CompletionToken token)
225 {
226 return asio::async_compose
227 < CompletionToken
228 , void(system::error_code)
229 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
230 }
231
232 void add_hello()
233 {
234 hello_req_.clear();
235 if (hello_resp_.has_value())
236 hello_resp_.value().clear();
237 push_hello(cfg_, hello_req_);
238 }
239
240 bool has_error_in_response() const noexcept
241 {
242 if (!hello_resp_.has_value())
243 return true;
244
245 auto f = [](auto const& e)
246 {
247 switch (e.data_type) {
249 case resp3::type::blob_error: return true;
250 default: return false;
251 }
252 };
253
254 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
255 }
256
257 resolver_type resv_;
258 connector_type ctor_;
259 handshaker_type hsher_;
260 health_checker_type health_checker_;
261 request hello_req_;
262 generic_response hello_resp_;
263 config cfg_;
264};
265
266} // boost::redis::detail
267
268#endif // BOOST_REDIS_RUNNER_HPP
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition: response.hpp:35
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ resolve_timeout
Resolve timeout.
@ pong_timeout
Pong timeout.
@ connect_timeout
Connect timeout.
@ run
Refers to connection::async_run operations.