boost::log::ipc::reliable_message_queue — A reliable interprocess message queue.
// In header: <boost/log/utility/ipc/reliable_message_queue.hpp>

class reliable_message_queue {
public:
  // types
  typedef uint32_t size_type;  // Queue message size type.

  // Result codes for various operations on the queue.
  enum operation_result { succeeded, no_space, aborted };

  // Interprocess queue overflow policies.
  enum overflow_policy { block_on_overflow, fail_on_overflow, throw_on_overflow };

  // construct/copy/destruct
  reliable_message_queue() noexcept;
  reliable_message_queue(open_mode::create_only_tag, object_name const &, uint32_t, size_type, overflow_policy = block_on_overflow, permissions const & = permissions());
  reliable_message_queue(open_mode::open_or_create_tag, object_name const &, uint32_t, size_type, overflow_policy = block_on_overflow, permissions const & = permissions());
  reliable_message_queue(open_mode::open_only_tag, object_name const &, overflow_policy = block_on_overflow, permissions const & = permissions());
  template<typename... Args> explicit reliable_message_queue(Args const &...);
  reliable_message_queue(reliable_message_queue &&) noexcept;
  reliable_message_queue & operator=(reliable_message_queue &&) noexcept;
  ~reliable_message_queue();

  // public member functions
  void swap(reliable_message_queue &) noexcept;
  void create(object_name const &, uint32_t, size_type, overflow_policy = block_on_overflow, permissions const & = permissions());
  void open_or_create(object_name const &, uint32_t, size_type, overflow_policy = block_on_overflow, permissions const & = permissions());
  void open(object_name const &, overflow_policy = block_on_overflow, permissions const & = permissions());
  bool is_open() const noexcept;
  void clear();
  object_name const & name() const;
  uint32_t capacity() const;
  size_type block_size() const;
  void stop_local();
  void reset_local();
  void close() noexcept;
  operation_result send(void const *, size_type);
  bool try_send(void const *, size_type);
  operation_result receive(void *, size_type, size_type &);
  template<typename ElementT, size_type SizeV> operation_result receive(ElementT (&)[SizeV], size_type &);
  template<typename ContainerT> operation_result receive(ContainerT &);
  bool try_receive(void *, size_type, size_type &);
  template<typename ElementT, size_type SizeV> bool try_receive(ElementT (&)[SizeV], size_type &);
  template<typename ContainerT> bool try_receive(ContainerT &);

  // friend functions
  friend void swap(reliable_message_queue &, reliable_message_queue &) noexcept;

  // public static functions
  static void remove(object_name const &);
};
The queue implements a reliable one-way channel for passing messages from one or multiple writers to a single reader. The format of the messages is user-defined and must be consistent across all writers and the reader. The queue does not enforce any specific message format, other than requiring that each message be supplied as a contiguous array of bytes.
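Because the queue only sees bytes, the writers and the reader have to agree on a layout themselves. The following is a minimal sketch of one possible framing. The names `log_record`, `encode` and `decode` are hypothetical and not part of Boost.Log, and the layout (host byte order, length-prefixed text) is an assumption that only holds when all parties run on the same machine.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical application-level record; the queue knows nothing about it.
struct log_record
{
    std::uint32_t severity;
    std::string text;
};

// Packs the record as: 4-byte severity, 4-byte text length, then the text bytes.
// Fields are copied in host byte order (assumption: same machine on both ends).
std::vector<unsigned char> encode(log_record const& rec)
{
    std::uint32_t const length = static_cast<std::uint32_t>(rec.text.size());
    std::size_t const header = sizeof(rec.severity) + sizeof(length);
    std::vector<unsigned char> bytes(header + length);
    std::memcpy(bytes.data(), &rec.severity, sizeof(rec.severity));
    std::memcpy(bytes.data() + sizeof(rec.severity), &length, sizeof(length));
    std::memcpy(bytes.data() + header, rec.text.data(), length);
    return bytes;
}

// Reverses encode(); the asserts reject buffers that do not match the layout.
log_record decode(std::vector<unsigned char> const& bytes)
{
    log_record rec;
    std::uint32_t length = 0;
    std::size_t const header = sizeof(rec.severity) + sizeof(length);
    assert(bytes.size() >= header);
    std::memcpy(&rec.severity, bytes.data(), sizeof(rec.severity));
    std::memcpy(&length, bytes.data() + sizeof(rec.severity), sizeof(length));
    assert(bytes.size() == header + length);
    rec.text.assign(reinterpret_cast<char const*>(bytes.data()) + header, length);
    return rec;
}
```

The vector returned by `encode` is the kind of contiguous buffer a writer would hand to send() as `bytes.data()` and `bytes.size()`; the reader would apply `decode` to the bytes it receives.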
The queue internally uses a process-shared storage identified by an object_name (the queue name). Refer to object_name documentation for details on restrictions imposed on object names.
The queue storage is organized as a fixed number of blocks of a fixed size. The block size must be an integer power of 2 and is expressed in bytes. Each written message, together with some metadata added by the queue, consumes an integer number of blocks. Each message received by the reader releases the blocks allocated for that message. As such, the maximum size of a message is slightly less than the block size times the capacity of the queue. For efficiency, it is recommended to choose a block size large enough to accommodate most of the messages to be passed through the queue.
The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point of enqueueing a message when there are not enough free blocks to accommodate the message.
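The block accounting described above can be sketched with a little arithmetic. The size of the per-message metadata is not stated here, so `header_size` below is a hypothetical parameter, and `is_power_of_two` and `blocks_needed` are illustrative helpers rather than library functions.

```cpp
#include <cassert>
#include <cstdint>

// True when value is 2^k for some k >= 0, which is what block_size must satisfy.
bool is_power_of_two(std::uint32_t value)
{
    return value != 0u && (value & (value - 1u)) == 0u;
}

// Number of whole blocks consumed by one message.
// header_size is an assumed stand-in for the queue's per-message metadata;
// the real value is an implementation detail of the library.
std::uint32_t blocks_needed(std::uint32_t message_size,
                            std::uint32_t block_size,
                            std::uint32_t header_size)
{
    assert(is_power_of_two(block_size));
    std::uint64_t const total = static_cast<std::uint64_t>(message_size) + header_size;
    // Round up to a whole number of blocks.
    return static_cast<std::uint32_t>((total + block_size - 1u) / block_size);
}
```

With 64-byte blocks and an assumed 16-byte header, a 48-byte message fits in one block while a 49-byte message needs two, which is why a block size that covers most messages wastes the least space.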
The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, except when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is full, the queue can block the writer, return an error, or throw an exception, depending on the policy specified when the queue object is created or opened. The policy is object local, i.e. different writers and the reader can have different overflow policies.
If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer.
A blocked reader or writer can be unblocked by calling stop_local. After this method is called, all threads blocked on this particular object are released and return operation_result::aborted. The other instances of the queue (in the current or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the stop_local call the user has to invoke reset_local.
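The running and stopped states can be pictured with a small single-threaded model. This is only a sketch of the documented behavior, not the library's implementation: `local_state_model` and `outcome` are hypothetical names, and `would_block` is a modeling device marking the point where the real receive() would block instead of returning.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Outcomes of the model; would_block stands in for an actual blocking wait.
enum class outcome { succeeded, aborted, would_block };

// Single-threaded model of the per-object running/stopped state.
class local_state_model
{
public:
    void push(std::string message) { messages_.push_back(std::move(message)); }
    void stop_local() { stopped_ = true; }
    void reset_local() { stopped_ = false; }

    outcome receive(std::string& out)
    {
        if (!messages_.empty())
        {
            // Assumption: a pending message is handed out without waiting.
            // Whether the real queue also does so while stopped should be
            // verified against the library before relying on it.
            out = std::move(messages_.front());
            messages_.pop_front();
            return outcome::succeeded;
        }
        // Empty queue: abort if stopped, otherwise the real call would block.
        return stopped_ ? outcome::aborted : outcome::would_block;
    }

private:
    std::deque<std::string> messages_;
    bool stopped_ = false;
};

void demonstrate_stop_and_reset()
{
    local_state_model queue;
    std::string message;

    queue.stop_local();
    assert(queue.receive(message) == outcome::aborted);     // stopped and empty

    queue.reset_local();
    assert(queue.receive(message) == outcome::would_block); // running and empty

    queue.push("hello");
    assert(queue.receive(message) == outcome::succeeded);   // running, message pending
    assert(message == "hello");
}
```

The stop flag lives in the object rather than in the shared storage, which matches the statement that other instances of the queue are unaffected.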
The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a particular writer thread will be received in the order of sending.
Methods of this class are not thread-safe, unless otherwise specified.
reliable_message_queue public construct/copy/destruct
reliable_message_queue() noexcept;
Default constructor. The method constructs an object that is not associated with any message queue.
Postconditions: is_open() == false
reliable_message_queue(open_mode::create_only_tag, object_name const & name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
Constructor. The method is used to construct an object and create the associated message queue. The constructed object will be in running state if the message queue is successfully created.
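Parameters:
  name: Name of the message queue to be associated with.
  capacity: Maximum number of allocation blocks the queue can hold.
  block_size: Size in bytes of allocation block. Must be a power of 2.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Postconditions: is_open() == true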
reliable_message_queue(open_mode::open_or_create_tag, object_name const & name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
Constructor. The method is used to construct an object and create or open the associated message queue. The constructed object will be in running state if the message queue is successfully created or opened. If the message queue that is identified by the name already exists then the other queue parameters are ignored. The actual queue parameters can be obtained with accessors from the constructed object.
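Parameters:
  name: Name of the message queue to be associated with.
  capacity: Maximum number of allocation blocks the queue can hold. Used only if the queue is created.
  block_size: Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Postconditions: is_open() == true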
reliable_message_queue(open_mode::open_only_tag, object_name const & name, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
Constructor. The method is used to construct an object and open the existing message queue. The constructed object will be in running state if the message queue is successfully opened.
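Parameters:
  name: Name of the message queue to be associated with.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Postconditions: is_open() == true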
template<typename... Args> explicit reliable_message_queue(Args const &... args);
Constructor with named parameters. The method is used to construct an object and create or open the associated message queue. The constructed object will be in running state if the message queue is successfully created or opened.
The following named parameters are accepted:
open_mode - One of the open mode tags: open_mode::create_only, open_mode::open_only or open_mode::open_or_create.
name - Name of the message queue to be associated with.
capacity - Maximum number of allocation blocks the queue can hold. Used only if the queue is created.
block_size - Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.
overflow_policy - Queue behavior policy in case of overflow, see overflow_policy.
permissions - Access permissions for the associated message queue.
Postconditions: is_open() == true
reliable_message_queue(reliable_message_queue && that) noexcept;
Move constructor. The method move-constructs an object from that. After the call, the constructed object takes over the state of that, while that is left in the default-constructed state.
Parameters:
  that: The object to be moved.
reliable_message_queue & operator=(reliable_message_queue && that) noexcept;
Move assignment operator. If the object is associated with a message queue, close() is first called and the precondition to calling close() applies. After the call, the object takes over the state of that, while that is left in the default-constructed state.
Parameters:
  that: The object to be moved.
Returns: A reference to the assigned object.
~reliable_message_queue();
Destructor. Calls close().
reliable_message_queue public member functions
void swap(reliable_message_queue & that) noexcept;
The method swaps the object with that.
Parameters:
  that: The other object to swap with.
void create(object_name const & name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
The method creates the message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully created.
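Parameters:
  name: Name of the message queue to be associated with.
  capacity: Maximum number of allocation blocks the queue can hold.
  block_size: Size in bytes of allocation block. Must be a power of 2.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Requires: is_open() == false
Postconditions: is_open() == true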
void open_or_create(object_name const & name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
The method creates or opens the message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully created or opened. If the message queue that is identified by the name already exists then the other queue parameters are ignored. The actual queue parameters can be obtained with accessors from this object after this method returns.
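Parameters:
  name: Name of the message queue to be associated with.
  capacity: Maximum number of allocation blocks the queue can hold. Used only if the queue is created.
  block_size: Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Requires: is_open() == false
Postconditions: is_open() == true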
void open(object_name const & name, overflow_policy oflow_policy = block_on_overflow, permissions const & perms = permissions());
The method opens the existing message queue to be associated with the object. After the call, the object will be in running state if a message queue is successfully opened.
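Parameters:
  name: Name of the message queue to be associated with.
  oflow_policy: Queue behavior policy in case of overflow, see overflow_policy.
  perms: Access permissions for the associated message queue.
Requires: is_open() == false
Postconditions: is_open() == true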
bool is_open() const noexcept;
Tests whether the object is associated with any message queue.
Returns: true if the object is associated with a message queue, and false otherwise.
void clear();
This method empties the associated message queue. Concurrent calls to this method, send(), try_send(), receive(), try_receive(), and stop_local() are allowed.
Requires: is_open() == true
object_name const & name() const;
The method returns the name of the associated message queue.
Requires: is_open() == true
Returns: Name of the associated message queue.
uint32_t capacity() const;
The method returns the maximum number of allocation blocks the associated message queue can hold. Note that the returned value may be different from the corresponding value passed to the constructor or open_or_create(), because the message queue may not have been created by this object.
Requires: is_open() == true
Returns: Maximum number of allocation blocks the associated message queue can hold.
size_type block_size() const;
The method returns the allocation block size, in bytes. Each message in the associated message queue consumes an integer number of allocation blocks. Note that the returned value may be different from the corresponding value passed to the constructor or open_or_create(), because the message queue may not have been created by this object.
Requires: is_open() == true
Returns: Allocation block size, in bytes.
void stop_local();
The method wakes up all threads that are blocked in calls to send() or receive(). Those calls then return operation_result::aborted. Note that the method does not block until the woken-up threads have actually returned from send() or receive(). Other means are needed to ensure that calls to send() or receive() have returned, e.g., joining the threads that might be blocking on the calls.
The method also puts the object in stopped state. When in stopped state, calls to send() or receive() will return immediately with return value operation_result::aborted when they would otherwise block in running state.
Concurrent calls to this method, send(), try_send(), receive(), try_receive(), and clear() are allowed.
Requires: is_open() == true
void reset_local();
The method puts the object in running state where calls to send() or receive() may block. This method is not thread-safe.
Requires: is_open() == true
void close() noexcept;
The method disassociates the associated message queue, if any. No other thread may be using this object when this method is called. The stop_local() method can be used to have any threads currently blocked in send() or receive() return, and prevent further calls to them from blocking. Typically, before calling this method, one would first call stop_local() and then join all threads that might be blocking on send() or receive() to ensure that they have returned from the calls. The associated message queue is destroyed if the object represents the last outstanding reference to it.
Postconditions: is_open() == false
operation_result send(void const * message_data, size_type message_size);
The method sends a message to the associated message queue. When the object is in running state and the queue has no free space for the message, the method blocks, returns operation_result::no_space, or throws an exception, depending on the overflow policy that was specified when the queue was opened or created. If the blocking policy is in effect, the blocking can be interrupted by calling stop_local(), in which case the method returns operation_result::aborted. When the object is already in the stopped state, the method does not block but returns immediately with return value operation_result::aborted.
It is possible to send an empty message by passing 0 to the parameter message_size.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Throws: std::logic_error if the message size exceeds the queue capacity, system_error if a native OS method fails.
Parameters:
  message_data: The message data to send.
  message_size: Size of the message data, in bytes.
Requires: is_open() == true
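A caller of send() usually needs to tell the three result codes apart. The sketch below is written against the interface shown in the synopsis, so `QueueT` could be `reliable_message_queue`; `fake_send_queue`, `send_status` and `send_text` are hypothetical names, and the fake type exists only so the snippet compiles without Boost. That `no_space` is the result reported under `fail_on_overflow` is an assumption based on the enumerator names.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Hypothetical stand-in exposing only the parts of the interface used below.
struct fake_send_queue
{
    typedef std::uint32_t size_type;
    enum operation_result { succeeded, no_space, aborted };

    operation_result next_result = succeeded; // what the next send() reports
    std::string last_sent;

    operation_result send(void const* data, size_type size)
    {
        if (next_result == succeeded)
            last_sent.assign(static_cast<char const*>(data), size);
        return next_result;
    }
};

enum class send_status { delivered, queue_full, stopped };

// Sends a string and maps the queue's result code to an application status.
// Exceptions (for example under throw_on_overflow) simply propagate.
template <typename QueueT>
send_status send_text(QueueT& queue, std::string const& text)
{
    typedef typename QueueT::size_type size_type;
    // The narrowing cast below is only safe for sizes that fit in size_type.
    assert(text.size() <= std::numeric_limits<size_type>::max());
    switch (queue.send(text.data(), static_cast<size_type>(text.size())))
    {
    case QueueT::succeeded:
        return send_status::delivered;
    case QueueT::no_space:
        return send_status::queue_full; // assumed to come from fail_on_overflow
    default:
        return send_status::stopped;    // aborted, e.g. after stop_local()
    }
}
```

Keeping the mapping in one helper means the writer's business logic never has to inspect queue-specific enumerators directly.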
bool try_send(void const * message_data, size_type message_size);
The method performs an attempt to send a message to the associated message queue. The method is non-blocking, and always returns immediately. boost::system::system_error is thrown for errors resulting from native operating system calls. Note that it is possible to send an empty message by passing 0 to the parameter message_size. Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Throws: std::logic_error if the message size exceeds the queue capacity, system_error if a native OS method fails.
Parameters:
  message_data: The message data to send.
  message_size: Size of the message data, in bytes.
Requires: is_open() == true
Returns: true if the message is successfully sent, and false otherwise (e.g., when the queue is full).
operation_result receive(void * buffer, size_type buffer_size, size_type & message_size);
The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  buffer: The memory buffer to store the received message in.
  buffer_size: The size of the buffer, in bytes.
  message_size: Receives the size of the received message, in bytes.
Requires: is_open() == true
template<typename ElementT, size_type SizeV> operation_result receive(ElementT (&buffer)[SizeV], size_type & message_size);
The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  buffer: The memory buffer to store the received message in.
  message_size: Receives the size of the received message, in bytes.
Requires: is_open() == true
template<typename ContainerT> operation_result receive(ContainerT & container);
The method takes a message from the associated message queue. When the object is in running state and the queue is empty, the method blocks. The blocking is interrupted when stop_local() is called, in which case the method returns operation_result::aborted. When the object is already in the stopped state and the queue is empty, the method does not block but returns immediately with return value operation_result::aborted.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  container: The container to store the received message in.
Requires: is_open() == true
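A typical reader loops on the container overload of receive() until the call stops reporting success. The sketch below is generic over the queue type so it compiles without Boost: `fake_receive_queue` is a hypothetical stand-in, and `receive_until_aborted` is an illustrative helper. It assumes that the container overload appends to the container, which is why the string is cleared between messages.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for the queue; reports aborted once nothing is pending,
// as a stopped queue would.
struct fake_receive_queue
{
    enum operation_result { succeeded, no_space, aborted };
    std::deque<std::string> pending;

    template <typename ContainerT>
    operation_result receive(ContainerT& container)
    {
        if (pending.empty())
            return aborted;
        // Assumption: message bytes are appended at the end of the container.
        container.insert(container.end(), pending.front().begin(), pending.front().end());
        pending.pop_front();
        return succeeded;
    }
};

// Collects messages until receive() returns anything other than succeeded.
template <typename QueueT>
std::vector<std::string> receive_until_aborted(QueueT& queue)
{
    std::vector<std::string> messages;
    std::string message;
    while (queue.receive(message) == QueueT::succeeded)
    {
        messages.push_back(message);
        message.clear(); // start the next message from an empty container
    }
    return messages;
}

void example_receive_loop()
{
    fake_receive_queue queue;
    queue.pending.push_back("first");
    queue.pending.push_back("second");

    std::vector<std::string> const got = receive_until_aborted(queue);
    assert(got.size() == 2u);
    assert(got[0] == "first");
    assert(got[1] == "second");
}
```

With the real queue the loop would end when another thread calls stop_local() on the same object, so the reader thread can then be joined before close().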
bool try_receive(void * buffer, size_type buffer_size, size_type & message_size);
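The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  buffer: The memory buffer to store the received message in.
  buffer_size: The size of the buffer, in bytes.
  message_size: Receives the size of the received message, in bytes.
Requires: is_open() == true
Returns: true if a message is successfully received, and false otherwise (e.g., when the queue is empty).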
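template<typename ElementT, size_type SizeV> bool try_receive(ElementT (&buffer)[SizeV], size_type & message_size);
The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  buffer: The memory buffer to store the received message in.
  message_size: Receives the size of the received message, in bytes.
Requires: is_open() == true
Returns: true if a message is successfully received, and false otherwise (e.g., when the queue is empty).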
template<typename ContainerT> bool try_receive(ContainerT & container);
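The method performs an attempt to take a message from the associated message queue. The method is non-blocking, and always returns immediately.
Concurrent calls to send(), try_send(), receive(), try_receive(), stop_local(), and clear() are allowed.
Parameters:
  container: The container to store the received message in.
Requires: is_open() == true
Returns: true if a message is successfully received, and false otherwise (e.g., when the queue is empty).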
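The non-blocking overloads suit a reader that polls the queue between other work. The sketch below drains whatever is currently available and returns without waiting. `fake_poll_queue` and `poll_available` are hypothetical names used so the snippet compiles without Boost, and it assumes that try_receive() returns false when no message is available and appends to the container otherwise.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for the queue with a non-blocking container overload.
struct fake_poll_queue
{
    std::deque<std::string> pending;

    template <typename ContainerT>
    bool try_receive(ContainerT& container)
    {
        if (pending.empty())
            return false; // assumed meaning: nothing to take right now
        container.insert(container.end(), pending.front().begin(), pending.front().end());
        pending.pop_front();
        return true;
    }
};

// Hands every currently available message to handle() and returns the count.
template <typename QueueT, typename HandlerT>
std::size_t poll_available(QueueT& queue, HandlerT handle)
{
    std::size_t count = 0;
    std::string message;
    while (queue.try_receive(message))
    {
        handle(message);
        message.clear(); // the next message starts from an empty container
        ++count;
    }
    return count;
}

void example_poll()
{
    fake_poll_queue queue;
    queue.pending.push_back("a");
    queue.pending.push_back("b");
    queue.pending.push_back("c");

    std::vector<std::string> seen;
    std::size_t const n = poll_available(queue, [&seen](std::string const& m) { seen.push_back(m); });
    assert(n == 3u);
    assert(seen.size() == 3u);
    assert(seen[2] == "c");
    // A second poll finds nothing and returns immediately.
    assert(poll_available(queue, [](std::string const&) {}) == 0u);
}
```

Unlike a receive() loop, this pattern never needs stop_local() to regain control, at the cost of the caller deciding how often to poll.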
reliable_message_queue friend functions
friend void swap(reliable_message_queue & a, reliable_message_queue & b) noexcept;
Swaps the two reliable_message_queue objects.
reliable_message_queue public static functions
static void remove(object_name const & name);
The method frees system-wide resources associated with the interprocess queue with the supplied name. The queue referred to by the specified name must not be opened in any process at the point of this call. After this call succeeds, a new queue with the specified name can be created.
This call can be useful to recover from an earlier process misbehavior (e.g. a crash without properly closing the message queue). In this case resources allocated for the interprocess queue may remain allocated after the last process closed the queue, which in turn may prevent creating a new queue with the same name. By calling this method before creating a queue the application can attempt to ensure it starts with a clean slate.
On some platforms resources associated with the queue are automatically reclaimed by the operating system when the last process using those resources terminates (even if it terminates abnormally). On these platforms this call may be a no-op. However, portable code should still call this method at appropriate places to ensure compatibility with other platforms and future library versions, which may change implementation of the queue.
Parameters:
  name: Name of the message queue to be removed.