Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

libs/fiber/examples/wait_stuff.cpp

//          Copyright Nat Goodspeed 2015.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
//
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/fiber/all.hpp>
#include <boost/variant/variant.hpp>
#include <boost/variant/get.hpp>

// These are wait_something() functions rather than when_something()
// functions. A big part of the point of the Fiber library is to model
// sequencing using the processor's instruction pointer rather than chains of
// callbacks. The future-oriented when_all() / when_any() functions are still
// based on chains of callbacks. With Fiber, we can do better.

/*****************************************************************************
*   Verbose
*****************************************************************************/
class Verbose {
public:
    Verbose( std::string const& d):
        desc( d) {
        std::cout << desc << " start" << std::endl;
    }

    ~Verbose() {
        std::cout << desc << " stop" << std::endl;
    }

    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;

private:
    const std::string desc;
};

/*****************************************************************************
*   Runner and Example
*****************************************************************************/
// collect and ultimately run every Example
class Runner {
    typedef std::vector< std::pair< std::string, std::function< void() > > >    function_list;

public:
    void add( std::string const& desc, std::function< void() > const& func) {
        functions_.push_back( function_list::value_type( desc, func) );
    }

    void run() {
        for ( function_list::value_type const& pair : functions_) {
            Verbose v( pair.first);
            pair.second();
        }
    }

private:
    function_list   functions_;
};

Runner runner;

// Example allows us to embed Runner::add() calls at module scope
struct Example {
    Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
        runner.add( desc, func);
    }
};

/*****************************************************************************
*   example task functions
*****************************************************************************/
//[wait_sleeper
template< typename T >
T sleeper_impl( T item, int ms, bool thrw = false) {
    std::ostringstream descb, funcb;
    descb << item;
    std::string desc( descb.str() );
    funcb << "  sleeper(" << item << ")";
    Verbose v( funcb.str() );

    boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
    if ( thrw) {
        throw std::runtime_error( desc);
    }
    return item;
}
//]

inline
std::string sleeper( std::string const& item, int ms, bool thrw = false) {
    return sleeper_impl( item, ms, thrw);
}

inline
double sleeper( double item, int ms, bool thrw = false) {
    return sleeper_impl( item, ms, thrw);
}

inline
int sleeper(int item, int ms, bool thrw = false) {
    return sleeper_impl( item, ms, thrw);
}

/*****************************************************************************
*   Done
*****************************************************************************/
//[wait_done
// Wrap canonical pattern for condition_variable + bool flag
struct Done {
private:
    boost::fibers::condition_variable   cond;
    boost::fibers::mutex                mutex;
    bool                                ready = false;

public:
    typedef std::shared_ptr< Done >     ptr;

    void wait() {
        std::unique_lock< boost::fibers::mutex > lock( mutex);
        cond.wait( lock, [this](){ return ready; });
    }

    void notify() {
        {
            std::unique_lock< boost::fibers::mutex > lock( mutex);
            ready = true;
        } // release mutex
        cond.notify_one();
    }
};
//]

/*****************************************************************************
*   when_any, simple completion
*****************************************************************************/
//[wait_first_simple_impl
// Degenerate case: when there are no functions to wait for, return
// immediately.
void wait_first_simple_impl( Done::ptr) {
}

// When there's at least one function to wait for, launch it and recur to
// process the rest.
template< typename Fn, typename ... Fns >
void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
    boost::fibers::fiber( [done, function](){
                              function();
                              done->notify();
                          }).detach();
    wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
}
//]

// interface function: instantiate Done, launch tasks, wait for Done
//[wait_first_simple
template< typename ... Fns >
void wait_first_simple( Fns && ... functions) {
    // Use shared_ptr because each function's fiber will bind it separately,
    // and we're going to return before the last of them completes.
    auto done( std::make_shared< Done >() );
    wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
    done->wait();
}
//]

// example usage
Example wfs( runner, "wait_first_simple()", [](){
//[wait_first_simple_ex
    wait_first_simple(
            [](){ sleeper("wfs_long",   150); },
            [](){ sleeper("wfs_medium", 100); },
            [](){ sleeper("wfs_short",   50); });
//]
});

