// boost/graph/distributed/detail/remote_update_set.hpp
// Copyright (C) 2005-2006 The Trustees of Indiana University.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Authors: Douglas Gregor
// Andrew Lumsdaine
#ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
#define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif
#include <boost/graph/parallel/process_group.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <vector>
#include <boost/assert.hpp>
#include <boost/optional.hpp>
#include <queue>
namespace boost { namespace graph { namespace detail {
// Synchronizes the process group pg. The call is unqualified and
// boost::parallel::synchronize is brought into scope by a using-declaration,
// so an overload found through argument-dependent lookup on ProcessGroup can
// be chosen as well.
template<typename ProcessGroup>
void do_synchronize(ProcessGroup& pg)
{
using boost::parallel::synchronize;
synchronize(pg);
}
// Tag selecting the remote_update_set specialization that buffers remote
// updates and only sends them from synchronize().
struct remote_set_queued {};
// Tag selecting the remote_update_set specialization that sends each remote
// update directly from update().
struct remote_set_immediate {};
template<typename ProcessGroup>
class remote_set_semantics
{
BOOST_STATIC_CONSTANT
(bool,
queued = (is_convertible<
typename ProcessGroup::communication_category,
parallel::bsp_process_group_tag>::value));
public:
typedef typename mpl::if_c<queued,
remote_set_queued,
remote_set_immediate>::type type;
};
// Primary template of the remote update set (declaration only; the
// specializations for remote_set_queued and remote_set_immediate follow).
//   Derived      - class to which the specializations downcast `this` in
//                  order to call receive_update(source, key, value).
//   ProcessGroup - process group type used for communication.
//   Value        - value carried together with a key in each update.
//   OwnerMap     - property map; get(owner, key) yields the id of the process
//                  that the update for `key` is delivered to.
//   Semantics    - remote_set_queued or remote_set_immediate; defaults to
//                  remote_set_semantics<ProcessGroup>::type.
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap,
typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
class remote_update_set;
/**********************************************************************
* Remote updating set that queues messages until synchronization *
**********************************************************************/
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap>
class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
remote_set_queued>
{
typedef typename property_traits<OwnerMap>::key_type Key;
typedef std::vector<std::pair<Key, Value> > Updates;
typedef typename Updates::size_type updates_size_type;
typedef typename Updates::value_type updates_pair_type;
public:
private:
typedef typename ProcessGroup::process_id_type process_id_type;
enum message_kind {
/** Message containing the number of updates that will be sent in
* a msg_updates message that will immediately follow. This
* message will contain a single value of type
* updates_size_type.
*/
msg_num_updates,
/** Contains (key, value) pairs with all of the updates from a
* particular source. The number of updates is variable, but will
* be provided in a msg_num_updates message that immediately
* preceeds this message.
*
*/
msg_updates
};
struct handle_messages
{
explicit
handle_messages(remote_update_set* self, const ProcessGroup& pg)
: self(self), update_sizes(num_processes(pg), 0) { }
void operator()(process_id_type source, int tag)
{
switch(tag) {
case msg_num_updates:
{
// Receive the # of updates
updates_size_type num_updates;
receive(self->process_group, source, tag, num_updates);
update_sizes[source] = num_updates;
}
break;
case msg_updates:
{
updates_size_type num_updates = update_sizes[source];
BOOST_ASSERT(num_updates);
// Receive the actual updates
std::vector<updates_pair_type> updates(num_updates);
receive(self->process_group, source, msg_updates, &updates[0],
num_updates);
// Send updates to derived "receive_update" member
Derived* derived = static_cast<Derived*>(self);
for (updates_size_type u = 0; u < num_updates; ++u)
derived->receive_update(source, updates[u].first, updates[u].second);
update_sizes[source] = 0;
}
break;
};
}
private:
remote_update_set* self;
std::vector<updates_size_type> update_sizes;
};
friend struct handle_messages;
protected:
remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
: process_group(pg, handle_messages(this, pg)),
updates(num_processes(pg)), owner(owner) {
}
void update(const Key& key, const Value& value)
{
if (get(owner, key) == process_id(process_group)) {
Derived* derived = static_cast<Derived*>(this);
derived->receive_update(get(owner, key), key, value);
}
else {
updates[get(owner, key)].push_back(std::make_pair(key, value));
}
}
void collect() { }
void synchronize()
{
// Emit all updates and then remove them
process_id_type num_processes = updates.size();
for (process_id_type p = 0; p < num_processes; ++p) {
if (!updates[p].empty()) {
send(process_group, p, msg_num_updates, updates[p].size());
send(process_group, p, msg_updates,
&updates[p].front(), updates[p].size());
updates[p].clear();
}
}
do_synchronize(process_group);
}
ProcessGroup process_group;
private:
std::vector<Updates> updates;
OwnerMap owner;
};
/**********************************************************************
* Remote updating set that sends messages immediately *
**********************************************************************/
template<typename Derived, typename ProcessGroup, typename Value,
typename OwnerMap>
class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
remote_set_immediate>
{
typedef typename property_traits<OwnerMap>::key_type Key;
typedef std::pair<Key, Value> update_pair_type;
typedef typename std::vector<update_pair_type>::size_type updates_size_type;
public:
typedef typename ProcessGroup::process_id_type process_id_type;
private:
enum message_kind {
/** Contains a (key, value) pair that will be updated. */
msg_update
};
struct handle_messages
{
explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
: self(self)
{ update_sizes.resize(num_processes(pg), 0); }
void operator()(process_id_type source, int tag)
{
// Receive the # of updates
BOOST_ASSERT(tag == msg_update);
update_pair_type update;
receive(self->process_group, source, tag, update);
// Send update to derived "receive_update" member
Derived* derived = static_cast<Derived*>(self);
derived->receive_update(source, update.first, update.second);
}
private:
std::vector<updates_size_type> update_sizes;
remote_update_set* self;
};
friend struct handle_messages;
protected:
remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
: process_group(pg, handle_messages(this, pg)), owner(owner) { }
void update(const Key& key, const Value& value)
{
if (get(owner, key) == process_id(process_group)) {
Derived* derived = static_cast<Derived*>(this);
derived->receive_update(get(owner, key), key, value);
}
else
send(process_group, get(owner, key), msg_update,
update_pair_type(key, value));
}
void collect()
{
typedef std::pair<process_id_type, int> probe_type;
handle_messages handler(this, process_group);
while (optional<probe_type> stp = probe(process_group))
if (stp->second == msg_update) handler(stp->first, stp->second);
}
void synchronize()
{
do_synchronize(process_group);
}
ProcessGroup process_group;
OwnerMap owner;
};
} } } // end namespace boost::graph::detail
#endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP