Stefan Scherfke

Designing and Testing PyZMQ Applications – Part 2

This is the second part of the series Designing and Testing PyZMQ Applications. In the first part, I wrote about designing a PyZMQ application, so this time it’s all about (unit) testing (remember, if it’s not tested, it’s broken). I also updated the repository for this article with the new code examples.

My favorite testing tools are pytest by Holger Krekel and Mock by Michael Foord. Pytest is particularly awesome because of its re-evaluation of assert statements. If your test contains an assert spam == 'eggs' and the assert fails, pytest re-evaluates it and prints the value of spam. That’s really helpful, and you don’t need any boilerplate code for it. Mock is really nice for mocking external dependencies and asserting that your code called them in the correct way.
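To give a feeling for what "asserting that your code called them in the correct way" looks like, here is a minimal sketch. It uses unittest.mock, the standard-library version of the Mock library (Python 3.3+); the greet function and its sender argument are made up for illustration and are not part of the example app.

```python
from unittest import mock  # stdlib version of the Mock library


def greet(sender, name):
    """Hypothetical code under test: sends a greeting via *sender*."""
    sender.send('Hello, %s!' % name)
    return True


def test_greet():
    sender = mock.Mock()  # stands in for an external dependency

    assert greet(sender, 'spam') is True

    # Ask the mock how it was called
    assert sender.send.call_count == 1
    assert sender.send.call_args == (('Hello, spam!',), {})


test_greet()
```

If one of these asserts fails under pytest, the report shows the actual values on both sides of the comparison.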

If you cloned the repository for this article, just run py.test from its root directory:

$ pip install pytest mock
Successfully installed pytest mock
Cleaning up...
$ py.test
=================== test session starts ====================
platform darwin -- Python 3.2.2 -- pytest-2.2.3
collected 11 items

example_app/test/ ....
example_app/test/ .......

================ 11 passed in 0.12 seconds =================

Unit Testing

The probability that PyZMQ works correctly is very high. The probability that your code will call a PyZMQ function in such a way that it blocks forever and halts your test runner is also very high. Therefore, it’s a good idea to mock everything PyZMQ-related for your unit tests. And since your application logic might also not be implemented when you start testing your process, you should mock that, too.

What you’ll actually end up testing is the following:

  • Does your message handler call your application logic in the right way given a certain input message?
  • Does your message handler create and send the correct reply based on the return value of your application logic?
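The two questions above can be sketched with a toy handler. This is only an illustration under assumptions: EchoHandler, its process method and the message layout are hypothetical and not taken from the example app. Both the stream and the application logic are replaced by mocks, so nothing can block.

```python
import json
from unittest import mock


class EchoHandler(object):
    """Hypothetical message handler, not part of the example app."""

    def __init__(self, stream, logic):
        self._stream = stream
        self._logic = logic

    def __call__(self, msg):
        # The last message part carries the JSON payload
        data = json.loads(msg[-1].decode('utf-8'))
        reply = self._logic.process(data)
        self._stream.send_json(reply)


def test_echo_handler():
    stream = mock.Mock()  # replaces the PyZMQ stream
    logic = mock.Mock()   # replaces the application logic
    logic.process.return_value = ['reply', 23]

    EchoHandler(stream, logic)([b'["request", 23]'])

    # 1. Was the application logic called with the decoded message?
    assert logic.process.call_args == ((['request', 23],), {})
    # 2. Was the reply built from the logic's return value?
    assert stream.send_json.call_args == ((['reply', 23],), {})


test_echo_handler()
```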


Let’s start with ZmqProcess again. After all, everything else depends on it. Testing its setup method is easy. We just check that it creates a context and a loop:

# example_app/test/
from zmq.eventloop import ioloop
import mock
import pytest
import zmq

import base


class TestZmqProcess(object):
    """Tests for :class:`base.ZmqProcess`."""

    def test_setup(self):
        zp = base.ZmqProcess()
        zp.setup()

        assert isinstance(zp.context, zmq.Context)
        assert isinstance(zp.loop, ioloop.IOLoop)

Testing stream is more complicated. We need to test whether it can handle various address formats, whether it connects or binds correctly, and whether it performs a default subscription for SUB sockets.

Pytest 2.2 introduced a parametrize decorator that helps you call a test multiple times with varying inputs. You just define one or more arguments for your test function and a list of values for these arguments. For test_stream, I only need a kwargs parameter containing the parameters for the stream call:

# example_app/test/

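    # The addresses are example values; the assertions further down only
    # depend on their form ("host:port", "host" or a (host, port) tuple).
    @pytest.mark.parametrize('kwargs', [
        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
             callback=mock.Mock()),
        dict(sock_type=23, addr='127.0.0.1', bind=True,
             callback=mock.Mock()),
        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
             callback=mock.Mock(), subscribe=b'ohai'),
    ])
    def test_stream(self, kwargs):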

The next step is to create an instance of ZmqProcess and patch some of its attributes. We also need to set a defined return value for the socket’s bind_to_random_port method:

# example_app/test/

        zp = base.ZmqProcess()

        # Patch the ZmqProcess instance
        zp.context = mock.Mock(spec_set=zmq.Context)
        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
        sock_mock = zp.context.socket.return_value
        sock_mock.bind_to_random_port.return_value = 42

For the actual test, we also need to patch ZMQStream. Although mock.patch can work as a function decorator, we need to use it as a context manager if we also use pytest funcargs (e.g., via the parametrize decorator). I don’t know if it’s even possible to use both mock.patch as a decorator and pytest funcargs in one test.

# example_app/test/

        # Patch ZMQStream and start testing
        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
            stream, port = zp.stream(**kwargs)
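As a standalone illustration of mock.patch used as a context manager, here is a small sketch. It uses unittest.mock from the standard library and patches os.getcwd instead of ZMQStream so that it runs without PyZMQ; describe_cwd is a made-up helper.

```python
import os
from unittest import mock


def describe_cwd():
    """Hypothetical helper that depends on the process environment."""
    return 'cwd=' + os.getcwd()


def test_describe_cwd():
    # The patch is active only inside the with block
    with mock.patch('os.getcwd') as getcwd_mock:
        getcwd_mock.return_value = '/spam'
        assert describe_cwd() == 'cwd=/spam'
        assert getcwd_mock.call_count == 1

    # Afterwards the original function is restored
    assert os.getcwd is not getcwd_mock


test_describe_cwd()
```

Because the mock is bound by the with statement rather than passed in as an extra argument, the test function's signature stays free for the arguments that pytest injects.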

Finally, we can check the return values of our stream method and verify that it made the correct calls to create the stream:

# example_app/test/

            # Assert that the return values are correct
            assert stream is zmqstream_mock.return_value
            if isinstance(kwargs['addr'], tuple):
                assert port == kwargs['addr'][1]
            elif ':' in kwargs['addr']:
                assert port == int(kwargs['addr'][-4:])
            else:
                assert port == sock_mock.bind_to_random_port.return_value

            # Check that the socket was created correctly
            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
            if kwargs['bind'] and ':' in kwargs['addr']:
                assert sock_mock.bind.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            elif kwargs['bind']:
                assert sock_mock.bind_to_random_port.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            else:
                assert sock_mock.connect.call_args == (
                        ('tcp://%s:%s' % kwargs['addr'],), {})

            # Check creation of the stream
            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
            assert zmqstream_mock.return_value.on_recv.call_args == (
                    (kwargs['callback'],), {})

            # Check default subscription
            if 'subscribe' in kwargs:
                assert sock_mock.setsockopt.call_args == (
                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})

Note: You may have noticed that I use assert my_mock.call_args == ... rather than my_mock.assert_called_with(...). The reason is simply that assert statements are highlighted but ordinary function calls are not. This makes it easier for me to find all assertions in a test.
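Both styles check the same thing. The following sketch (using unittest.mock from the standard library; the option name is just a placeholder string, not a real PyZMQ constant) puts them side by side:

```python
from unittest import mock

sock = mock.Mock()
sock.setsockopt('SUBSCRIBE', b'ohai')  # placeholder option name

# Style used in this article: compare call_args with (args, kwargs)
assert sock.setsockopt.call_args == (('SUBSCRIBE', b'ohai'), {})

# Equivalent helper method provided by Mock
sock.setsockopt.assert_called_with('SUBSCRIBE', b'ohai')

# For several calls, call_args_list holds one (args, kwargs) pair per call
sock.send(b'a')
sock.send(b'b', copy=False)
assert sock.send.call_args_list == [
    ((b'a',), {}),
    ((b'b',), {'copy': False}),
]

# A mock that was never called has call_args set to None,
# so the comparison style fails as expected in that case, too
assert sock.close.call_args is None
```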


The MessageHandler base class has only one method, __call__, but I split the test for it into two methods: one that tests the JSON-loading functionality and one that checks if the correct handler method is called:

# example_app/test/

class TestMessageHandler(object):
    """Tests for :class:`base.MessageHandler`."""

    @pytest.mark.parametrize(('idx', 'msg'), [
        (-1, [23, b'["test", null]']),
        (1, [23, b'["test", "spam"]', 42]),
        (TypeError, [23, 42]),
        (ValueError, [23, b'["test"]23spam']),
    ])
    def test_call_json_load(self, idx, msg):
        handler = mock.Mock()
        mh = base.MessageHandler(idx if isinstance(idx, int) else -1)
        mh.test = handler

        if isinstance(idx, int):
            mh(msg)
            assert handler.call_count == 1
        else:
            pytest.raises(idx, mh, msg)

    @pytest.mark.parametrize(('ok', 'msg'), [
        (True, [23, b'["test", "spam"]', 42]),
        (AttributeError, [23, b'["_test", "spam"]', 42]),
        (TypeError, [23, b'["spam", "spam"]', 42]),
        (AttributeError, [23, b'["eggs", "spam"]', 42]),
    ])
    def test_call_get_handler(self, ok, msg):
        handler = mock.Mock()
        mh = base.MessageHandler(1)
        mh.test = handler
        mh.spam = 'spam'

        if ok is True:
            mh(msg)
            assert handler.call_args == (
                    (msg[0], 'spam', msg[2]), {})
        else:
            pytest.raises(ok, mh, msg)
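If you don't have Part 1 at hand, the following rough sketch shows the kind of dispatching these two tests exercise. It is a simplified stand-in that I derived from the test cases above, not the real base.MessageHandler; the class name SketchHandler and its details are assumptions.

```python
import json
from unittest import mock


class SketchHandler(object):
    """Simplified stand-in for a JSON-dispatching message handler."""

    def __init__(self, json_load=-1):
        # Index of the message part that contains the JSON data
        self._json_load = json_load

    def __call__(self, msg):
        i = self._json_load
        # Raises TypeError for non-string parts, ValueError for bad JSON
        msg_type, data = json.loads(msg[i])
        msg[i] = data

        # Don't expose private methods to the network
        if msg_type.startswith('_'):
            raise AttributeError('%s starts with an "_"' % msg_type)

        # AttributeError if there is no such handler,
        # TypeError if the attribute is not callable
        getattr(self, msg_type)(*msg)


handler = SketchHandler(1)
handler.test = mock.Mock()
handler([23, b'["test", "spam"]', 42])
assert handler.test.call_args == ((23, 'spam', 42), {})
```

The message type picks the method, and the decoded payload replaces the raw JSON part before the whole message is passed on.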


Testing the PongProc is not much different from testing its base class. pytest_funcarg__pp will instantiate a PongProc instance for each test that has a pp argument. The tests for setup, run and stop are easy to do. We create a few mocks and then ask them if the tested function called them correctly:

# example_app/test/
from zmq.utils import jsonapi as json
import mock, pytest, zmq

import pongproc

host, port = '', 5678

def pytest_funcarg__pp(request):
    """Creates a PongProc instance."""
    return pongproc.PongProc((host, port))

class TestPongProc(object):
    """Tests :class:`pongproc.PongProc`."""

    def test_setup(self, pp):
        def make_stream(*args, **kwargs):
            stream = mock.Mock()
            stream.type = args[0]
            return stream, mock.Mock()
        pp.stream = mock.Mock(side_effect=make_stream)

        with mock.patch('base.ZmqProcess.setup') as setup_mock:
            pp.setup()
            assert setup_mock.call_count == 1

        assert pp.stream.call_args_list == [
            ((zmq.REP, (host, port)), dict(bind=True)),
        ]
        assert pp.rep_stream.type == zmq.REP

        # Test if the message handler was configured correctly
        rsh = pp.rep_stream.on_recv.call_args[0][0]  # Get the msg handler
        assert rsh._rep_stream == pp.rep_stream
        assert rsh._stop == pp.stop

    def test_run(self, pp):
        pp.setup = mock.Mock()
        pp.loop = mock.Mock()

        pp.run()

        assert pp.setup.call_count == 1
        assert pp.loop.start.call_count == 1

    def test_stop(self, pp):
        pp.loop = mock.Mock()

        pp.stop()

        assert pp.loop.stop.call_count == 1


Testing the actual message handler requires some mocks but is otherwise straightforward. A funcarg function creates an instance of the message handler for each test case, which we feed with a message. We then check if the application logic was called correctly and/or if a correct reply is sent:

# example_app/test/

def pytest_funcarg__rsh(request):
    """Creates a RepStreamHandler instance."""
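    # The constructor arguments are an assumption: mocks for the reply
    # stream, the stop callback and the ping handler.
    return pongproc.RepStreamHandler(mock.Mock(), mock.Mock(), mock.Mock())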

class TestRepStreamHandler(object):
    def test_ping(self, rsh):
        msg = ['ping', 1]
        retval = 'spam'
        rsh._ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
        rsh._ping_handler.make_pong.return_value = retval

        # Feed the handler a one-part JSON message
        rsh([json.dumps(msg)])

        assert rsh._ping_handler.make_pong.call_args == ((msg[1],), {})
        assert rsh._rep_stream.send_json.call_args == ((retval,), {})

    def test_plzdiekthxbye(self, rsh):
        rsh([b'["plzdiekthxbye", null]'])
        assert rsh._stop.call_count == 1


When we are done with all that network stuff, we can finally test the application logic. Easy-peasy in our case:

# example_app/test/

def pytest_funcarg__ph(request):
    """Creates a PingHandler instance."""
    return pongproc.PingHandler()

class TestPingHandler(object):
    def test_make_pong(self, ph):
        ping_num = 23
        ret = ph.make_pong(ping_num)
        assert ret == ['pong', ping_num]


Thanks to the Mock library, unit testing PyZMQ apps is really not that hard and not much different from normal unit testing. However, all we know now is that our process should work in theory. We haven’t yet started it and sent real messages to it.

The next and final part of this series will show you how you can automate testing complete processes. Until then, you should get your test coverage up to 100% to protect yourself from nasty surprises when you start with process testing.