/*****************************************************************************
*   when_any, return value
*****************************************************************************/
// When there's only one function, call this overload
//[wait_first_value_impl
template< typename T, typename Fn >
void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
                            Fn && function) {
    boost::fibers::fiber( [chan, function](){
                              // Ignore channel_op_status returned by push():
                              // might be closed; we simply don't care.
                              chan->push( function() );
                          }).detach();
}
//]

// When there are two or more functions, call this overload
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
                            Fn0 && function0,
                            Fn1 && function1,
                            Fns && ... functions) {
    // process the first function using the single-function overload
    wait_first_value_impl< T >( chan,
                                std::forward< Fn0 >( function0) );
    // then recur to process the rest
    wait_first_value_impl< T >( chan,
                                std::forward< Fn1 >( function1),
                                std::forward< Fns >( functions) ... );
}

//[wait_first_value
// Assume that all passed functions have the same return type. The return type
// of wait_first_value() is the return type of the first passed function. It is
// simply invalid to pass NO functions.
template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_value( Fn && function, Fns && ... functions) {
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::buffered_channel< return_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_value_impl< return_t >( chanp,
                                       std::forward< Fn >( function),
                                       std::forward< Fns >( functions) ... );
    // retrieve the first value
    return_t value( chanp->value_pop() );
    // close the channel: no subsequent push() has to succeed
    chanp->close();
    return value;
}
//]

// example usage
Example wfv( runner, "wait_first_value()", [](){
//[wait_first_value_ex
    std::string result = wait_first_value(
            [](){ return sleeper("wfv_third",  150); },
            [](){ return sleeper("wfv_second", 100); },
            [](){ return sleeper("wfv_first",   50); });
    std::cout << "wait_first_value() => " << result << std::endl;
    assert(result == "wfv_first");
//]
});

/*****************************************************************************
*   when_any, produce first outcome, whether result or exception
*****************************************************************************/
// When there's only one function, call this overload.
//[wait_first_outcome_impl
template< typename T, typename CHANP, typename Fn >
void wait_first_outcome_impl( CHANP chan, Fn && function) {
    boost::fibers::fiber(
        // Use std::bind() here for C++11 compatibility. C++11 lambda capture
        // can't move a move-only Fn type, but bind() can. Let bind() move the
        // channel pointer and the function into the bound object, passing
        // references into the lambda.
        std::bind(
            []( CHANP & chan,
                typename std::decay< Fn >::type & function) {
                // Instantiate a packaged_task to capture any exception thrown by
                // function.
                boost::fibers::packaged_task< T() > task( function);
                // Immediately run this packaged_task on same fiber. We want
                // function() to have completed BEFORE we push the future.
                task();
                // Pass the corresponding future to consumer. Ignore
                // channel_op_status returned by push(): might be closed; we
                // simply don't care.
                chan->push( task.get_future() );
            },
            chan,
            std::forward< Fn >( function)
        )).detach();
}
//]

// When there are two or more functions, call this overload
template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_outcome_impl( CHANP chan,
                              Fn0 && function0,
                              Fn1 && function1,
                              Fns && ... functions) {
    // process the first function using the single-function overload
    wait_first_outcome_impl< T >( chan,
                                  std::forward< Fn0 >( function0) );
    // then recur to process the rest
    wait_first_outcome_impl< T >( chan,
                                  std::forward< Fn1 >( function1),
                                  std::forward< Fns >( functions) ... );
}

