"...one of the most highly regarded and expertly designed C++ library projects in the world."
— Herb Sutter and Andrei Alexandrescu, C++ Coding Standards
By now the alert reader is thinking: but surely, with Asio in particular, we ought to be able to do much better than periodic polling pings!
This turns out to be surprisingly tricky. We present a possible approach in examples/asio/round_robin.hpp.
One consequence of using Boost.Asio is that you must always let Asio suspend the running thread. Since Asio is aware of pending I/O requests, it can arrange to suspend the thread in such a way that the OS will wake it on I/O completion. No one else has sufficient knowledge.
So the fiber scheduler must depend on Asio for suspension and resumption. It requires Asio handler calls to wake it.
One dismaying implication is that we cannot support multiple threads calling io_service::run() on the same io_service instance.
The reason is that Asio provides no way to constrain a particular handler to be called only on a specified thread. A fiber scheduler instance is locked to a particular thread: that instance cannot manage any other thread’s fibers. Yet if we allow multiple threads to call io_service::run() on the same io_service instance, a fiber scheduler which needs to sleep can have no guarantee that it will reawaken in a timely manner. It can set an Asio timer, as described above — but that timer’s handler may well execute on a different thread!
Another implication is that since an Asio-aware fiber scheduler (not to mention boost::fibers::asio::yield) depends on handler calls from the io_service, it is the application’s responsibility to ensure that io_service::stop() is not called until every fiber has terminated.
It is easier to reason about the behavior of the presented asio::round_robin scheduler if we require that, after initial setup, the thread’s main fiber is the fiber that calls io_service::run(), so let’s impose that requirement.
Naturally, the first thing we must do on each thread using a custom fiber scheduler is call use_scheduling_algorithm(). However, since asio::round_robin requires an io_service instance, we must first declare that.
std::shared_ptr< boost::asio::io_service > io_svc = std::make_shared< boost::asio::io_service >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc);
use_scheduling_algorithm() instantiates asio::round_robin, which naturally calls its constructor:
round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
    io_svc_( io_svc),
    suspend_timer_( * io_svc_) {
    // We use add_service() very deliberately. This will throw
    // service_already_exists if you pass the same io_service instance to
    // more than one round_robin instance.
    boost::asio::add_service( * io_svc_, new service( * io_svc_) );
    io_svc_->post([this]() mutable {
        ...
asio::round_robin binds the passed io_service pointer and initializes a boost::asio::steady_timer:
std::shared_ptr< boost::asio::io_service > io_svc_;
boost::asio::steady_timer suspend_timer_;
Then it calls boost::asio::add_service() with a nested service struct:
struct service : public boost::asio::io_service::service {
    static boost::asio::io_service::id id;

    std::unique_ptr< boost::asio::io_service::work > work_;

    service( boost::asio::io_service & io_svc) :
        boost::asio::io_service::service( io_svc),
        work_{ new boost::asio::io_service::work( io_svc) } {
    }

    virtual ~service() {}

    service( service const&) = delete;
    service & operator=( service const&) = delete;

    void shutdown_service() override final {
        work_.reset();
    }
};
The service struct has a couple of roles.
Its foremost role is to manage a std::unique_ptr< boost::asio::io_service::work >. We want the io_service instance to continue its main loop even when there is no pending Asio I/O.
But when boost::asio::io_service::service::shutdown_service() is called, we discard the io_service::work instance so the io_service can shut down properly.
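The keep-alive role of the io_service::work object can be modeled without Asio at all. The following std-only sketch is a toy analogue, not Asio itself — guarded_loop, its nested work class, and guarded_loop_demo() are invented names for illustration. The loop keeps running (blocking while idle) as long as a work guard object exists, and returns only once the queue is empty and every guard has been destroyed:

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

// Toy analogue (assumption: NOT Asio) of io_service + io_service::work.
class guarded_loop {
public:
    class work {                              // like io_service::work
    public:
        explicit work( guarded_loop & l) : loop_( & l) {
            std::lock_guard< std::mutex > lk( loop_->m_);
            ++loop_->work_;
        }
        ~work() {
            { std::lock_guard< std::mutex > lk( loop_->m_); --loop_->work_; }
            loop_->cv_.notify_all();
        }
    private:
        guarded_loop * loop_;
    };

    void post( std::function< void() > h) {
        { std::lock_guard< std::mutex > lk( m_); q_.push_back( std::move( h) ); }
        cv_.notify_all();
    }

    // Drain handlers; block while idle but guarded. Returns handlers run.
    std::size_t run() {
        std::size_t n = 0;
        std::unique_lock< std::mutex > lk( m_);
        for (;;) {
            if ( ! q_.empty() ) {
                auto h = std::move( q_.front() );
                q_.pop_front();
                lk.unlock();
                h(); ++n;                     // run handler outside the lock
                lk.lock();
            } else if ( 0 == work_) {
                return n;                     // no guard left: run() ends
            } else {
                cv_.wait( lk);                // idle, but kept alive by guard
            }
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque< std::function< void() > > q_;
    int work_ = 0;
};

// demo: run() outlives an empty queue until the guard is reset.
std::size_t guarded_loop_demo() {
    guarded_loop loop;
    auto guard = std::make_unique< guarded_loop::work >( loop);
    std::thread t([&] {
        loop.post([]{});       // one handler arrives while run() may be idle
        guard.reset();         // dropping the guard lets run() return
    });
    std::size_t n = loop.run();
    t.join();
    return n;                  // exactly one handler ran
}
```

In Asio, the work object plays roughly this keep-alive role for io_service::run(), and shutdown_service()’s work_.reset() corresponds to dropping the guard.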
Its other purpose is to post() a lambda (not yet shown). Let’s walk further through the example program before coming back to explain that lambda.
The service constructor returns to asio::round_robin’s constructor, which returns to use_scheduling_algorithm(), which returns to the application code.
Once it has called use_scheduling_algorithm(), the application may now launch some number of fibers:
// server
tcp::acceptor a( * io_svc, tcp::endpoint( tcp::v4(), 9999) );
boost::fibers::fiber( server, io_svc, std::ref( a) ).detach();

// client
const unsigned iterations = 2;
const unsigned clients = 3;
boost::fibers::barrier b( clients);
for ( unsigned i = 0; i < clients; ++i) {
    boost::fibers::fiber(
        client, io_svc, std::ref( a), std::ref( b), iterations).detach();
}
Since we don’t specify a launch, these fibers are ready to run, but have not yet been entered.
Having set everything up, the application calls io_service::run():
io_svc->run();
Now what?
Because this io_service instance owns an io_service::work instance, run() does not immediately return. But none of the fibers that will perform actual work has even been entered yet!
Without that initial post() call in service’s constructor, nothing would happen. The application would hang right here.
So, what should the post() handler execute? Simply this_fiber::yield()?
That would be a promising start. But we have no guarantee that any of the other fibers will initiate any Asio operations to keep the ball rolling. For all we know, every other fiber could reach a similar boost::this_fiber::yield() call first. Control would return to the post() handler, which would return to Asio, and... the application would hang.
The post() handler could post() itself again. But as discussed in the previous section, once there are actual I/O operations in flight — once we reach a state in which no fiber is ready — that would cause the thread to spin.
We could, of course, set an Asio timer — again as previously discussed. But in this “deeper dive,” we’re trying to do a little better.
The key to doing better is that since we’re in a fiber, we can run an actual loop — not just a chain of callbacks. We can wait for “something to happen” by calling io_service::run_one() — or we can execute already-queued Asio handlers by calling io_service::poll().
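The difference between those two calls can be illustrated with a std-only toy. mini_loop and its members below are hypothetical names, not Asio APIs: poll() runs whatever handlers are already queued and returns immediately, while run_one() blocks until at least one handler has been run.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Toy handler queue (assumption: NOT Asio) contrasting poll() vs run_one().
class mini_loop {
public:
    void post( std::function< void() > h) {
        { std::lock_guard< std::mutex > lk( m_); q_.push_back( std::move( h) ); }
        cv_.notify_one();
    }

    // Run every already-queued handler; never block. Returns count run.
    std::size_t poll() {
        std::size_t n = 0;
        std::unique_lock< std::mutex > lk( m_);
        while ( ! q_.empty() ) {
            auto h = std::move( q_.front() );
            q_.pop_front();
            lk.unlock();
            h(); ++n;
            lk.lock();
        }
        return n;
    }

    // Block until one handler is available, run it, then return.
    void run_one() {
        std::unique_lock< std::mutex > lk( m_);
        cv_.wait( lk, [this] { return ! q_.empty(); });
        auto h = std::move( q_.front() );
        q_.pop_front();
        lk.unlock();
        h();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque< std::function< void() > > q_;
};

// demo: poll() drains what is queued; run_one() waits for a later post.
int mini_loop_demo() {
    mini_loop loop;
    int ran = 0;
    loop.post([&] { ++ran; });
    loop.post([&] { ++ran; });
    std::size_t polled = loop.poll();        // runs both queued handlers
    std::thread t([&] { loop.post([&] { ++ran; }); });
    loop.run_one();                          // blocks until t's post arrives
    t.join();
    return ran + static_cast< int >( polled); // 3 handlers ran, 2 via poll()
}
```

The lambda loop below uses exactly this split: poll() to interleave ready handlers with ready fibers, run_one() to sleep until something happens.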
Here’s the body of the lambda passed to the post() call.
while ( ! io_svc_->stopped() ) {
    if ( has_ready_fibers() ) {
        // run all pending handlers in round_robin
        while ( io_svc_->poll() );
        // block this fiber till all pending (ready) fibers are processed
        // == round_robin::suspend_until() has been called
        std::unique_lock< boost::fibers::mutex > lk( mtx_);
        cnd_.wait( lk);
    } else {
        // run one handler inside io_service
        // if no handler available, block this thread
        if ( ! io_svc_->run_one() ) {
            break;
        }
    }
}
We want this loop to exit once the io_service instance has been stopped().
As long as there are ready fibers, we interleave running ready Asio handlers with running ready fibers.
If there are no ready fibers, we wait by calling run_one(). Once any Asio handler has been called — no matter which — run_one() returns. That handler may have transitioned some fiber to ready state, so we loop back to check again.
(We won’t describe awakened(), pick_next() or has_ready_fibers(), as these are just like round_robin::awakened(), round_robin::pick_next() and round_robin::has_ready_fibers().)
That leaves suspend_until() and notify().
Doubtless you have been asking yourself: why are we calling io_service::run_one() in the lambda loop? Why not call it in suspend_until(), whose very API was designed for just such a purpose?
Under normal circumstances, when the fiber manager finds no ready fibers, it calls algorithm::suspend_until(). Why test has_ready_fibers() in the lambda loop? Why not leverage the normal mechanism?
The answer is: it matters who’s asking.
Consider the lambda loop shown above. The only Boost.Fiber APIs it engages are has_ready_fibers() and this_fiber::yield().
yield() does not block the calling fiber: the calling fiber does not become unready. It is immediately passed back to algorithm::awakened(), to be resumed in its turn when all other ready fibers have had a chance to run. In other words: during a yield() call, there is always at least one ready fiber.
As long as this lambda loop is still running, the fiber manager does not call suspend_until() because it always has a fiber ready to run.
However, the lambda loop itself can detect the case when no other fibers are ready to run: the running fiber is not ready but running.
That said, suspend_until() and notify() are in fact called during orderly shutdown processing, so let’s try a plausible implementation.
void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
    // Set a timer so at least one handler will eventually fire, causing
    // run_one() to eventually return.
    if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
        // Each expires_at(time_point) call cancels any previous pending
        // call. We could inadvertently spin like this:
        // dispatcher calls suspend_until() with earliest wake time
        // suspend_until() sets suspend_timer_
        // lambda loop calls run_one()
        // some other asio handler runs before timer expires
        // run_one() returns to lambda loop
        // lambda loop yields to dispatcher
        // dispatcher finds no ready fibers
        // dispatcher calls suspend_until() with SAME wake time
        // suspend_until() sets suspend_timer_ to same time, canceling
        // previous async_wait()
        // lambda loop calls run_one()
        // asio calls suspend_timer_ handler with operation_aborted
        // run_one() returns to lambda loop... etc. etc.
        // So only actually set the timer when we're passed a DIFFERENT
        // abs_time value.
        suspend_timer_.expires_at( abs_time);
        suspend_timer_.async_wait([](boost::system::error_code const&){
                                      this_fiber::yield();
                                  });
    }
    cnd_.notify_one();
}
As you might expect, suspend_until() sets an asio::steady_timer to expires_at() the passed std::chrono::steady_clock::time_point.
Usually.
As indicated in comments, we avoid setting suspend_timer_ multiple times to the same time_point value since every expires_at() call cancels any previous async_wait() call. There is a chance that we could spin. Reaching suspend_until() means the fiber manager intends to yield the processor to Asio. Cancelling the previous async_wait() call would fire its handler, causing run_one() to return, potentially causing the fiber manager to call suspend_until() again with the same time_point value...
Given that we suspend the thread by calling io_service::run_one(), what’s important is that our async_wait() call will cause a handler to run, which will cause run_one() to return. It’s not so important specifically what that handler does.
void notify() noexcept {
    // Something has happened that should wake one or more fibers BEFORE
    // suspend_timer_ expires. Reset the timer to cause it to fire
    // immediately, causing the run_one() call to return. In theory we
    // could use cancel() because we don't care whether suspend_timer_'s
    // handler is called with operation_aborted or success. However --
    // cancel() doesn't change the expiration time, and we use
    // suspend_timer_'s expiration time to decide whether it's already
    // set. If suspend_until() set some specific wake time, then notify()
    // canceled it, then suspend_until() was called again with the same
    // wake time, it would match suspend_timer_'s expiration time and we'd
    // refrain from setting the timer. So instead of simply calling
    // cancel(), reset the timer, which cancels the pending sleep AND sets
    // a new expiration time. This will cause us to spin the loop twice --
    // once for the operation_aborted handler, once for timer expiration
    // -- but that shouldn't be a big problem.
    suspend_timer_.async_wait([](boost::system::error_code const&){
                                  this_fiber::yield();
                              });
    suspend_timer_.expires_at( std::chrono::steady_clock::now() );
}
Since an expires_at() call cancels any previous async_wait() call, we can make notify() simply call steady_timer::expires_at(). That should cause the io_service to call the async_wait() handler with operation_aborted.
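That cancel-on-reset behavior is the crux of notify(), and can be modeled in a few lines. The toy_timer below is a hypothetical, single-threaded stand-in, not Asio: real Asio dispatches the handler asynchronously through the io_service with an error_code of operation_aborted, whereas this sketch invokes it synchronously with a plain bool. What it models is just the rule the text relies on: a new expires_at() call fires any pending wait handler as "aborted".

```cpp
#include <chrono>
#include <functional>

// Single-threaded model (assumption: NOT Asio) of steady_timer's
// reset semantics: expires_at() cancels a pending async_wait(),
// firing its handler with aborted == true.
struct toy_timer {
    using clock   = std::chrono::steady_clock;
    using handler = std::function< void( bool /*aborted*/) >;

    void async_wait( handler h) { pending_ = std::move( h); }

    void expires_at( clock::time_point tp) {
        if ( pending_) {                 // cancel the outstanding wait:
            handler h = std::move( pending_);
            pending_ = nullptr;
            h( true);                    // handler sees "operation_aborted"
        }
        deadline_ = tp;                  // and a new expiration is recorded
    }

    clock::time_point expires_at() const { return deadline_; }

    handler pending_;
    clock::time_point deadline_{};
};
```

Because the reset both wakes the pending handler and records a new expiration time, it serves notify()’s purpose where a bare cancel() (which leaves the old expiration time in place) would not.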
The comments in notify() explain why we call expires_at() rather than cancel().
This boost::fibers::asio::round_robin implementation is used in examples/asio/autoecho.cpp.
It seems possible that you could put together a more elegant Fiber / Asio integration. But as noted at the outset: it’s tricky.