An Introduction to asionet

Asynchronous network programming in C++ made easy.
\author Philipp Badenhoop
\date Tue, Jan 8, 2019
\tags c++, network, boost, asionet

Introduction

If you have ever done network programming in C++, you have probably stumbled upon the quasi-standard boost::asio library. Its asynchronous programming model makes it scalable but, on the other hand, it takes quite some time to learn how to use it correctly.
asionet is built on top of boost::asio, which makes it 100% compatible with it while being easier to use at the same time. For example, managing timeouts and sending and receiving serialized messages takes only a few lines of code.

Here is the link to the GitHub repository.

Prerequisites

In order to use the library, you have to compile with the C++14 standard and link against Boost 1.66 and your system's thread library. In your CMakeLists.txt, insert:

set(CMAKE_CXX_STANDARD 14)
find_package(Boost REQUIRED COMPONENTS system regex)
find_package(Threads)
link_libraries(${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})

Installation

Get the repository, build and install it.

$ git clone https://github.com/Badenhoop/asionet
$ cd asionet
$ mkdir build
$ cd build
$ cmake ..
$ sudo make install

Usage

Just insert the following into your CMakeLists.txt file:

find_package(asionet)
link_libraries(asionet)

Tutorial

Receiving string messages over UDP

The code below listens on port 4242 for a UDP datagram with a timeout of 1 second.

// Just a typedef of boost::asio::io_context (aka io_service).
asionet::Context context;
// A thread which runs the context object and dispatches asynchronous handlers.
asionet::Worker worker{context};
// UDP datagram receiver operating on port 4242.
asionet::DatagramReceiver<std::string> receiver{context, 4242};
// Receive a string message with a timeout of 1 second.
receiver.asyncReceive(1s, [](const asionet::error::Error & error,
                             std::string & message,
                             const boost::asio::ip::udp::endpoint & senderEndpoint)
{
    if (error) return;
    std::cout << "received: " << message << "\n"
              << "host: " << senderEndpoint.address().to_string() << "\n"
              << "port: " << senderEndpoint.port() << "\n";
});

Sending string messages over UDP

The following code sends a UDP message containing the string “Hello World!” to IP 127.0.0.1, port 4242, with an operation timeout of 10 ms.

asionet::DatagramSender<std::string> sender{context};
sender.asyncSend("Hello World!", "127.0.0.1", 4242, 10ms, [](const asionet::error::Error & error)
{
    if (error)
    {
        // handle error ...
    }
});

Defining custom messages

Wouldn’t it be nice to just send your own data types as messages over the network? Let’s assume we are programming the client for an online game, so we have to send updates about our player’s state.

struct PlayerState
{
    std::string name;
    float posX;
    float posY;
    float health;
};

Now we can replace the std::string template parameter with PlayerState to tell the DatagramSender to send PlayerState objects:

asionet::DatagramSender<PlayerState> sender{context};
PlayerState playerState{"WhatAPlayer", 0.15f, 1.7f, 0.1f};
sender.asyncSend(playerState, "127.0.0.1", 4242, 10ms);

The only thing left to do is to tell asionet how to serialize a PlayerState object into a sequence of bytes, which is simply represented as a std::string. For this, we can use the nlohmann json library, which is an amazing piece of work by the way.

namespace asionet { namespace message {

template<>
struct Encoder<PlayerState>
{
    void operator()(const PlayerState & playerState, std::string & data) const
    {
        auto j = nlohmann::json{ {"name", playerState.name },
                                 {"xPos", playerState.posX },
                                 {"yPos", playerState.posY },
                                 {"health", playerState.health } };
        data = j.dump();
    }
};

}}

Here we create a template specialization of the asionet::message::Encoder struct. The call operator takes a PlayerState reference as input and is expected to assign the byte string that should be transmitted over the network to the data reference.

Since we can now send PlayerState objects, we cover the server side next. For this, we specialize the asionet::message::Decoder struct to retrieve a PlayerState object from a buffer.

namespace asionet { namespace message {

template<>
struct Decoder<PlayerState>
{
    template<typename ConstBuffer>
    void operator()(const ConstBuffer & buffer, PlayerState & playerState) const
    {
        std::string s{buffer.begin(), buffer.end()};
        auto j = nlohmann::json::parse(s);
        playerState = PlayerState{
            j.at("name").get<std::string>(),
            j.at("xPos").get<float>(),
            j.at("yPos").get<float>(),
            j.at("health").get<float>()
        };
    }
};

}}