// Assume that all passed functions have the same return type. The return type
// of wait_first_outcome() is the return type of the first passed function. It is
// simply invalid to pass NO functions.
//[wait_first_outcome
template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_outcome( Fn && function, Fns && ... functions) {
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp(std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // retrieve the first future
    future_t future( chanp->value_pop() );
    // close the channel: no subsequent push() has to succeed
    chanp->close();
    // either return value or throw exception
    return future.get();
}
//]

// example usage
Example wfo( runner, "wait_first_outcome()", [](){
//[wait_first_outcome_ex
    std::string result = wait_first_outcome(
            [](){ return sleeper("wfos_first",   50); },
            [](){ return sleeper("wfos_second", 100); },
            [](){ return sleeper("wfos_third",  150); });
    std::cout << "wait_first_outcome(success) => " << result << std::endl;
    assert(result == "wfos_first");

    std::string thrown;
    try {
        result = wait_first_outcome(
                [](){ return sleeper("wfof_first",   50, true); },
                [](){ return sleeper("wfof_second", 100); },
                [](){ return sleeper("wfof_third",  150); });
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_first_outcome(fail) threw '" << thrown
              << "'" << std::endl;
    assert(thrown == "wfof_first");
//]
});

/*****************************************************************************
*   when_any, collect exceptions until success; throw exception_list if no
*   success
*****************************************************************************/
// define an exception to aggregate exception_ptrs; prefer
// std::exception_list (N4407 et al.) once that becomes available
//[exception_list
class exception_list : public std::runtime_error {
public:
    exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr >   bundle_t;

    // N4407 proposed std::exception_list API
    typedef bundle_t::const_iterator iterator;

    std::size_t size() const noexcept {
        return bundle_.size();
    }

    iterator begin() const noexcept {
        return bundle_.begin();
    }

    iterator end() const noexcept {
        return bundle_.end();
    }

    // extension to populate
    void add( std::exception_ptr ep) {
        bundle_.push_back( ep);
    }

private:
    bundle_t bundle_;
};
//]

// Assume that all passed functions have the same return type. The return type
// of wait_first_success() is the return type of the first passed function. It is
// simply invalid to pass NO functions.
//[wait_first_success
template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_success( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // instantiate exception_list, just in case
    exception_list exceptions("wait_first_success() produced only errors");
    // retrieve up to 'count' results -- but stop there!
    for ( std::size_t i = 0; i < count; ++i) {
        // retrieve the next future
        future_t future( chanp->value_pop() );
        // retrieve exception_ptr if any
        std::exception_ptr error( future.get_exception_ptr() );
        // if no error, then yay, return value
        if ( ! error) {
            // close the channel: no subsequent push() has to succeed
            chanp->close();
            // show caller the value we got
            return future.get();
        }

        // error is non-null: collect
        exceptions.add( error);
    }
    // We only arrive here when every passed function threw an exception.
    // Throw our collection to inform caller.
    throw exceptions;
}
//]

// example usage
Example wfss( runner, "wait_first_success()", [](){
//[wait_first_success_ex
    std::string result = wait_first_success(
                [](){ return sleeper("wfss_first",   50, true); },
                [](){ return sleeper("wfss_second", 100); },
                [](){ return sleeper("wfss_third",  150); });
    std::cout << "wait_first_success(success) => " << result << std::endl;
    assert(result == "wfss_second");
//]

    std::string thrown;
    std::size_t count = 0;
    try {
        result = wait_first_success(
                [](){ return sleeper("wfsf_first",   50, true); },
                [](){ return sleeper("wfsf_second", 100, true); },
                [](){ return sleeper("wfsf_third",  150, true); });
    } catch ( exception_list const& e) {
        thrown = e.what();
        count = e.size();
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_first_success(fail) threw '" << thrown << "': "
              << count << " errors" << std::endl;
    assert(thrown == "wait_first_success() produced only errors");
    assert(count == 3);
});

/*****************************************************************************
*   when_any, heterogeneous
*****************************************************************************/
//[wait_first_value_het
// No need to break out the first Fn for interface function: let the compiler
// complain if empty.
// Our functions have different return types, and we might have to return any
// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
// parameter pack.
template< typename ... Fns >
boost::variant< typename std::result_of< Fns() >::type ... >
wait_first_value_het( Fns && ... functions) {
    // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
    typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
    typedef boost::fibers::buffered_channel< return_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_value_impl< return_t >( chanp,
                                       std::forward< Fns >( functions) ... );
    // retrieve the first value
    return_t value( chanp->value_pop() );
    // close the channel: no subsequent push() has to succeed
    chanp->close();
    return value;
}
//]

// example usage
Example wfvh( runner, "wait_first_value_het()", [](){
//[wait_first_value_het_ex
    boost::variant< std::string, double, int > result =
        wait_first_value_het(
                [](){ return sleeper("wfvh_third",  150); },
                [](){ return sleeper(3.14,          100); },
                [](){ return sleeper(17,             50); });
    std::cout << "wait_first_value_het() => " << result << std::endl;
    assert(boost::get< int >( result) == 17);
//]
});

/*****************************************************************************
*   when_all, simple completion
*****************************************************************************/
// Degenerate case: when there are no functions to wait for, return
// immediately.
void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
}

// When there's at least one function to wait for, launch it and recur to
// process the rest.
//[wait_all_simple_impl
template< typename Fn, typename ... Fns >
void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
                           Fn && function, Fns && ... functions) {
    boost::fibers::fiber(
            std::bind(
                []( std::shared_ptr< boost::fibers::barrier > & barrier,
                    typename std::decay< Fn >::type & function) mutable {
                        function();
                        barrier->wait();
                },
                barrier,
                std::forward< Fn >( function)
            )).detach();
    wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
}
//]

// interface function: instantiate barrier, launch tasks, wait for barrier
//[wait_all_simple
template< typename ... Fns >
void wait_all_simple( Fns && ... functions) {
    std::size_t count( sizeof ... ( functions) );
    // Initialize a barrier(count+1) because we'll immediately wait on it. We
    // don't want to wake up until 'count' more fibers wait on it. Even though
    // we'll stick around until the last of them completes, use shared_ptr
    // anyway because it's easier to be confident about lifespan issues.
    auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
    wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
    barrier->wait();
}
//]

// example usage
Example was( runner, "wait_all_simple()", [](){
//[wait_all_simple_ex
    wait_all_simple(
            [](){ sleeper("was_long",   150); },
            [](){ sleeper("was_medium", 100); },
            [](){ sleeper("was_short",   50); });
//]
});

/*****************************************************************************
*   when_all, return values
*****************************************************************************/
//[wait_nchannel
// Introduce a channel facade that closes the channel once a specific number
// of items has been pushed. This allows an arbitrary consumer to read until
// 'closed' without itself having to count items.
template< typename T >
class nchannel {
public:
    nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
              std::size_t lm):
        chan_( chan),
        limit_( lm) {
        assert(chan_);
        if ( 0 == limit_) {
            chan_->close();
        }
    }

    boost::fibers::channel_op_status push( T && va) {
        boost::fibers::channel_op_status ok =
            chan_->push( std::forward< T >( va) );
        if ( ok == boost::fibers::channel_op_status::success &&
             --limit_ == 0) {
            // after the 'limit_'th successful push, close the channel
            chan_->close();
        }
        return ok;
    }

private:
    std::shared_ptr< boost::fibers::buffered_channel< T > >    chan_;
    std::size_t                                                 limit_;
};
//]

// When there's only one function, call this overload
//[wait_all_values_impl
template< typename T, typename Fn >
void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
                           Fn && function) {
    boost::fibers::fiber( [chan, function](){
                              chan->push(function());
                          }).detach();
}
//]

// When there are two or more functions, call this overload
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
                           Fn0 && function0,
                           Fn1 && function1,
                           Fns && ... functions) {
    // process the first function using the single-function overload
    wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
    // then recur to process the rest
    wait_all_values_impl< T >( chan,
                               std::forward< Fn1 >( function1),
                               std::forward< Fns >( functions) ... );
}

//[wait_all_values_source
// Return a shared_ptr<buffered_channel<T>> from which the caller can
// retrieve each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::buffered_channel< return_t > channel_t;
    // make the channel
    auto chanp( std::make_shared< channel_t >( 64) );
    // and make an nchannel facade to close it after 'count' items
    auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
    // pass that nchannel facade to all the relevant fibers
    wait_all_values_impl< return_t >( ncp,
                                      std::forward< Fn >( function),
                                      std::forward< Fns >( functions) ... );
    // then return the channel for consumer
    return chanp;
}
//]

// When all passed functions have completed, return vector<T> containing
// collected results. Assume that all passed functions have the same return
// type. It is simply invalid to pass NO functions.
//[wait_all_values
template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_values( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef std::vector< return_t > vector_t;
    vector_t results;
    results.reserve( count);

    // get channel
    std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
        wait_all_values_source( std::forward< Fn >( function),
                                std::forward< Fns >( functions) ... );
    // fill results vector
    return_t value;
    while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
        results.push_back( value);
    }
    // return vector to caller
    return results;
}
//]

