Home > C++ > Move In, Move Out

Move In, Move Out

In many application domains, the Producer-Queue-Consumer pattern is used to transport data from input to output within a multi-threaded program:

producerconsumer

The ProducerThread creates Messages in accordance with the application’s requirements and pushes pointers to them into a lock-protected queue. The ConsumerThread, running asynchronous to the ProducerThread, pops sequences of Message pointers from the queue and processes them accordingly. The ConsumerThread may be notified by the ProducerThread when one or more Messages are available for processing or it can periodically poll the queue.

Instead of passing Message pointers, Message copies can be passed between the threads. However, copying the content can be expensive for large messages.

When using pointers to pass messages between threads, the memory to hold the data content must come from somewhere. One way to provide this memory is to use a Message buffer pool allocated on startup.

MsgBufferPool

Another, simpler way that avoids the complexity of managing a Message buffer pool, is to manually “new” up the memory in the ProducerThread and then manually “delete” memory in the ConsumerThread.

newdelete

Since the introduction of smart pointers in C++11, a third way of communicating messages between threads is to “movestd::unique_ptrs into and out of the InterThreadQueue:

moveinmoveout

The advantage of using smart pointers is that no “deletes” need to be manually written in the ConsumerThread code.

The following code shows the implementation and usage of a simple InterThreadQueue that moves std::unique_ptrs into and out of a lock protected std::deque.

InterthreadQueueImpl

InterThreadQueueUsage

#include "catch.hpp"
#include <memory>
#include <deque>
#include <mutex>
#include <vector>
#include <stdexcept>

template<typename Msg>
class InterThreadQueue {
public:
  InterThreadQueue(int32_t capacity) :
    _capacity(capacity) {}

  void push(std::unique_ptr<Msg> msg) {
    std::lock_guard<std::mutex> lg(_mtx);
    if(_queue.size() not_eq _capacity) {
      _queue.push_back(std::move(msg));
    }
    else {
      throw std::runtime_error{"Capacity Exceeded"};
    }
  }

  std::vector<std::unique_ptr<Msg>> pop() {
    std::vector<std::unique_ptr<Msg>> msgs{};
    std::lock_guard<std::mutex> lg(_mtx);
    while(not _queue.empty()) {
      msgs.emplace_back(std::move(_queue.front()));
      _queue.pop_front();
    }
    return msgs; //Move the vector to the caller
  }

private:
  mutable std::mutex _mtx{};
  const std::size_t _capacity;
  std::deque<std::unique_ptr<Msg>> _queue;
};

TEST_CASE( "InterThreadQueue" ) {
  //Create our object under test
  InterThreadQueue<int32_t> itq{2};

  //Note: my compiler version doesn't have std::make_unique<T>()
  std::unique_ptr<int32_t> dataIn{new int32_t{5}};
  itq.push(std::move(dataIn));
  dataIn = std::unique_ptr<int32_t>{new int32_t{10}};
  itq.push(std::move(dataIn));

  dataIn = std::unique_ptr<int32_t>{new int32_t{15}};
  //Queue capacity is only 2
  REQUIRE_THROWS(itq.push(std::move(dataIn)));

  auto dataOut = itq.pop();
  REQUIRE(2 == dataOut.size());
  REQUIRE(5 == *dataOut[0]);
  REQUIRE(10 == *dataOut[1]);

  REQUIRE(0 == itq.pop().size());
}
Categories: C++ Tags: ,
  1. TG
    August 19, 2016 at 11:26 am

    Why do you use a deque for _queue? If you used a std::vector, you could simplify your ::pop() method to basically
    {
    std::lock_guard lck(_mutex);
    std::vector<…> ret;
    std::swap(ret, _queue);
    return ret;
    }

    • August 19, 2016 at 1:21 pm

      Simply because I didn’t think of using a vector. You’re swap idea is a terrific suggestion. Thanks for the optimization!

  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: