"...one of the most highly regarded and expertly designed C++ library projects in the world."
— Herb Sutter and Andrei Alexandrescu, C++ Coding Standards
To support a broad range of execution-control behaviour, the coroutine types asymmetric_coroutine<> and symmetric_coroutine<> can be used to escape and re-enter loops, to escape and re-enter recursive computations, and for cooperative multitasking, helping to solve problems in a much simpler and more elegant way than is possible with a single flow of control.
The event-driven model is a programming paradigm in which the flow of a program is determined by events. Events are generated by multiple independent sources, and an event-dispatcher, waiting on all external sources, triggers callback functions (event-handlers) whenever one of those events is detected (event-loop). The application is thus divided into event selection (detection) and event handling.
The resulting applications are highly scalable, flexible and responsive, and their components are loosely coupled. This makes the event-driven model suitable for user-interface applications, rule-based production systems and applications dealing with asynchronous I/O (for instance network servers).
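The split between event selection and event handling can be sketched with a minimal in-process dispatcher. This is an illustrative toy (the EventLoop type and its members are invented for this sketch); a real dispatcher would wait on external sources such as sockets and timers rather than an in-memory queue:

```cpp
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// A toy event-dispatcher: handlers are registered per event type, and the
// event-loop pops pending events and triggers the matching callbacks.
struct EventLoop {
    std::map<std::string,
             std::vector<std::function<void(const std::string&)>>> handlers;
    std::queue<std::pair<std::string, std::string>> pending; // (type, payload)

    // Event handling: register a callback for one event type.
    void on(const std::string& type, std::function<void(const std::string&)> fn) {
        handlers[type].push_back(std::move(fn));
    }
    // Event source: queue an event for later dispatch.
    void post(const std::string& type, const std::string& payload) {
        pending.push({type, payload});
    }
    // Event selection: the loop itself, dispatching each detected event
    // to every handler registered for its type.
    void run() {
        while (!pending.empty()) {
            auto ev = pending.front();
            pending.pop();
            for (auto& fn : handlers[ev.first]) fn(ev.second);
        }
    }
};
```

Note how the program's control flow lives in run(): application logic is reduced to callbacks that fire whenever the loop detects a matching event.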
A classic synchronous console program issues an I/O request (e.g. for user input or filesystem data) and blocks until the request is complete.
In contrast, an asynchronous I/O function initiates the physical operation but immediately returns to its caller, even though the operation is not yet complete. A program written to leverage this functionality does not block: it can proceed with other work (including other I/O requests in parallel) while the original operation is still pending. When the operation completes, the program is notified. Because asynchronous applications spend less overall time waiting for operations, they can outperform synchronous programs.
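The overlap between pending work and other work can be illustrated with std::async from the standard library. This is a thread-based stand-in, not asynchronous I/O proper, and slow_sum()/overlap_work() are invented for the sketch:

```cpp
#include <future>
#include <numeric>
#include <vector>

// Stand-in for a slow operation, e.g. an I/O request.
long long slow_sum(const std::vector<int>& v) {
    return std::accumulate(v.begin(), v.end(), 0LL);
}

// Synchronous style would call slow_sum() and block until it returns.
// Asynchronous style: initiate the operation, continue with other work,
// and wait for the result only at the point where it is actually needed.
long long overlap_work(const std::vector<int>& v) {
    std::future<long long> pending =
        std::async(std::launch::async, slow_sum, std::cref(v));
    long long other_work = 0;           // proceed with unrelated work...
    for (int i = 1; i <= 10; ++i) other_work += i;
    return pending.get() + other_work;  // ...and block only here
}
```

The caller spends the waiting time doing useful work instead of blocking, which is the source of the performance advantage described above.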
Events are one of the paradigms for asynchronous execution, but not all asynchronous systems use events. Asynchronous programming can also be done with threads, but threads come with their own costs:
The event-based asynchronous model avoids those issues:
The downside of this paradigm is a sub-optimal program structure. An event-driven program is forced to split its code into multiple small callback functions, i.e. the code is organized as a sequence of small steps executed intermittently. An algorithm that would usually be expressed as a hierarchy of functions and loops must be transformed into callbacks. The complete state has to be stored in a data structure while control returns to the event-loop. As a consequence, event-driven applications are often tedious and confusing to write. Each callback introduces a new scope, its own error callback, etc. The sequential nature of the algorithm is split across multiple callstacks, making the application hard to debug. Exception handling is restricted to local handlers: it is impossible to wrap a sequence of events into a single try-catch block. Local variables, while/for loops, recursion etc. cannot be combined with the event-loop, so the code becomes less expressive.
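The transformation can be sketched on a trivial algorithm. A sequential loop becomes, in callback style, a state structure plus a step function that an event-loop would invoke once per event (SumTask and its members are illustrative, not part of any real framework):

```cpp
#include <cstddef>
#include <vector>

// Sequential form: all state lives in local variables.
int sum_sequential(const std::vector<int>& values) {
    int total = 0;
    for (int v : values) total += v;
    return total;
}

// Callback form: the same loop torn into steps. Everything that would have
// been a local variable must be lifted into a structure that survives
// between callbacks.
struct SumTask {
    const std::vector<int>* values = nullptr;
    std::size_t next = 0;   // explicit "program counter"
    int total = 0;          // explicit accumulator
    bool done = false;      // explicit completion flag

    // An event-loop would call step() once per "value available" event.
    void step() {
        if (next < values->size()) total += (*values)[next++];
        else done = true;
    }
};
```

Even this toy loop needs an explicit index and completion flag in callback form; with nested loops, recursion or try-catch, the hand-written state machine grows much faster than the sequential code.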
In the past, code using asio's asynchronous operations was convoluted by callback functions.
class session{
public:
    session(boost::asio::io_service& io_service) :
        socket_(io_service) // construct a TCP-socket from io_service
    {}

    tcp::socket& socket(){
        return socket_;
    }

    void start(){
        // initiate asynchronous read; handle_read() is callback-function
        socket_.async_read_some(boost::asio::buffer(data_,max_length),
            boost::bind(&session::handle_read,this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

private:
    void handle_read(const boost::system::error_code& error,
                     size_t bytes_transferred){
        if (!error)
            // initiate asynchronous write; handle_write() is callback-function
            boost::asio::async_write(socket_,
                boost::asio::buffer(data_,bytes_transferred),
                boost::bind(&session::handle_write,this,
                    boost::asio::placeholders::error));
        else
            delete this;
    }

    void handle_write(const boost::system::error_code& error){
        if (!error)
            // initiate asynchronous read; handle_read() is callback-function
            socket_.async_read_some(boost::asio::buffer(data_,max_length),
                boost::bind(&session::handle_read,this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
        else
            delete this;
    }

    boost::asio::ip::tcp::socket socket_;
    enum { max_length=1024 };
    char data_[max_length];
};
In this example, a simple echo server, the logic is split across three member functions, and local state (such as the data buffer) is moved into member variables.
With its new asynchronous-result feature, Boost.Asio provides a framework that combines the event-driven model with coroutines, hiding the complexity of event-driven programming and permitting the style of classic sequential code. The application is not required to pass callback functions to asynchronous operations, and local state is kept in local variables. The code is therefore much easier to read and understand [4]. boost::asio::yield_context internally uses Boost.Coroutine:
void session(boost::asio::io_service& io_service){
    // construct TCP-socket from io_service
    boost::asio::ip::tcp::socket socket(io_service);
    try{
        for(;;){
            // local data-buffer
            char data[max_length];
            boost::system::error_code ec;
            // read asynchronous data from socket
            // execution context will be suspended until
            // some bytes are read from socket
            std::size_t length=socket.async_read_some(
                boost::asio::buffer(data),
                boost::asio::yield[ec]);
            if (ec==boost::asio::error::eof)
                break; //connection closed cleanly by peer
            else if(ec)
                throw boost::system::system_error(ec); //some other error
            // write some bytes asynchronously
            boost::asio::async_write(
                socket,
                boost::asio::buffer(data,length),
                boost::asio::yield[ec]);
            if (ec==boost::asio::error::eof)
                break; //connection closed cleanly by peer
            else if(ec)
                throw boost::system::system_error(ec); //some other error
        }
    } catch(std::exception const& e){
        std::cerr<<"Exception: "<<e.what()<<"\n";
    }
}
In contrast to the previous example, this one gives the impression of sequential code with local data (data) while using asynchronous operations (async_read_some(), async_write()). The algorithm is implemented in a single function, and error handling is done in one try-catch block.
To someone who knows SAX, the phrase "recursive SAX parsing" might sound nonsensical. You get callbacks from SAX; you have to manage the element stack yourself. If you want recursive XML processing, you must first read the entire DOM into memory, then walk the tree.
But coroutines let you invert the flow of control so you can ask for SAX events. Once you can do that, you can process them recursively.
// Represent a subset of interesting SAX events
struct BaseEvent{
    // The deleted copy constructor suppresses the implicit default
    // constructor, so declare it explicitly for the subclasses.
    BaseEvent()=default;
    // A virtual destructor makes the hierarchy polymorphic, which the
    // dynamic_casts below require.
    virtual ~BaseEvent(){}
    BaseEvent(const BaseEvent&)=delete;
    BaseEvent& operator=(const BaseEvent&)=delete;
};

// End of document or element
struct CloseEvent: public BaseEvent{
    // CloseEvent binds (without copying) the TagType reference.
    CloseEvent(const xml::sax::Parser::TagType& name):
        mName(name)
    {}

    const xml::sax::Parser::TagType& mName;
};

// Start of document or element
struct OpenEvent: public CloseEvent{
    // In addition to CloseEvent's TagType, OpenEvent binds AttributeIterator.
    OpenEvent(const xml::sax::Parser::TagType& name,
              xml::sax::AttributeIterator& attrs):
        CloseEvent(name),
        mAttrs(attrs)
    {}

    xml::sax::AttributeIterator& mAttrs;
};

// text within an element
struct TextEvent: public BaseEvent{
    // TextEvent binds the CharIterator.
    TextEvent(xml::sax::CharIterator& text):
        mText(text)
    {}

    xml::sax::CharIterator& mText;
};

// The parsing coroutine instantiates BaseEvent subclass instances and
// successively shows them to the main program. It passes a reference so we
// don't slice the BaseEvent subclass.
typedef boost::coroutines::asymmetric_coroutine<const BaseEvent&> coro_t;

void parser(coro_t::push_type& sink,std::istream& in){
    xml::sax::Parser xparser;
    // startDocument() will send OpenEvent
    xparser.startDocument([&sink](const xml::sax::Parser::TagType& name,
                                  xml::sax::AttributeIterator& attrs)
        { sink(OpenEvent(name,attrs)); });
    // startTag() will likewise send OpenEvent
    xparser.startTag([&sink](const xml::sax::Parser::TagType& name,
                             xml::sax::AttributeIterator& attrs)
        { sink(OpenEvent(name,attrs)); });
    // endTag() will send CloseEvent
    xparser.endTag([&sink](const xml::sax::Parser::TagType& name)
        { sink(CloseEvent(name)); });
    // endDocument() will likewise send CloseEvent
    xparser.endDocument([&sink](const xml::sax::Parser::TagType& name)
        { sink(CloseEvent(name)); });
    // characters() will send TextEvent
    xparser.characters([&sink](xml::sax::CharIterator& text)
        { sink(TextEvent(text)); });

    try {
        // parse the document, firing all the above
        xparser.parse(in);
    } catch (xml::Exception e) {
        // xml::sax::Parser throws xml::Exception. Helpfully translate the
        // name and provide it as the what() string.
        throw std::runtime_error(exception_name(e));
    }
}

// Recursively traverse the incoming XML document on the fly, pulling
// BaseEvent& references from 'events'.
// 'indent' illustrates the level of recursion.
// Each time we're called, we've just retrieved an OpenEvent from 'events';
// accept that as a param.
// Return the CloseEvent that ends this element.
const CloseEvent& process(coro_t::pull_type& events,const OpenEvent& context,
                          const std::string& indent=""){
    // Capture OpenEvent's tag name: as soon as we advance the parser, the
    // TagType& reference bound in this OpenEvent will be invalidated.
    xml::sax::Parser::TagType tagName = context.mName;
    // Since the OpenEvent is still the current value from 'events', pass
    // control back to 'events' until the next event. Of course, each time we
    // come back we must check for the end of the results stream.
    while(events()){
        // Another event is pending; retrieve it.
        const BaseEvent& event=events.get();
        const OpenEvent* oe;
        const CloseEvent* ce;
        const TextEvent* te;
        if((oe=dynamic_cast<const OpenEvent*>(&event))){
            // When we see OpenEvent, recursively process it.
            process(events,*oe,indent+"    ");
        } else if((ce=dynamic_cast<const CloseEvent*>(&event))){
            // When we see CloseEvent, validate its tag name and then return
            // it. (This assert is really a check on xml::sax::Parser, since
            // it already validates matching open/close tags.)
            assert(ce->mName == tagName);
            return *ce;
        } else if((te=dynamic_cast<const TextEvent*>(&event))){
            // When we see TextEvent, just report its text, along with
            // indentation indicating recursion level.
            std::cout<<indent<<"text: '"<<te->mText.getText()<<"'\n";
        }
    }
    // A well-formed document delivers the matching CloseEvent before the
    // stream ends; reaching this point means the input was malformed.
    throw std::runtime_error("missing CloseEvent");
}

// pretend we have an XML file of arbitrary size
std::istringstream in(doc);
try {
    coro_t::pull_type events(std::bind(parser,_1,std::ref(in)));
    // We fully expect at least ONE event.
    assert(events);
    // This dynamic_cast<&> is itself an assertion that the first event is an
    // OpenEvent.
    const OpenEvent& context=dynamic_cast<const OpenEvent&>(events.get());
    process(events, context);
} catch (std::exception& e) {
    std::cout << "Parsing error: " << e.what() << '\n';
}
This problem does not map at all well to communicating between independent threads. It makes no sense for either side to proceed independently of the other. You want them to pass control back and forth.
The solution involves a small polymorphic event-class hierarchy, to which we pass references. The actual instances are temporaries on the coroutine's stack; the coroutine passes each reference in turn to the main logic. Copying them as base-class values would slice them.
If we were trying to let the SAX parser proceed independently of the consuming logic, one could imagine allocating event-subclass instances on the heap, passing them along on a thread-safe queue of pointers. But that doesn't work either, because these event classes bind references passed by the SAX parser. The moment the parser moves on, those references become invalid.
Instead of binding a TagType& reference, we could store a copy of the TagType in CloseEvent. But that doesn't solve the whole problem. For attributes, we get an AttributeIterator&; for text we get a CharIterator&. Storing a copy of those iterators is pointless: once the parser moves on, those iterators are invalidated. You must process the attribute iterator (or character iterator) during the SAX callback for that event.
Naturally we could retrieve and store a copy of every attribute and its value; we could store a copy of every chunk of text. But that would amount to storing all the text in the document, a heavy price to pay if the reason we're using SAX is concern about fitting the entire DOM into memory.
There's yet another advantage to using coroutines. This SAX parser throws an exception when parsing fails. With a coroutine implementation, you need only wrap the calling code in try/catch.
With communicating threads, you would have to arrange to catch the exception and pass along the exception pointer on the same queue you're using to deliver the other events. You would then have to rethrow the exception to unwind the recursive document processing.
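With std::exception_ptr, that thread-based machinery would look roughly like the following sketch (capture_failure() and consume() are invented names; in the real design the exception_ptr would travel through the event queue between threads):

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Producer side: catch the parser's exception and capture it so it can be
// delivered through the same channel as ordinary events.
std::exception_ptr capture_failure() {
    try {
        throw std::runtime_error("parse error"); // stand-in for a parser throw
    } catch (...) {
        return std::current_exception();         // would be queued to consumer
    }
}

// Consumer side: on receiving the captured exception, rethrow it to unwind
// the recursive document processing, then handle it at the top level.
std::string consume(std::exception_ptr err) {
    try {
        if (err) std::rethrow_exception(err);
        return "ok";
    } catch (const std::exception& e) {
        return e.what();
    }
}
```

All of this bookkeeping disappears in the coroutine version, where the exception simply propagates out of the pull_type call into the caller's try/catch.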
The coroutine solution maps very naturally to the problem space.
The advantage of suspending at an arbitrary call depth shows particularly clearly in recursive functions, such as tree traversals. If traversing two different trees in the same deterministic order produces the same list of leaf nodes, then both trees have the same fringe.
Both trees in the picture have the same fringe even though the structure of the trees is different.
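Without coroutines, the straightforward approach is eager: recursively collect the fringe of each tree into a vector, then compare the vectors. A minimal sketch (with its own toy Node type and helpers, not the node class used below):

```cpp
#include <memory>
#include <string>
#include <vector>

struct Node {
    std::shared_ptr<Node> left, right;
    std::string value;
};

std::shared_ptr<Node> leaf(const std::string& v) {
    auto n = std::make_shared<Node>();
    n->value = v;
    return n;
}

std::shared_ptr<Node> branch(std::shared_ptr<Node> l, const std::string& v,
                             std::shared_ptr<Node> r) {
    auto n = std::make_shared<Node>();
    n->left = l; n->value = v; n->right = r;
    return n;
}

// In-order traversal, materializing the whole fringe up front.
void collect(const std::shared_ptr<Node>& n, std::vector<std::string>& out) {
    if (!n) return;
    collect(n->left, out);
    if (!n->left && !n->right) out.push_back(n->value); // leaf
    collect(n->right, out);
}

bool same_fringe(const std::shared_ptr<Node>& a, const std::shared_ptr<Node>& b) {
    std::vector<std::string> fa, fb;
    collect(a, fa);
    collect(b, fb);
    return fa == fb;
}
```

This costs O(n) extra memory for both fringes and cannot stop at the first mismatch; the coroutine version below streams both traversals and compares them value by value.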
The same fringe problem could be solved using coroutines by iterating over the leaf nodes and comparing this sequence via std::equal(). The range of data values is generated by function traverse() which recursively traverses the tree and passes each node's data value to its asymmetric_coroutine<>::push_type. asymmetric_coroutine<>::push_type suspends the recursive computation and transfers the data value to the main execution context. asymmetric_coroutine<>::pull_type::iterator, created from asymmetric_coroutine<>::pull_type, steps over those data values and delivers them to std::equal() for comparison. Each increment of asymmetric_coroutine<>::pull_type::iterator resumes traverse(). Upon return from iterator::operator++(), either a new data value is available, or tree traversal is finished (iterator is invalidated).
In effect, the coroutine iterator presents a flattened view of the recursive data structure.
struct node{
    typedef boost::shared_ptr<node> ptr_t;

    // Each tree node has an optional left subtree,
    // an optional right subtree and a value of its own.
    // The value is considered to be between the left
    // subtree and the right.
    ptr_t       left,right;
    std::string value;

    // construct leaf
    node(const std::string& v):
        left(),right(),value(v)
    {}
    // construct nonleaf
    node(ptr_t l,const std::string& v,ptr_t r):
        left(l),right(r),value(v)
    {}

    static ptr_t create(const std::string& v){
        return ptr_t(new node(v));
    }

    static ptr_t create(ptr_t l,const std::string& v,ptr_t r){
        return ptr_t(new node(l,v,r));
    }
};

node::ptr_t create_left_tree_from(const std::string& root){
    /* --------
          root
          /  \
         b    e
        / \
       a   c
       -------- */
    return node::create(
            node::create(
                node::create("a"),
                "b",
                node::create("c")),
            root,
            node::create("e"));
}

node::ptr_t create_right_tree_from(const std::string& root){
    /* --------
          root
          /  \
         a    d
             / \
            c   e
       -------- */
    return node::create(
            node::create("a"),
            root,
            node::create(
                node::create("c"),
                "d",
                node::create("e")));
}

// recursively walk the tree, delivering values in order
void traverse(node::ptr_t n,
              boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
    if(n->left) traverse(n->left,out);
    out(n->value);
    if(n->right) traverse(n->right,out);
}

// evaluation
{
    node::ptr_t left_d(create_left_tree_from("d"));
    boost::coroutines::asymmetric_coroutine<std::string>::pull_type left_d_reader(
        [&](boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
            traverse(left_d,out);
        });

    node::ptr_t right_b(create_right_tree_from("b"));
    boost::coroutines::asymmetric_coroutine<std::string>::pull_type right_b_reader(
        [&](boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
            traverse(right_b,out);
        });

    std::cout << "left tree from d == right tree from b? "
              << std::boolalpha
              << std::equal(boost::begin(left_d_reader),
                            boost::end(left_d_reader),
                            boost::begin(right_b_reader))
              << std::endl;
}
{
    node::ptr_t left_d(create_left_tree_from("d"));
    boost::coroutines::asymmetric_coroutine<std::string>::pull_type left_d_reader(
        [&](boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
            traverse(left_d,out);
        });

    node::ptr_t right_x(create_right_tree_from("x"));
    boost::coroutines::asymmetric_coroutine<std::string>::pull_type right_x_reader(
        [&](boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
            traverse(right_x,out);
        });

    std::cout << "left tree from d == right tree from x? "
              << std::boolalpha
              << std::equal(boost::begin(left_d_reader),
                            boost::end(left_d_reader),
                            boost::begin(right_x_reader))
              << std::endl;
}
std::cout << "Done" << std::endl;

output:
    left tree from d == right tree from b? true
    left tree from d == right tree from x? false
    Done
This example demonstrates how symmetric coroutines merge two sorted arrays.
std::vector<int> merge(const std::vector<int>& a,const std::vector<int>& b){
    std::vector<int> c;
    std::size_t idx_a=0,idx_b=0;
    boost::coroutines::symmetric_coroutine<void>::call_type* other_a=0,* other_b=0;

    boost::coroutines::symmetric_coroutine<void>::call_type coro_a(
        [&](boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
            while(idx_a<a.size()){
                if(b[idx_b]<a[idx_a])    // test if element in array b is less than in array a
                    yield(*other_b);     // yield to coroutine coro_b
                c.push_back(a[idx_a++]); // add element to final array
            }
            // add remaining elements of array b
            while(idx_b<b.size())
                c.push_back(b[idx_b++]);
        });

    boost::coroutines::symmetric_coroutine<void>::call_type coro_b(
        [&](boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
            while(idx_b<b.size()){
                if(a[idx_a]<b[idx_b])    // test if element in array a is less than in array b
                    yield(*other_a);     // yield to coroutine coro_a
                c.push_back(b[idx_b++]); // add element to final array
            }
            // add remaining elements of array a
            while(idx_a<a.size())
                c.push_back(a[idx_a++]);
        });

    other_a=&coro_a;
    other_b=&coro_b;

    coro_a(); // enter coroutine-fn of coro_a
    return c;
}
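For comparison, the conventional merge keeps both indices in a single flow of control; the symmetric coroutines above split exactly this loop into two cooperating control flows, each responsible for one input array (merge_sequential is a sketch written for this comparison, not part of the library):

```cpp
#include <cstddef>
#include <vector>

// Classic two-index merge of two sorted vectors in one control flow.
std::vector<int> merge_sequential(const std::vector<int>& a,
                                  const std::vector<int>& b) {
    std::vector<int> c;
    std::size_t ia = 0, ib = 0;
    // Interleave while both arrays have elements left...
    while (ia < a.size() && ib < b.size()) {
        if (b[ib] < a[ia]) c.push_back(b[ib++]);
        else               c.push_back(a[ia++]);
    }
    // ...then append whichever remainder is non-empty.
    while (ia < a.size()) c.push_back(a[ia++]);
    while (ib < b.size()) c.push_back(b[ib++]);
    return c;
}
```

The output is the same either way; the coroutine version is interesting because each control flow reads like a loop over a single array, with the interleaving expressed by yield().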
This code shows how coroutines can be chained into a pipeline.
typedef boost::coroutines::asymmetric_coroutine<std::string> coro_t;

// deliver each line of input stream to sink as a separate string
void readlines(coro_t::push_type& sink,std::istream& in){
    std::string line;
    while(std::getline(in,line))
        sink(line);
}

void tokenize(coro_t::push_type& sink,coro_t::pull_type& source){
    // This tokenizer doesn't happen to be stateful: you could reasonably
    // implement it with a single call to push each new token downstream. But
    // I've worked with stateful tokenizers, in which the meaning of input
    // characters depends in part on their position within the input line.
    BOOST_FOREACH(std::string line,source){
        std::string::size_type pos=0;
        while(pos<line.length()){
            if(line[pos]=='"'){
                std::string token;
                ++pos;              // skip open quote
                while(pos<line.length()&&line[pos]!='"')
                    token+=line[pos++];
                ++pos;              // skip close quote
                sink(token);        // pass token downstream
            } else if (std::isspace(line[pos])){
                ++pos;              // outside quotes, ignore whitespace
            } else if (std::isalpha(line[pos])){
                std::string token;
                while (pos < line.length() && std::isalpha(line[pos]))
                    token += line[pos++];
                sink(token);        // pass token downstream
            } else {                // punctuation
                sink(std::string(1,line[pos++]));
            }
        }
    }
}

void only_words(coro_t::push_type& sink,coro_t::pull_type& source){
    BOOST_FOREACH(std::string token,source){
        if (!token.empty() && std::isalpha(token[0]))
            sink(token);
    }
}

void trace(coro_t::push_type& sink,coro_t::pull_type& source){
    BOOST_FOREACH(std::string token,source){
        std::cout << "trace: '" << token << "'\n";
        sink(token);
    }
}

struct FinalEOL{
    ~FinalEOL(){
        std::cout << std::endl;
    }
};

void layout(coro_t::pull_type& source,int num,int width){
    // Finish the last line when we leave by whatever means
    FinalEOL eol;
    // Pull values from upstream, lay them out 'num' to a line
    for (;;){
        for (int i = 0; i < num; ++i){
            // when we exhaust the input, stop
            if (!source) return;
            std::cout << std::setw(width) << source.get();
            // now that we've handled this item, advance to next
            source();
        }
        // after 'num' items, line break
        std::cout << std::endl;
    }
}

// For example purposes, instead of having a separate text file in the
// local filesystem, construct an istringstream to read.
std::string data(
    "This is the first line.\n"
    "This, the second.\n"
    "The third has \"a phrase\"!\n"
);

{
    std::cout << "\nfilter:\n";
    std::istringstream infile(data);
    coro_t::pull_type reader(boost::bind(readlines, _1, boost::ref(infile)));
    coro_t::pull_type tokenizer(boost::bind(tokenize, _1, boost::ref(reader)));
    coro_t::pull_type filter(boost::bind(only_words, _1, boost::ref(tokenizer)));
    coro_t::pull_type tracer(boost::bind(trace, _1, boost::ref(filter)));
    BOOST_FOREACH(std::string token,tracer){
        // just iterate, we're already pulling through tracer
    }
}

{
    std::cout << "\nlayout() as coroutine::push_type:\n";
    std::istringstream infile(data);
    coro_t::pull_type reader(boost::bind(readlines, _1, boost::ref(infile)));
    coro_t::pull_type tokenizer(boost::bind(tokenize, _1, boost::ref(reader)));
    coro_t::pull_type filter(boost::bind(only_words, _1, boost::ref(tokenizer)));
    coro_t::push_type writer(boost::bind(layout, _1, 5, 15));
    BOOST_FOREACH(std::string token,filter){
        writer(token);
    }
}

{
    std::cout << "\nfiltering output:\n";
    std::istringstream infile(data);
    coro_t::pull_type reader(boost::bind(readlines,_1,boost::ref(infile)));
    coro_t::pull_type tokenizer(boost::bind(tokenize,_1,boost::ref(reader)));
    coro_t::push_type writer(boost::bind(layout,_1,5,15));
    // Because of the symmetry of the API, we can use any of these
    // chaining functions in a push_type coroutine chain as well.
    coro_t::push_type filter(boost::bind(only_words,boost::ref(writer),_1));
    BOOST_FOREACH(std::string token,tokenizer){
        filter(token);
    }
}