Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards


Then There’s Boost.Asio

Since the simplest form of Boost.Asio asynchronous operation completion token is a callback function, we could apply the same tactics for Asio as for our hypothetical AsyncAPI asynchronous operations.

Fortunately we need not. Boost.Asio incorporates a mechanism[5] by which the caller can customize the notification behavior of any async operation. Therefore we can construct a completion token which, when passed to a Boost.Asio async operation, requests blocking for the calling fiber.

A typical Asio async function might look something like this:[6]

template < ..., class CompletionToken >
deduced_return_type
async_something( ... , CompletionToken&& token)
{
    // construct handler_type instance from CompletionToken
    typename handler_type<CompletionToken, ...>::type handler(token);
    // construct async_result instance from handler_type
    async_result<decltype(handler)> result(handler);

    // ... arrange to call handler on completion ...
    // ... initiate actual I/O operation ...

    return result.get();
}

We will engage that mechanism, which is based on specializing Asio’s handler_type<> template for the CompletionToken type and the signature of the specific callback. The remainder of this discussion will refer back to async_something() as the Asio async function under consideration.
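To see how traits of this kind steer an async function, here is a minimal sketch that uses toy stand-ins rather than Asio itself. The namespace toy, return_value_t, value_handler and async_answer() are hypothetical names invented for illustration. Only the handler_type<>/async_result<> shape follows the text, and the toy operation completes immediately instead of performing I/O.

```cpp
#include <cassert>

namespace toy {

// Primary template: a plain callable token is used as its own handler.
template <class Token, class Signature>
struct handler_type { using type = Token; };

// Primary template: with a plain callback the async function returns void.
template <class Handler>
struct async_result {
    using type = void;
    explicit async_result(Handler&) {}
    void get() {}
};

// Hypothetical token: "return the value to me instead of calling back".
struct return_value_t {};

// Handler selected for return_value_t; stores the value through a pointer.
struct value_handler {
    explicit value_handler(return_value_t) {}
    void operator()(int v) {
        assert(dest && "async_result must inject dest first");
        *dest = v;
    }
    int* dest = nullptr;  // injected by async_result<value_handler>
};

// Customization point 1: map (token, signature) to the handler type.
template <>
struct handler_type<return_value_t, void(int)> { using type = value_handler; };

// Customization point 2: decide what the async function returns.
template <>
struct async_result<value_handler> {
    using type = int;
    explicit async_result(value_handler& h) { h.dest = &value; }
    int get() { return value; }
    int value = 0;
};

// Same shape as async_something(), but completing at once with 42.
template <class Token>
typename async_result<typename handler_type<Token, void(int)>::type>::type
async_answer(Token token) {
    typename handler_type<Token, void(int)>::type handler(token);
    async_result<decltype(handler)> result(handler);
    handler(42);          // toy "completion"
    return result.get();  // void for callbacks, int for return_value_t
}

}  // namespace toy
```

Passing a lambda to toy::async_answer() invokes the lambda and returns nothing, while passing toy::return_value_t{} makes the same function return the value directly. The second form mirrors what passing yield to an Asio operation achieves.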

The implementation described below uses lower-level facilities than promise and future because the promise mechanism interacts badly with io_service::stop(): it produces broken_promise exceptions.

boost::fibers::asio::yield is a completion token of this kind. yield is an instance of yield_t:

class yield_t {
public:
    yield_t() = default;

    /**
     * @code
     * static yield_t yield;
     * boost::system::error_code myec;
     * func(yield[myec]);
     * @endcode
     * @c yield[myec] returns an instance of @c yield_t whose @c ec_ points
     * to @c myec. The expression @c yield[myec] "binds" @c myec to that
     * (anonymous) @c yield_t instance, instructing @c func() to store any
     * @c error_code it might produce into @c myec rather than throwing @c
     * boost::system::system_error.
     */
    yield_t operator[]( boost::system::error_code & ec) const {
        yield_t tmp;
        tmp.ec_ = & ec;
        return tmp;
    }

//private:
    // ptr to bound error_code instance if any
    boost::system::error_code   *   ec_{ nullptr };
};

yield_t is in fact only a placeholder, a way to trigger Boost.Asio customization. It can bind a boost::system::error_code for use by the actual handler.
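The binding behavior can be tried in isolation. The following is a small sketch, assuming std::error_code in place of boost::system::error_code. token_t and demo_binding() are hypothetical names, not part of Boost.Fiber.

```cpp
#include <cassert>
#include <system_error>

// Stand-in for yield_t, built on std::error_code for self-containment.
class token_t {
public:
    token_t() = default;

    // Return a fresh token whose ec_ points at the caller's variable.
    // The token on which operator[] is invoked is left untouched.
    token_t operator[](std::error_code& ec) const {
        token_t tmp;
        tmp.ec_ = &ec;
        return tmp;
    }

    // ptr to bound error_code instance, if any
    std::error_code* ec_{nullptr};
};

// Usage: the canonical instance stays unbound; only the temporary produced
// by operator[] carries the pointer.
inline void demo_binding() {
    token_t const token;
    std::error_code myec;
    token_t bound = token[myec];
    assert(token.ec_ == nullptr);
    assert(bound.ec_ == &myec);
}
```

Because operator[] returns a new object, one shared token instance can serve many calls, each binding a different error_code.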

yield is declared as:

// canonical instance
thread_local yield_t yield{};

Asio customization is engaged by specializing boost::asio::handler_type<> for yield_t:

[asio_handler_type]

(There are actually four different specializations in detail/yield.hpp, one for each of the four Asio async callback signatures we expect.)

The above directs Asio to use yield_handler as the actual handler for an async operation to which yield is passed. There’s a generic yield_handler<T> implementation and a yield_handler<void> specialization. Let’s start with the <void> specialization:

// yield_handler<void> is like yield_handler<T> without value_. In fact it's
// just like yield_handler_base.
template<>
class yield_handler< void >: public yield_handler_base {
public:
    explicit yield_handler( yield_t const& y) :
        yield_handler_base{ y } {
    }

    // nullary completion callback
    void operator()() {
        ( * this)( boost::system::error_code() );
    }

    // inherit operator()(error_code) overload from base class
    using yield_handler_base::operator();
};

async_something(), having consulted the handler_type<> traits specialization, instantiates a yield_handler<void> to be passed as the actual callback for the async operation. yield_handler’s constructor accepts the yield_t instance (the yield object passed to the async function) and passes it along to yield_handler_base:

// This class encapsulates common elements between yield_handler<T> (capturing
// a value to return from asio async function) and yield_handler<void> (no
// such value). See yield_handler<T> and its <void> specialization below. Both
// yield_handler<T> and yield_handler<void> are passed by value through
// various layers of asio functions. In other words, they're potentially
// copied multiple times. So key data such as the yield_completion instance
// must be stored in our async_result<yield_handler<>> specialization, which
// should be instantiated only once.
class yield_handler_base {
public:
    yield_handler_base( yield_t const& y) :
        // capture the context* associated with the running fiber
        ctx_{ boost::fibers::context::active() },
        // capture the passed yield_t
        yt_( y ) {
    }

    // completion callback passing only (error_code)
    void operator()( boost::system::error_code const& ec) {
        BOOST_ASSERT_MSG( ycomp_,
                          "Must inject yield_completion* "
                          "before calling yield_handler_base::operator()()");
        BOOST_ASSERT_MSG( yt_.ec_,
                          "Must inject boost::system::error_code* "
                          "before calling yield_handler_base::operator()()");
        // If originating fiber is busy testing state_ flag, wait until it
        // has observed (complete != state_).
        yield_completion::lock_t lk{ ycomp_->mtx_ };
        yield_completion::state_t state = ycomp_->state_;
        // Notify a subsequent yield_completion::wait() call that it need not
        // suspend.
        ycomp_->state_ = yield_completion::complete;
        // set the error_code bound by yield_t
        * yt_.ec_ = ec;
        // unlock the lock that protects state_
        lk.unlock();
        // If ctx_ is still active, e.g. because the async operation
        // immediately called its callback (this method!) before the asio
        // async function called async_result_base::get(), we must not set it
        // ready.
        if ( yield_completion::waiting == state) {
            // wake the fiber
            fibers::context::active()->schedule( ctx_);
        }
    }

//private:
    boost::fibers::context      *   ctx_;
    yield_t                         yt_;
    // We depend on this pointer to yield_completion, which will be injected
    // by async_result.
    yield_completion::ptr_t         ycomp_{};
};

yield_handler_base stores a copy of the yield_t instance — which, as shown above, contains only an error_code*. It also captures the context* for the currently-running fiber by calling context::active().

You will notice that yield_handler_base has one more data member (ycomp_) that starts out null, though its operator()() method relies on ycomp_ being non-null. More on this in a moment.

Having constructed the yield_handler<void> instance, async_something() goes on to construct an async_result specialized for the handler_type<>::type: in this case, async_result<yield_handler<void>>. It passes the yield_handler<void> instance to the new async_result instance.

// Without the need to handle a passed value, our async_result
// specialization for yield_handler<void> is just like async_result_base.
template<>
class async_result< boost::fibers::asio::yield_t, void(boost::system::error_code) > :
    public boost::fibers::asio::detail::async_result_base {
public:
    using return_type = void;
    using completion_handler_type = fibers::asio::detail::yield_handler<void>;

    explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h):
        boost::fibers::asio::detail::async_result_base{ h } {
    }
};

Naturally that leads us straight to async_result_base:

// Factor out commonality between async_result<yield_handler<T>> and
// async_result<yield_handler<void>>
class async_result_base {
public:
    explicit async_result_base( yield_handler_base & h) :
            ycomp_{ new yield_completion{} } {
        // Inject ptr to our yield_completion instance into this
        // yield_handler<>.
        h.ycomp_ = this->ycomp_;
        // if yield_t didn't bind an error_code, make yield_handler_base's
        // error_code* point to an error_code local to this object so
        // yield_handler_base::operator() can unconditionally store through
        // its error_code*
        if ( ! h.yt_.ec_) {
            h.yt_.ec_ = & ec_;
        }
    }

    void get() {
        // Unless yield_handler_base::operator() has already been called,
        // suspend the calling fiber until that call.
        ycomp_->wait();
        // The only way our own ec_ member could have a non-default value is
        // if our yield_handler did not have a bound error_code AND the
        // completion callback passed a non-default error_code.
        if ( ec_) {
            throw_exception( boost::system::system_error{ ec_ } );
        }
    }

private:
    // If yield_t does not bind an error_code instance, store into here.
    boost::system::error_code       ec_{};
    yield_completion::ptr_t         ycomp_;
};

This is how yield_handler_base::ycomp_ becomes non-null: async_result_base’s constructor injects a pointer back to its own yield_completion member.

Recall that the canonical yield_t instance yield initializes its error_code* member ec_ to nullptr. If this instance is passed to async_something() (ec_ is still nullptr), the copy stored in yield_handler_base will likewise have null ec_. async_result_base’s constructor sets yield_handler_base’s yield_t’s ec_ member to point to its own error_code member.
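The injection and fallback steps can be sketched without Asio or fibers. In the following, token_t, handler_t, result_t and run() are hypothetical stand-ins, std::error_code replaces boost::system::error_code, and the operation is assumed to complete immediately, so no suspension is modeled.

```cpp
#include <cassert>
#include <system_error>

// Stand-in for yield_t: optionally carries a pointer to the caller's error_code.
struct token_t {
    explicit token_t(std::error_code* ec = nullptr) : ec_(ec) {}
    std::error_code* ec_;
};

// Copyable handler: it holds only a pointer, so every copy reports to the
// same destination.
struct handler_t {
    explicit handler_t(token_t t) : tok_(t) {}
    void operator()(std::error_code const& ec) {
        assert(tok_.ec_ && "result_t must inject an error_code* first");
        *tok_.ec_ = ec;
    }
    token_t tok_;
};

// Constructed once per operation; owns the fallback error_code.
class result_t {
public:
    explicit result_t(handler_t& h) {
        // If the token bound nothing, point the handler at our own member.
        if (!h.tok_.ec_) {
            h.tok_.ec_ = &ec_;
        }
    }
    result_t(result_t const&) = delete;

    void get() {
        // ec_ is set only when the caller bound no error_code AND the
        // operation failed, so this is the one case that throws.
        if (ec_) {
            throw std::system_error{ec_};
        }
    }

private:
    std::error_code ec_{};
};

// Hypothetical operation that completes at once with the given outcome.
inline void run(token_t tok, std::error_code outcome) {
    handler_t handler(tok);
    result_t result(handler);
    handler_t copy = handler;  // a copy made after injection still works
    copy(outcome);
    result.get();
}
```

With a bound error_code, a failing run() stores the error in the caller's variable and returns normally. With an unbound token the same failure surfaces as std::system_error.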

The stage is now set. async_something() initiates the actual async operation, arranging to call its yield_handler<void> instance on completion. Let’s say, for the sake of argument, that the actual async operation’s callback has signature void(error_code).

But since it’s an async operation, control returns at once to async_something(). async_something() calls async_result<yield_handler<void>>::get(), and will return its return value.

async_result<yield_handler<void>>::get() inherits async_result_base::get().

async_result_base::get() immediately calls yield_completion::wait().

// Bundle a three-valued completion state flag with a spinlock to protect it.
struct yield_completion {
    enum state_t {
        init,
        waiting,
        complete
    };

    typedef fibers::detail::spinlock                    mutex_t;
    typedef std::unique_lock< mutex_t >                 lock_t;
    typedef boost::intrusive_ptr< yield_completion >    ptr_t;

    std::atomic< std::size_t >  use_count_{ 0 };
    mutex_t                     mtx_{};
    state_t                     state_{ init };

    void wait() {
        // yield_handler_base::operator()() will set state_ `complete` and
        // attempt to wake a suspended fiber. It would be Bad if that call
        // happened between our detecting (complete != state_) and suspending.
        lock_t lk{ mtx_ };
        // If state_ is already set, we're done here: don't suspend.
        if ( complete != state_) {
            state_ = waiting;
            // suspend(unique_lock<spinlock>) unlocks the lock in the act of
            // resuming another fiber
            fibers::context::active()->suspend( lk);
        }
    }

    friend void intrusive_ptr_add_ref( yield_completion * yc) noexcept {
        BOOST_ASSERT( nullptr != yc);
        yc->use_count_.fetch_add( 1, std::memory_order_relaxed);
    }

    friend void intrusive_ptr_release( yield_completion * yc) noexcept {
        BOOST_ASSERT( nullptr != yc);
        if ( 1 == yc->use_count_.fetch_sub( 1, std::memory_order_release) ) {
            std::atomic_thread_fence( std::memory_order_acquire);
            delete yc;
        }
    }
};

Supposing that the pending async operation has not yet completed, yield_completion::state_ will not yet be complete, so wait() will set it to waiting and call context::suspend() on the currently-running fiber.

Other fibers will now have a chance to run.

Some time later, the async operation completes. It calls yield_handler<void>::operator()(error_code const&) with an error_code indicating either success or failure. We’ll consider both cases.

yield_handler<void> explicitly inherits operator()(error_code const&) from yield_handler_base.

yield_handler_base::operator()(error_code const&) first sets yield_completion::state_ to complete, remembering the previous state. This way, if async_something()’s async operation completes immediately (that is, if yield_handler_base::operator() is called even before async_result_base::get()), the calling fiber will not suspend.

The actual error_code produced by the async operation is then stored through the stored yield_t::ec_ pointer. If async_something()’s caller used (e.g.) yield[my_ec] to bind a local error_code instance, the actual error_code value is stored into the caller’s variable. Otherwise, it is stored into async_result_base::ec_.

If the originating fiber has already suspended (state_ was waiting when the callback arrived), the stored fiber context yield_handler_base::ctx_ is marked as ready to run by passing it to context::schedule(). Control then returns from yield_handler_base::operator(): the callback is done.

In due course, that fiber is resumed. Control returns from context::suspend() to yield_completion::wait(), which returns to async_result_base::get().
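The two possible orderings of wait() and the completion callback can be summarized as a small state machine. The sketch below is a simplified model, not the Boost.Fiber code: completion_t, begin_wait() and mark_complete() are hypothetical names, std::mutex stands in for the fiber spinlock, and the suspend and wake steps are reported to the caller rather than performed. The real wait() passes its lock to context::suspend() so that the lock is released only as part of suspending; this model does not reproduce that.

```cpp
#include <cassert>
#include <mutex>

// Simplified model of the init / waiting / complete handshake.
class completion_t {
public:
    enum state_t { init, waiting, complete };

    // Waiter side. Returns true if the caller would have to suspend,
    // false if the operation has already completed.
    bool begin_wait() {
        std::lock_guard<std::mutex> lk{mtx_};
        if (complete == state_) {
            return false;
        }
        state_ = waiting;
        return true;
    }

    // Callback side. Returns true if a suspended waiter would have to be
    // woken, false if nobody is waiting yet.
    bool mark_complete() {
        std::lock_guard<std::mutex> lk{mtx_};
        state_t previous = state_;
        assert(complete != previous && "an operation completes only once");
        state_ = complete;
        return waiting == previous;
    }

private:
    std::mutex mtx_{};
    state_t state_{init};
};
```

If mark_complete() runs first, it returns false and a later begin_wait() also returns false: no suspend, no wake. If begin_wait() runs first, both return true: the waiter suspends and the callback wakes it.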

The case in which async_something()’s completion callback has signature void() is similar. yield_handler<void>::operator()() invokes the machinery above with a success error_code.

A completion callback with signature void(error_code, T) (that is: in addition to error_code, callback receives some data item) is handled somewhat differently. For this kind of signature, handler_type<>::type specifies yield_handler<T> (for T other than void).

A yield_handler<T> reserves a value_ pointer to a value of type T:

// asio uses handler_type<completion token type, signature>::type to decide
// what to instantiate as the actual handler. Below, we specialize
// handler_type< yield_t, ... > to indicate yield_handler<>. So when you pass
// an instance of yield_t as an asio completion token, asio selects
// yield_handler<> as the actual handler class.
template< typename T >
class yield_handler: public yield_handler_base {
public:
    // asio passes the completion token to the handler constructor
    explicit yield_handler( yield_t const& y) :
        yield_handler_base{ y } {
    }

    // completion callback passing only value (T)
    void operator()( T t) {
        // just like callback passing success error_code
        (*this)( boost::system::error_code(), std::move(t) );
    }

    // completion callback passing (error_code, T)
    void operator()( boost::system::error_code const& ec, T t) {
        BOOST_ASSERT_MSG( value_,
                          "Must inject value ptr "
                          "before calling yield_handler<T>::operator()()");
        // move the value to async_result<> instance BEFORE waking up a
        // suspended fiber
        * value_ = std::move( t);
        // forward the call to base-class completion handler
        yield_handler_base::operator()( ec);
    }

//private:
    // pointer to destination for eventual value
    // this must be injected by async_result before operator()() is called
    T   *   value_{ nullptr };
};

This pointer is initialized to nullptr.

When async_something() instantiates async_result<yield_handler<T>>:

// asio constructs an async_result<> instance from the yield_handler specified
// by handler_type<>::type. A particular asio async method constructs the
// yield_handler, constructs this async_result specialization from it, then
// returns the result of calling its get() method.
template< typename ReturnType, typename T >
class async_result< boost::fibers::asio::yield_t, ReturnType(boost::system::error_code, T) > :
    public boost::fibers::asio::detail::async_result_base {
public:
    // type returned by get()
    using return_type = T;
    using completion_handler_type = fibers::asio::detail::yield_handler<T>;

    explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) :
        boost::fibers::asio::detail::async_result_base{ h } {
        // Inject ptr to our value_ member into yield_handler<>: result will
        // be stored here.
        h.value_ = & value_;
    }

    // asio async method returns result of calling get()
    return_type get() {
        boost::fibers::asio::detail::async_result_base::get();
        return std::move( value_);
    }

private:
    return_type value_{};
};

this async_result<> specialization reserves a member of type T to receive the passed data item, and sets yield_handler<T>::value_ to point to its own data member.

async_result<yield_handler<T>> provides its own get(), which hides the base-class version. It first calls async_result_base::get(), so the calling fiber suspends as described above.

yield_handler<T>::operator()(error_code, T) stores its passed T value into async_result<yield_handler<T>>::value_.

Then it passes control to yield_handler_base::operator()(error_code) to deal with waking the original fiber as described above.

When async_result<yield_handler<T>>::get() resumes, it returns the stored value_ to async_something() and ultimately to async_something()’s caller.

The case of a callback signature void(T) is handled by having yield_handler<T>::operator()(T) engage the void(error_code, T) machinery, passing a success error_code.
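The value-carrying path can be sketched the same way. In the following, value_handler, value_result and read_greeting() are hypothetical stand-ins built on std::error_code; the operation is assumed to complete immediately, and recording the error_code takes the place of waking the fiber.

```cpp
#include <cassert>
#include <string>
#include <system_error>
#include <utility>

// Stand-in for yield_handler<T>: writes through two injected pointers.
template <class T>
struct value_handler {
    // Signature void(T): forward as a success error_code plus the value.
    void operator()(T t) {
        (*this)(std::error_code{}, std::move(t));
    }

    // Signature void(error_code, T).
    void operator()(std::error_code const& ec, T t) {
        assert(value_ && ec_ && "value_result must inject both pointers first");
        *value_ = std::move(t);  // store the value first...
        *ec_ = ec;               // ...then signal the outcome
    }

    T* value_{nullptr};
    std::error_code* ec_{nullptr};
};

// Stand-in for the value-returning async_result specialization.
template <class T>
class value_result {
public:
    explicit value_result(value_handler<T>& h) {
        h.value_ = &value_;
        h.ec_ = &ec_;
    }

    T get() {
        if (ec_) {
            throw std::system_error{ec_};
        }
        return std::move(value_);
    }

private:
    T value_{};
    std::error_code ec_{};
};

// Hypothetical operation that completes at once, using either callback form.
inline std::string read_greeting(bool with_error_code) {
    value_handler<std::string> handler;
    value_result<std::string> result(handler);
    if (with_error_code) {
        handler(std::error_code{}, std::string{"hello"});
    } else {
        handler(std::string{"hello"});
    }
    return result.get();
}
```

Both callback forms end in the same two-argument overload, which is why the void(T) case needs no machinery of its own.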

The source code above is found in yield.hpp and detail/yield.hpp.



[5] This mechanism has been proposed as a conventional way to allow the caller of an arbitrary async function to specify completion handling: N4045.

