Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.
Loading...
Searching...
No Matches
health_checker.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
9
10// Has to included before promise.hpp to build on msvc.
11#include <boost/redis/request.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/config.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <boost/asio/experimental/parallel_group.hpp>
22#include <memory>
23#include <chrono>
24
25namespace boost::redis::detail {
26
27template <class HealthChecker, class Connection, class Logger>
28class ping_op {
29public:
30 HealthChecker* checker_ = nullptr;
31 Connection* conn_ = nullptr;
32 Logger logger_;
33 asio::coroutine coro_{};
34
35 template <class Self>
36 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
37 {
38 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
39 {
40 if (checker_->checker_has_exited_) {
41 logger_.trace("ping_op: checker has exited. Exiting ...");
42 self.complete({});
43 return;
44 }
45
46 BOOST_ASIO_CORO_YIELD
47 conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
48 if (ec || is_cancelled(self)) {
49 logger_.trace("ping_op: error/cancelled (1).");
50 checker_->wait_timer_.cancel();
51 self.complete(!!ec ? ec : asio::error::operation_aborted);
52 return;
53 }
54
55 // Wait before pinging again.
56 checker_->ping_timer_.expires_after(checker_->ping_interval_);
57 BOOST_ASIO_CORO_YIELD
58 checker_->ping_timer_.async_wait(std::move(self));
59 if (ec || is_cancelled(self)) {
60 logger_.trace("ping_op: error/cancelled (2).");
61 self.complete(!!ec ? ec : asio::error::operation_aborted);
62 return;
63 }
64 }
65 }
66};
67
68template <class HealthChecker, class Connection, class Logger>
69class check_timeout_op {
70public:
71 HealthChecker* checker_ = nullptr;
72 Connection* conn_ = nullptr;
73 Logger logger_;
74 asio::coroutine coro_{};
75
76 template <class Self>
77 void operator()(Self& self, system::error_code ec = {})
78 {
79 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
80 {
81 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
82 BOOST_ASIO_CORO_YIELD
83 checker_->wait_timer_.async_wait(std::move(self));
84 if (ec || is_cancelled(self)) {
85 logger_.trace("check-timeout-op: error/canceled. Exiting ...");
86 self.complete(!!ec ? ec : asio::error::operation_aborted);
87 return;
88 }
89
90 if (checker_->resp_.has_error()) {
91 logger_.trace("check-timeout-op: Response error. Exiting ...");
92 self.complete({});
93 return;
94 }
95
96 if (checker_->resp_.value().empty()) {
97 logger_.trace("check-timeout-op: Response has no value. Exiting ...");
98 checker_->ping_timer_.cancel();
99 conn_->cancel(operation::run);
100 checker_->checker_has_exited_ = true;
101 self.complete(error::pong_timeout);
102 return;
103 }
104
105 if (checker_->resp_.has_value()) {
106 checker_->resp_.value().clear();
107 }
108 }
109 }
110};
111
112template <class HealthChecker, class Connection, class Logger>
113class check_health_op {
114public:
115 HealthChecker* checker_ = nullptr;
116 Connection* conn_ = nullptr;
117 Logger logger_;
118 asio::coroutine coro_{};
119
120 template <class Self>
121 void
122 operator()(
123 Self& self,
124 std::array<std::size_t, 2> order = {},
125 system::error_code ec1 = {},
126 system::error_code ec2 = {})
127 {
128 BOOST_ASIO_CORO_REENTER (coro_)
129 {
130 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
131 logger_.trace("check-health-op: timeout disabled.");
132 BOOST_ASIO_CORO_YIELD
133 asio::post(std::move(self));
134 self.complete({});
135 return;
136 }
137
138 BOOST_ASIO_CORO_YIELD
139 asio::experimental::make_parallel_group(
140 [this](auto token) { return checker_->async_ping(*conn_, logger_, token); },
141 [this](auto token) { return checker_->async_check_timeout(*conn_, logger_, token);}
142 ).async_wait(
143 asio::experimental::wait_for_one(),
144 std::move(self));
145
146 logger_.on_check_health(ec1, ec2);
147
148 if (is_cancelled(self)) {
149 logger_.trace("check-health-op: canceled. Exiting ...");
150 self.complete(asio::error::operation_aborted);
151 return;
152 }
153
154 switch (order[0]) {
155 case 0: self.complete(ec1); return;
156 case 1: self.complete(ec2); return;
157 default: BOOST_ASSERT(false);
158 }
159 }
160 }
161};
162
163template <class Executor>
164class health_checker {
165private:
166 using timer_type =
167 asio::basic_waitable_timer<
168 std::chrono::steady_clock,
169 asio::wait_traits<std::chrono::steady_clock>,
170 Executor>;
171
172public:
173 health_checker(Executor ex)
174 : ping_timer_{ex}
175 , wait_timer_{ex}
176 {
177 req_.push("PING", "Boost.Redis");
178 }
179
180 void set_config(config const& cfg)
181 {
182 req_.clear();
183 req_.push("PING", cfg.health_check_id);
184 ping_interval_ = cfg.health_check_interval;
185 }
186
187 template <
188 class Connection,
189 class Logger,
190 class CompletionToken = asio::default_completion_token_t<Executor>
191 >
192 auto
193 async_check_health(
194 Connection& conn,
195 Logger l,
196 CompletionToken token = CompletionToken{})
197 {
198 checker_has_exited_ = false;
199 return asio::async_compose
200 < CompletionToken
201 , void(system::error_code)
202 >(check_health_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn);
203 }
204
205 std::size_t cancel(operation op)
206 {
207 switch (op) {
209 case operation::all:
210 ping_timer_.cancel();
211 wait_timer_.cancel();
212 break;
213 default: /* ignore */;
214 }
215
216 return 0;
217 }
218
219private:
220 template <class Connection, class Logger, class CompletionToken>
221 auto async_ping(Connection& conn, Logger l, CompletionToken token)
222 {
223 return asio::async_compose
224 < CompletionToken
225 , void(system::error_code)
226 >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
227 }
228
229 template <class Connection, class Logger, class CompletionToken>
230 auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
231 {
232 return asio::async_compose
233 < CompletionToken
234 , void(system::error_code)
235 >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
236 }
237
238 template <class, class, class> friend class ping_op;
239 template <class, class, class> friend class check_timeout_op;
240 template <class, class, class> friend class check_health_op;
241
242 timer_type ping_timer_;
243 timer_type wait_timer_;
244 redis::request req_;
246 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
247 bool checker_has_exited_ = false;
248};
249
250} // boost::redis::detail
251
252#endif // BOOST_REDIS_HEALTH_CHECKER_HPP
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition: response.hpp:35
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ pong_timeout
Connect timeout.
@ health_check
Health check operation.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.