Boost C++ Libraries



when_any, produce first success

One scenario for when_any functionality is when we're redundantly contacting some number of possibly-unreliable web services. Not only might they be slow — any one of them might produce a failure rather than the desired result.

In such a case, wait_first_outcome() isn't the right approach. If one of the services produces an error quickly, while another follows up with a real answer, we don't want to prefer the error just because it arrived first!

Given the queue< future< T > > we already constructed for wait_first_outcome(), though, we can readily recast the interface function to deliver the first successful result.

That does raise the question: what if all the task functions throw an exception? In that case the caller should probably be told about it.

The C++ Parallelism Draft Technical Specification proposes a std::exception_list exception capable of delivering a collection of std::exception_ptrs. Until that becomes universally available, let's fake up an exception_list of our own:

class exception_list : public std::runtime_error {
public:
    exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr >   bundle_t;

    // N4407 proposed std::exception_list API
    typedef bundle_t::const_iterator iterator;

    std::size_t size() const noexcept {
        return bundle_.size();
    }

    iterator begin() const noexcept {
        return bundle_.begin();
    }

    iterator end() const noexcept {
        return bundle_.end();
    }

    // extension to populate
    void add( std::exception_ptr ep) {
        bundle_.push_back( ep);
    }

private:
    bundle_t bundle_;
};
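To see how a caller might consume such an exception_list, here is a minimal sketch. It repeats the class above so that the block compiles on its own. The helpers make_failures() and describe() are hypothetical names introduced only for illustration. Each stored std::exception_ptr is rethrown with std::rethrow_exception() so that its what() string can be read.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Same class as above, repeated so this sketch is self-contained.
class exception_list : public std::runtime_error {
public:
    exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr > bundle_t;
    typedef bundle_t::const_iterator iterator;

    std::size_t size() const noexcept { return bundle_.size(); }
    iterator begin() const noexcept { return bundle_.begin(); }
    iterator end() const noexcept { return bundle_.end(); }

    void add( std::exception_ptr ep) { bundle_.push_back( ep); }

private:
    bundle_t bundle_;
};

// Hypothetical helper: bundle two captured exceptions.
exception_list make_failures() {
    exception_list failures("all tasks failed");
    failures.add( std::make_exception_ptr( std::runtime_error("timeout") ) );
    failures.add( std::make_exception_ptr( std::logic_error("bad request") ) );
    return failures;
}

// Hypothetical helper: rethrow each stored exception_ptr to read its message.
std::vector< std::string > describe( exception_list const& list) {
    std::vector< std::string > messages;
    for ( std::exception_ptr const& ep : list) {
        try {
            std::rethrow_exception( ep);
        } catch ( std::exception const& e) {
            messages.push_back( e.what() );
        } catch ( ... ) {
            // not derived from std::exception: no message available
            messages.push_back("unknown exception");
        }
    }
    return messages;
}
```

Because begin() and end() are const members, a range-based for loop works directly on a caught exception_list const&, which is how a catch handler would typically see it.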

Now we can build wait_first_success(), using wait_first_outcome_impl().

Instead of retrieving only the first future<> from the queue, we must now loop over future<> items. Of course we must limit that iteration! Since we launch exactly count producer fibers, only count items will ever be pushed, so a (count+1)st pop from the buffered_channel (value_pop() in the code below) would block forever.

Given a ready future<>, we can distinguish failure by calling future::get_exception_ptr(). If the future<> in fact contains a result rather than an exception, get_exception_ptr() returns nullptr. In that case, we can confidently call future::get() to return that result to our caller.

If the std::exception_ptr is not nullptr, though, we collect it into our pending exception_list and loop back for the next future<> from the queue.

If we fall out of the loop — if every single task fiber threw an exception — we throw the exception_list exception into which we've been collecting those std::exception_ptrs.

template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_success( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // instantiate exception_list, just in case
    exception_list exceptions("wait_first_success() produced only errors");
    // retrieve up to 'count' results -- but stop there!
    for ( std::size_t i = 0; i < count; ++i) {
        // retrieve the next future
        future_t future( chanp->value_pop() );
        // retrieve exception_ptr if any
        std::exception_ptr error( future.get_exception_ptr() );
        // if no error, then yay, return value
        if ( ! error) {
            // close the channel: no subsequent push() has to succeed
            chanp->close();
            // show caller the value we got
            return future.get();
        }

        // error is non-null: collect
        exceptions.add( error);
    }
    // We only arrive here when every passed function threw an exception.
    // Throw our collection to inform caller.
    throw exceptions;
}
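The control flow of that loop can also be studied in isolation, without Boost.Fiber. The following is only a sketch under stated assumptions: outcome is a hypothetical stand-in for an already-ready future<> (a null error member plays the role of get_exception_ptr() returning nullptr), a std::queue stands in for the channel, and all_failed stands in for exception_list. None of these names are part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <queue>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a ready future< std::string >:
// 'error' is null when the task produced a value.
struct outcome {
    std::string value;
    std::exception_ptr error;
};

// Hypothetical stand-in for exception_list.
struct all_failed : std::runtime_error {
    explicit all_failed( std::vector< std::exception_ptr > errs) :
        std::runtime_error("every task failed"),
        errors( std::move( errs) ) {
    }
    std::vector< std::exception_ptr > errors;
};

// Hypothetical helper: an outcome that carries a value.
outcome succeeded( std::string const& value) {
    return outcome{ value, nullptr };
}

// Hypothetical helper: an outcome that carries a captured exception.
outcome failed( std::string const& message) {
    return outcome{ std::string(),
                    std::make_exception_ptr( std::runtime_error( message) ) };
}

// Pop at most 'count' outcomes, in arrival order. Return the first value;
// if all 'count' outcomes carry errors, throw them as one collection.
// Precondition: 'ready' holds at least 'count' entries.
std::string first_success( std::queue< outcome > & ready, std::size_t count) {
    std::vector< std::exception_ptr > errors;
    for ( std::size_t i = 0; i < count; ++i) {
        outcome next( std::move( ready.front() ) );
        ready.pop();
        if ( ! next.error) {
            // first success: later outcomes are left in the queue
            return next.value;
        }
        // error is non-null: collect and keep looking
        errors.push_back( next.error);
    }
    throw all_failed( std::move( errors) );
}
```

Feeding this function a queue holding failed("fast failure") followed by succeeded("real answer") returns "real answer": the early error is collected and skipped, not preferred merely for arriving first. A queue holding only failures makes it throw all_failed with one std::exception_ptr per task.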

A call might look like this:

std::string result = wait_first_success(
            [](){ return sleeper("wfss_first",   50, true); },
            [](){ return sleeper("wfss_second", 100); },
            [](){ return sleeper("wfss_third",  150); });
std::cout << "wait_first_success(success) => " << result << std::endl;
assert(result == "wfss_second");