Note that we have to define the call operator which takes a template argument and the message to be decoded. So what exactly is a ConstBuffer? Since it’s a template parameter, a ConstBuffer is not one particular class but represents an abstract buffer interface; in the decoder above, we only rely on its begin() and end() methods.

By using this abstraction, asionet may internally use the most suitable buffer representation for a specific operation.

Finally, we can set up the UDP receiver as follows:

asionet::DatagramReceiver<PlayerState> receiver{context, 4242};
receiver.asyncReceive(1s, [](const auto & error,
                             auto & playerState,
                             auto & senderEndpoint)
{
    if (error) return;
    std::cout << "player: " << playerState.name << "\n";
});

Services

A common network pattern consists of a client sending a request to a server, which reacts by sending a response back to the client. HTTP, for instance, works this way. Using asionet, this pattern is easy to implement.

Assume that we want to create a server which delivers chat messages based on a query. The query consists of two user-IDs defining the chat and the number of most recent messages that should be delivered. Let’s create some classes to model this scenario.

struct Query
{
    unsigned long user1;
    unsigned long user2;
    unsigned int numRequestedMessages;
};

struct ChatMessage
{
    unsigned long author;
    std::string content;
};

struct Response
{
    std::vector<ChatMessage> messages;
};

Next, we have to specialize the Encoder/Decoder structs for the Query and Response types. Since this works exactly as shown above for the PlayerState class, we skip it here.

Now, we have to create a service description:

struct ChatService
{
    using RequestMessage = Query;
    using ResponseMessage = Response;
};

To create a server which receives incoming requests:

asionet::ServiceServer<ChatService> server{context, 4242};
server.advertiseService([](const boost::asio::ip::tcp::endpoint & senderEndpoint,
                           Query & query,
                           Response & response)
{
    std::cout << "Requesting " << query.numRequestedMessages << " messages\n";
    response = /* create your response */;
});

That’s it. Simple, right?

Finally, calling the server on the client side looks like this:

asionet::ServiceClient<ChatService> client{context};
client.asyncCall(
    Query{10, 12, 50}, "mychatserver.com", 4242, 10s,
    [](const asionet::error::Error & error, Response & response)
    {
        if (error) return;
        for (const auto & message : response.messages)
            std::cout << message.author << " wrote: " << message.content << "\n";
    });

Ensuring thread-safety

An important advantage of asynchronous programming is that it makes it easier to write thread-safe code. Imagine all asynchronous handlers are invoked from a single thread. Then there is no need for explicit locking of shared state between the handlers since everything runs in sequence (not concurrently).

However, running only a single thread may not be an option if we want to benefit from running things in parallel. In this case, we can wrap handlers inside a WorkSerializer object, which guarantees that all handlers wrapped inside the same WorkSerializer are executed in sequence. In fact, WorkSerializer just inherits from boost::asio::io_context::strand and can be used in exactly the same manner.

Let’s consider this example:

asionet::Context context;
// Create 4 threads that concurrently dispatch handlers from the context object.
asionet::WorkerPool workers{context, 4};
std::size_t counter = 0;
for (std::size_t i = 0; i < 1000000; ++i)
{
    // Post a handler that increments the counter.
    context.post([&] { counter++; });
}
sleep(/* long enough */);
std::cout << counter;

If you are familiar with concurrency problems, you will not be surprised that the outcome is very likely NOT 1000000. We can fix this either by making counter atomic or by employing a WorkSerializer:

asionet::Context context;
// Create 4 threads that concurrently dispatch handlers from the context object.
asionet::WorkerPool workers{context, 4};
asionet::WorkSerializer serializer{context};
std::size_t counter = 0;
for (std::size_t i = 0; i < 1000000; ++i)
{
    // Post a handler that increments the counter.
    // Now the handler is wrapped by the WorkSerializer.
    context.post(serializer([&] { counter++; }));
    // Alternatively, use:
    // serializer.post([&] { counter++; });
}
sleep(/* long enough */);
std::cout << counter;

We just use the WorkSerializer’s call operator, which takes the handler as input, and the output should now be 1000000. So whenever you want some handlers to not run concurrently, wrap them inside the SAME WorkSerializer object.

… and of course, in this particular example, nothing else is executed, so we could have simply used a single worker to make it thread-safe. But imagine you also had other asynchronous operations running next to those which increment the counter. Then all other handlers would still run concurrently as long as they are not wrapped inside a WorkSerializer.

And finally, if you have two WorkSerializer objects s1 and s2, they don’t care about each other, meaning that handlers wrapped inside s1 run concurrently to handlers wrapped inside s2.

Lifetime management

In the code snippets above, we silently ignored the dangerous dangling references problem, which is easily overlooked. The problem with capturing objects in handlers is that by the time a handler executes, the objects it references may already have been destroyed.

This is made clear by the following example:

asionet::Context context;
asionet::Worker worker{context};

{
    std::string text = "This goes out of scope!";
    context.post([&] { std::cout << text; });
}

// Do something else...

Here, ‘text’ may already have been destroyed by the time the posted handler executes, since the handler runs on a different thread. Accessing an invalid reference is undefined behavior, and these types of bugs can be extremely hard to debug. Therefore, we need a coding practice to systematically avoid this issue.

A good solution for the example above is to use a shared_ptr and pass it into the lambda capture of the handler.

asionet::Context context;
asionet::Worker worker{context};

{
    auto text = std::make_shared<std::string>("I don't mind going out of scope!");
    context.post([text] { std::cout << *text; });
}

// Do something else...

However, it can be tedious and bad for performance to wrap every object in a shared_ptr. Instead, we can use the shared_from_this pattern:

class ComplexObject : public std::enable_shared_from_this<ComplexObject>
{
public:
    ComplexObject(asionet::Context & context)
        : sender(context), receiver(context, 4242) {}

    void run()
    {
        // Get a shared_ptr of 'this'.
        auto self = shared_from_this();
        // Pass self inside the capture.
        receiver.asyncReceive(10s, [self](auto && ...) { /* Safe! */ });
    }

private:
    asionet::DatagramSender<std::string> sender;
    asionet::DatagramReceiver<std::string> receiver;
    // more state ...
};

When using ComplexObject, you have to instantiate it in a shared_ptr:

auto complexObject = std::make_shared<ComplexObject>(context);
complexObject->run();

Even if complexObject leaves its scope, any handler invoked inside run() which captures the self pointer will not suffer from the dangling references problem.

Waiting

Sometimes you want to wait for one or more events to complete. Consider the following:

asionet::Context context;
asionet::WorkerPool workers{context, 4};

context.post([] { /* Operation 1 */ });
context.post([] { /* Operation 2 */ });
context.post([] { /* Operation 3 */ });

// Objective: wait until operations 1, 2 and 3 are all done.

Of course, we could mess around with atomic booleans or mutexes again, but as your program gets more complex, you want something more elegant. asionet provides the Waiter and Waitable classes for this purpose:

asionet::Context context;
asionet::WorkerPool workers{context, 4};

asionet::Waiter w{context};
asionet::Waitable w1{w}, w2{w}, w3{w};

context.post(w1([] { /* Operation 1 */ }));
context.post(w2([] { /* Operation 2 */ }));
context.post(w3([] { /* Operation 3 */ }));

w.await(w1 && w2 && w3);

Just like the WorkSerializer, a Waitable wraps its corresponding handler and notifies its Waiter object when the handler finishes execution. The Waiter object can then await an expression of Waitable objects. In this case, we want to wait until all Waitable objects are ready, which is expressed by the chain of &&-operators. Alternatively, we could wait until any handler finishes execution:

w.await(w1 || w2 || w3);

Or we could say that at least two of them should be ready:

w.await((w1 && w2) || (w1 && w3) || (w2 && w3));

You can also set the state of a Waitable object directly:

w1.setReady();

If you want to reuse the waitable objects, you have to set their state to waiting again:

w.await(w1 && w2 && w3);
// Reset states.
w1.setWaiting();
w2.setWaiting();
w3.setWaiting();

Compatibility with boost::asio

As already mentioned, asionet was designed to be seamlessly usable with existing boost::asio code. For example, we can send and receive messages over boost::asio::ip::tcp::socket objects directly, without having to use the ServiceServer or ServiceClient classes:

asionet::Context context;
boost::asio::ip::tcp::socket socket{context};
boost::asio::ip::tcp::endpoint endpoint{
    boost::asio::ip::address::from_string("1.2.3.4"), 4242};
socket.connect(endpoint);
// Send the message over the socket.
asionet::message::asyncSend(socket, PlayerState{"name", 1.f, 0.f, 0.5f}, 1s, [](auto && ...){});