Example wav( runner, "wait_all_values()", [](){
//[wait_all_values_source_ex
    std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
        wait_all_values_source(
                [](){ return sleeper("wavs_third",  150); },
                [](){ return sleeper("wavs_second", 100); },
                [](){ return sleeper("wavs_first",   50); });
    std::string value;
    while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
        std::cout << "wait_all_values_source() => '" << value
                  << "'" << std::endl;
    }
//]

//[wait_all_values_ex
    std::vector< std::string > values =
        wait_all_values(
                [](){ return sleeper("wav_late",   150); },
                [](){ return sleeper("wav_middle", 100); },
                [](){ return sleeper("wav_early",   50); });
//]
    std::cout << "wait_all_values() =>";
    for ( std::string const& v : values) {
        std::cout << " '" << v << "'";
    }
    std::cout << std::endl;
});

/*****************************************************************************
*   when_all, throw first exception
*****************************************************************************/
//[wait_all_until_error_source
// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
// get() each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
std::shared_ptr<
    boost::fibers::buffered_channel<
        boost::fibers::future<
            typename std::result_of< Fn() >::type > > >
wait_all_until_error_source( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    // make the channel
    auto chanp( std::make_shared< channel_t >( 64) );
    // and make an nchannel facade to close it after 'count' items
    auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
    // pass that nchannel facade to all the relevant fibers
    wait_first_outcome_impl< return_t >( ncp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // then return the channel for consumer
    return chanp;
}
//]

// When all passed functions have completed, return vector<T> containing
// collected results, or throw the first exception thrown by any of the passed
// functions. Assume that all passed functions have the same return type. It
// is simply invalid to pass NO functions.
//[wait_all_until_error
template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_until_error( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef typename boost::fibers::future< return_t > future_t;
    typedef std::vector< return_t > vector_t;
    vector_t results;
    results.reserve( count);

    // get channel
    std::shared_ptr<
        boost::fibers::buffered_channel< future_t > > chan(
            wait_all_until_error_source( std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... ) );
    // fill results vector
    future_t future;
    while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
        results.push_back( future.get() );
    }
    // return vector to caller
    return results;
}
//]

Example waue( runner, "wait_all_until_error()", [](){
//[wait_all_until_error_source_ex
    typedef boost::fibers::future< std::string > future_t;
    std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
        wait_all_until_error_source(
                [](){ return sleeper("wauess_third",  150); },
                [](){ return sleeper("wauess_second", 100); },
                [](){ return sleeper("wauess_first",   50); });
    future_t future;
    while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
        std::string value( future.get() );
        std::cout << "wait_all_until_error_source(success) => '" << value
                  << "'" << std::endl;
    }
//]

    chan = wait_all_until_error_source(
            [](){ return sleeper("wauesf_third",  150); },
            [](){ return sleeper("wauesf_second", 100, true); },
            [](){ return sleeper("wauesf_first",   50); });
//[wait_all_until_error_ex
    std::string thrown;
//<-
    try {
        while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
            std::string value( future.get() );
            std::cout << "wait_all_until_error_source(fail) => '" << value
                      << "'" << std::endl;
        }
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_all_until_error_source(fail) threw '" << thrown
              << "'" << std::endl;

    thrown.clear();
//->
    try {
        std::vector< std::string > values = wait_all_until_error(
                [](){ return sleeper("waue_late",   150); },
                [](){ return sleeper("waue_middle", 100, true); },
                [](){ return sleeper("waue_early",   50); });
//<-
        std::cout << "wait_all_until_error(fail) =>";
        for ( std::string const& v : values) {
            std::cout << " '" << v << "'";
        }
        std::cout << std::endl;
//->
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_all_until_error(fail) threw '" << thrown
              << "'" << std::endl;
//]
});

/*****************************************************************************
*   when_all, collect exceptions
*****************************************************************************/
// When all passed functions have succeeded, return vector<T> containing
// collected results, or throw exception_list containing all exceptions thrown
// by any of the passed functions. Assume that all passed functions have the
// same return type. It is simply invalid to pass NO functions.
//[wait_all_collect_errors
template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_collect_errors( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef typename boost::fibers::future< return_t > future_t;
    typedef std::vector< return_t > vector_t;
    vector_t results;
    results.reserve( count);
    exception_list exceptions("wait_all_collect_errors() exceptions");

    // get channel
    std::shared_ptr<
        boost::fibers::buffered_channel< future_t > > chan(
            wait_all_until_error_source( std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... ) );
    // fill results and/or exceptions vectors
    future_t future;
    while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
        std::exception_ptr exp = future.get_exception_ptr();
        if ( ! exp) {
            results.push_back( future.get() );
        } else {
            exceptions.add( exp);
        }
    }
    // if there were any exceptions, throw
    if ( exceptions.size() ) {
        throw exceptions;
    }
    // no exceptions: return vector to caller
    return results;
}
//]

