7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
11#include <boost/redis/request.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/config.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <boost/asio/experimental/parallel_group.hpp>
25namespace boost::redis::detail {
// ping_op: handler object for the composed operation started by
// health_checker::async_ping. In a loop it executes the checker request on
// the connection and then waits on the ping timer before the next round.
// NOTE(review): this listing drops several lines of the original header
// (class head, braces, coroutine yields, returns, the logger_ member), so
// the comments below describe only the statements that are visible.
27template <
class HealthChecker,
class Connection,
class Logger>
// Pointers to the health checker and the connection used by this operation;
// both default to nullptr.
30 HealthChecker* checker_ =
nullptr;
31 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
33 asio::coroutine coro_{};
// Handler entry point. ec carries the error code of the operation that just
// finished; the trailing std::size_t argument is unnamed and not used.
36 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
38 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
// checker_has_exited_ is set by check_timeout_op when it finds an empty
// response; in that case this operation logs and stops pinging.
40 if (checker_->checker_has_exited_) {
41 logger_.trace(
"ping_op: checker has exited. Exiting ...");
// Run the PING request stored in the checker; the reply goes to checker_->resp_.
47 conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
// On error or cancellation also cancel wait_timer_ (the timer that
// check_timeout_op waits on) and complete with ec, or with
// operation_aborted when ec is not set.
48 if (ec || is_cancelled(self)) {
49 logger_.trace(
"ping_op: error/cancelled (1).");
50 checker_->wait_timer_.cancel();
51 self.complete(!!ec ? ec : asio::error::operation_aborted);
// Arm the ping timer with the configured interval and wait for it before
// the next iteration of the loop.
56 checker_->ping_timer_.expires_after(checker_->ping_interval_);
58 checker_->ping_timer_.async_wait(std::move(self));
// Same completion rule as above when the timer wait fails or is cancelled.
59 if (ec || is_cancelled(self)) {
60 logger_.trace(
"ping_op: error/cancelled (2).");
61 self.complete(!!ec ? ec : asio::error::operation_aborted);
// check_timeout_op: handler object for the composed operation started by
// health_checker::async_check_timeout. In a loop it waits on wait_timer_ and
// then inspects the response that ping_op writes into.
// NOTE(review): this listing drops several lines of the original header
// (braces, coroutine yields, returns, the logger_ member), so the comments
// below describe only the statements that are visible.
68template <
class HealthChecker,
class Connection,
class Logger>
69class check_timeout_op {
// Pointers to the health checker and the connection; both default to nullptr.
71 HealthChecker* checker_ =
nullptr;
72 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
74 asio::coroutine coro_{};
// Handler entry point. ec carries the error code of the timer wait that
// just finished.
77 void operator()(Self& self, system::error_code ec = {})
79 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
// The wait window is twice the ping interval.
81 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
83 checker_->wait_timer_.async_wait(std::move(self));
// On error or cancellation complete with ec, or with operation_aborted
// when ec is not set.
84 if (ec || is_cancelled(self)) {
85 logger_.trace(
"check-timeout-op: error/canceled. Exiting ...");
86 self.complete(!!ec ? ec : asio::error::operation_aborted);
// The response holds an error instead of a value.
// NOTE(review): the statements that follow the trace call are not visible
// in this listing - confirm how the operation completes here.
90 if (checker_->resp_.has_error()) {
91 logger_.trace(
"check-timeout-op: Response error. Exiting ...");
// An empty response after the wait window: log it, cancel the ping timer
// and raise checker_has_exited_, which ping_op tests at the top of its loop.
96 if (checker_->resp_.value().empty()) {
97 logger_.trace(
"check-timeout-op: Response has no value. Exiting ...");
98 checker_->ping_timer_.cancel();
100 checker_->checker_has_exited_ =
true;
// Empty the stored response so the emptiness test above is meaningful
// on the next pass of the loop.
105 if (checker_->resp_.has_value()) {
106 checker_->resp_.value().clear();
// check_health_op: handler object for the top-level health-check operation.
// It runs async_ping and async_check_timeout as a parallel group and
// completes with the error code of one of them.
// NOTE(review): this listing drops several lines of the original header
// (the operator() head, braces, returns, the switch expression, the
// logger_ member), so the comments below describe only what is visible.
112template <
class HealthChecker,
class Connection,
class Logger>
113class check_health_op {
// Pointers to the health checker and the connection; both default to nullptr.
115 HealthChecker* checker_ =
nullptr;
116 Connection* conn_ =
nullptr;
// Coroutine state consumed by BOOST_ASIO_CORO_REENTER below.
118 asio::coroutine coro_{};
120 template <
class Self>
// Handler arguments: order plus one error code per branch of the parallel
// group (ec1 for the ping branch, ec2 for the timeout branch, following the
// order of the lambdas below). All are defaulted.
// NOTE(review): order is presumably the completion order reported by the
// parallel group - confirm against the Asio documentation.
124 std::array<std::size_t, 2> order = {},
125 system::error_code ec1 = {},
126 system::error_code ec2 = {})
128 BOOST_ASIO_CORO_REENTER (coro_)
// A ping interval of zero disables the health check: log it and post the
// handler instead of starting the parallel group.
130 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
131 logger_.trace(
"check-health-op: timeout disabled.");
132 BOOST_ASIO_CORO_YIELD
133 asio::post(std::move(self));
// Launch the ping and timeout-check operations together; wait_for_one()
// is the completion condition passed to the group.
138 BOOST_ASIO_CORO_YIELD
139 asio::experimental::make_parallel_group(
140 [
this](
auto token) {
return checker_->async_ping(*conn_, logger_, token); },
141 [
this](
auto token) {
return checker_->async_check_timeout(*conn_, logger_, token);}
143 asio::experimental::wait_for_one(),
// Report both error codes to the logger.
146 logger_.on_check_health(ec1, ec2);
// Cancellation of the whole operation completes with operation_aborted.
148 if (is_cancelled(self)) {
149 logger_.trace(
"check-health-op: canceled. Exiting ...");
150 self.complete(asio::error::operation_aborted);
// Complete with ec1 for case 0 and ec2 for case 1; any other value trips
// the assertion. The switch expression is not visible in this listing.
155 case 0: self.complete(ec1);
return;
156 case 1: self.complete(ec2);
return;
157 default: BOOST_ASSERT(
false);
// health_checker: owns the PING request, the two timers and the state shared
// by ping_op, check_timeout_op and check_health_op.
// NOTE(review): this listing drops several lines of the original header
// (braces, some member declarations such as req_ and resp_, and the heads of
// some member functions), so the comments below describe only what is visible.
163template <
class Executor>
164class health_checker {
// Template arguments of the timer type: a waitable timer on
// std::chrono::steady_clock. The head of the alias is not visible here;
// the members below refer to it as timer_type.
167 asio::basic_waitable_timer<
168 std::chrono::steady_clock,
169 asio::wait_traits<std::chrono::steady_clock>,
// Constructor: pushes a PING command with the payload Boost.Redis into req_.
173 health_checker(Executor ex)
177 req_.push(
"PING",
"Boost.Redis");
// Applies a configuration: pushes a PING carrying cfg.health_check_id and
// takes the ping interval from cfg.health_check_interval.
180 void set_config(config
const& cfg)
183 req_.push(
"PING", cfg.health_check_id);
184 ping_interval_ = cfg.health_check_interval;
// Entry point that starts check_health_op through async_compose (the
// function head is not visible in this listing). The completion token
// defaults to the default token of Executor. The exit flag is reset
// before the operation starts.
190 class CompletionToken = asio::default_completion_token_t<Executor>
196 CompletionToken token = CompletionToken{})
198 checker_has_exited_ =
false;
199 return asio::async_compose
201 , void(system::error_code)
202 >(check_health_op<health_checker, Connection, Logger>{
this, &conn, l}, token, conn);
// Cancels both timers (the enclosing function head is not visible in
// this listing).
210 ping_timer_.cancel();
211 wait_timer_.cancel();
// Starts ping_op as a composed operation; conn and ping_timer_ are the
// trailing arguments handed to async_compose.
220 template <
class Connection,
class Logger,
class CompletionToken>
221 auto async_ping(Connection& conn, Logger l, CompletionToken token)
223 return asio::async_compose
225 , void(system::error_code)
226 >(ping_op<health_checker, Connection, Logger>{
this, &conn, l}, token, conn, ping_timer_);
// Starts check_timeout_op as a composed operation; conn and wait_timer_
// are the trailing arguments handed to async_compose.
229 template <
class Connection,
class Logger,
class CompletionToken>
230 auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
232 return asio::async_compose
234 , void(system::error_code)
235 >(check_timeout_op<health_checker, Connection, Logger>{
this, &conn, l}, token, conn, wait_timer_);
// The three operation classes are friends: they read and write the timers,
// the request/response and the flags of this class through checker_.
238 template <
class,
class,
class>
friend class ping_op;
239 template <
class,
class,
class>
friend class check_timeout_op;
240 template <
class,
class,
class>
friend class check_health_op;
// ping_timer_ paces successive pings; wait_timer_ bounds the wait for a
// response in check_timeout_op.
242 timer_type ping_timer_;
243 timer_type wait_timer_;
// Interval between pings; defaults to five seconds. A value of zero makes
// check_health_op skip the health check.
246 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
// Set by check_timeout_op when it finds an empty response; tested by ping_op.
247 bool checker_has_exited_ =
false;
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
operation
Connection operations that can be cancelled.
@ pong_timeout
Connect timeout.
@ health_check
Health check operation.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.