Example wace( runner, "wait_all_collect_errors()", [](){
    std::vector< std::string > values = wait_all_collect_errors(
            [](){ return sleeper("waces_late",   150); },
            [](){ return sleeper("waces_middle", 100); },
            [](){ return sleeper("waces_early",   50); });
    std::cout << "wait_all_collect_errors(success) =>";
    for ( std::string const& v : values) {
        std::cout << " '" << v << "'";
    }
    std::cout << std::endl;

    std::string thrown;
    std::size_t errors = 0;
    try {
        values = wait_all_collect_errors(
                [](){ return sleeper("wacef_late",   150, true); },
                [](){ return sleeper("wacef_middle", 100, true); },
                [](){ return sleeper("wacef_early",   50); });
        std::cout << "wait_all_collect_errors(fail) =>";
        for ( std::string const& v : values) {
            std::cout << " '" << v << "'";
        }
        std::cout << std::endl;
    } catch ( exception_list const& e) {
        thrown = e.what();
        errors = e.size();
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_all_collect_errors(fail) threw '" << thrown
              << "': " << errors << " errors" << std::endl;
});

/*****************************************************************************
*   when_all, heterogeneous
*****************************************************************************/
//[wait_all_members_get
template< typename Result, typename ... Futures >
Result wait_all_members_get( Futures && ... futures) {
    // Fetch the results from the passed futures into Result's initializer
    // list. It's true that the get() calls here will block the implicit
    // iteration over futures -- but that doesn't matter because we won't be
    // done until the slowest of them finishes anyway. As results are
    // processed in argument-list order rather than order of completion, the
    // leftmost get() to throw an exception will cause that exception to
    // propagate to the caller.
    return Result{ futures.get() ... };
}
//]

//[wait_all_members
// Explicitly pass Result. This can be any type capable of being initialized
// from the results of the passed functions, such as a struct.
template< typename Result, typename ... Fns >
Result wait_all_members( Fns && ... functions) {
    // Run each of the passed functions on a separate fiber, passing all their
    // futures to helper function for processing.
    return wait_all_members_get< Result >(
            boost::fibers::async( std::forward< Fns >( functions) ) ... );
}
//]

// used by following example
//[wait_Data
struct Data {
    std::string str;
    double      inexact;
    int         exact;

    friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
    ...*/
//<-
    {
        return out << "Data{str='" << data.str << "', inexact=" << data.inexact
                   << ", exact=" << data.exact << "}";
    }
//->
};
//]

// example usage
Example wam( runner, "wait_all_members()", [](){
//[wait_all_members_data_ex
    Data data = wait_all_members< Data >(
            [](){ return sleeper("wams_left", 100); },
            [](){ return sleeper(3.14,        150); },
            [](){ return sleeper(17,          50); });
    std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
//]

    std::string thrown;
    try {
        data = wait_all_members< Data >(
                [](){ return sleeper("wamf_left", 100, true); },
                [](){ return sleeper(3.14,        150); },
                [](){ return sleeper(17,          50, true); });
        std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
    } catch ( std::exception const& e) {
        thrown = e.what();
    }
    std::cout << "wait_all_members<Data>(fail) threw '" << thrown
              << '"' << std::endl;

//[wait_all_members_vector_ex
    // If we don't care about obtaining results as soon as they arrive, and we
    // prefer a result vector in passed argument order rather than completion
    // order, wait_all_members() is another possible implementation of
    // wait_all_until_error().
    auto strings = wait_all_members< std::vector< std::string > >(
            [](){ return sleeper("wamv_left",   150); },
            [](){ return sleeper("wamv_middle", 100); },
            [](){ return sleeper("wamv_right",   50); });
    std::cout << "wait_all_members<vector>() =>";
    for ( std::string const& str : strings) {
        std::cout << " '" << str << "'";
    }
    std::cout << std::endl;
//]
});


/*****************************************************************************
*   main()
*****************************************************************************/
int main( int argc, char *argv[]) {
    runner.run();
    std::cout << "done." << std::endl;
    return EXIT_SUCCESS;
}