Designing and Testing PyZMQ Applications – Part 2
This is the second part of the series Designing and Testing PyZMQ Applications. In the first part, I wrote about designing a PyZMQ application, so this time it’s all about (unit) testing (remember, if it’s not tested, it’s broken). I also updated the repository for this article with the new code examples.
My favorite testing tools are pytest by Holger Krekel and Mock by Michael Foord. Pytest is particularly awesome because of its re-evaluation of assert statements: if your test contains an assert spam == 'eggs' and the assert fails, pytest re-evaluates it and prints the value of spam. This is really helpful, and you don’t need any boilerplate code for it. Mock is really nice for mocking external dependencies and asserting that your code called them in the correct way.
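To make that concrete, here is a minimal, self-contained sketch of how Mock records calls. The fetch/cook names and the 'spam'/'eggs' values are made up for illustration only:

```python
from unittest import mock  # the standalone "mock" package works the same way

# A stand-in for some external dependency with a canned return value
fetch = mock.Mock(return_value='eggs')

def cook(fetcher):
    """Hypothetical code under test that calls its dependency."""
    return fetcher('spam')

assert cook(fetch) == 'eggs'
# The mock recorded how it was called:
assert fetch.call_count == 1
fetch.assert_called_with('spam')
```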
If you cloned the repository for this article, just run py.test from its root directory:
$ pip install pytest mock
...
Successfully installed pytest mock
Cleaning up...
$ py.test
=================== test session starts ====================
platform darwin -- Python 3.2.2 -- pytest-2.2.3
collected 11 items
example_app/test/test_base.py ....
example_app/test/test_pongproc.py .......
================ 11 passed in 0.12 seconds =================
Unit Testing
The probability that PyZMQ works correctly is very high. The probability that your code will call a PyZMQ function in such a way that it blocks forever and halts your test runner is also very high. Therefore, it’s a good idea to mock everything PyZMQ-related for your unit tests. And since your application logic might also not be implemented when you start testing your process, you should mock that, too.
What you’ll actually end up testing is the following:
- Does your message handler call your application logic in the right way given a certain input message?
- Does your message handler create and send the correct reply based on the return value of your application logic?
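In miniature, such a test looks like this. The handle function is a made-up stand-in for a real message handler; both the application logic and the reply stream are mocked:

```python
from unittest import mock

def handle(msg, logic, stream):
    """Hypothetical handler: pass the message to the application
    logic and send its return value back as the reply."""
    result = logic.process(msg)
    stream.send_json(result)

logic = mock.Mock()
logic.process.return_value = ['pong', 1]
stream = mock.Mock()
handle(['ping', 1], logic, stream)

# 1. The application logic got the input message ...
assert logic.process.call_args == ((['ping', 1],), {})
# 2. ... and the reply is based on its return value.
assert stream.send_json.call_args == ((['pong', 1],), {})
```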
ZmqProcess
Let’s start with ZmqProcess
again. After all, everything else depends on it.
Testing its setup method is easy. We just check that it creates a context
and a loop:
# example_app/test/test_base.py
from zmq.eventloop import ioloop
import mock
import pytest
import zmq
import base
class TestZmqProcess(object):
"""Tests for :class:`base.ZmqProcess`."""
def test_setup(self):
zp = base.ZmqProcess()
zp.setup()
assert isinstance(zp.context, zmq.Context)
assert isinstance(zp.loop, ioloop.IOLoop)
Testing stream is more complicated. We need to test if it can handle various address formats, if it creates or binds correctly and if it performs a default subscription for SUB sockets.
Pytest 2.2 introduced a parametrize decorator that helps you call a test multiple times with varying inputs. You just define one or more arguments for your test function and a list of values for these arguments. For test_stream, I only need a kwargs parameter containing the arguments for the stream call:
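If you have not used parametrize before, here is a self-contained sketch of how it works, with a trivial made-up test function:

```python
import pytest

@pytest.mark.parametrize('value, expected', [
    (2, 4),
    (3, 9),
    (-4, 16),
])
def test_square(value, expected):
    # pytest calls this test once per tuple in the list above,
    # reporting each combination as a separate test.
    assert value ** 2 == expected
```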
# example_app/test/test_base.py
@pytest.mark.parametrize('kwargs', [
dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
callback=mock.Mock()),
dict(sock_type=23, addr='127.0.0.1', bind=True,
callback=mock.Mock()),
dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
callback=mock.Mock(), subscribe=b'ohai'),
])
def test_stream(self, kwargs):
The next step is to create an instance of ZmqProcess and patch some of its attributes. We also need to set a defined return value for the socket’s bind_to_random_port method:
# example_app/test/test_base.py
zp = base.ZmqProcess()
# Patch the ZmqProcess instance
zp.context = mock.Mock(spec_set=zmq.Context)
zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
sock_mock = zp.context.socket.return_value
sock_mock.bind_to_random_port.return_value = 42
For the actual test, we also need to patch ZMQStream. Although mock.patch can work as a function decorator, we need to use it as a context manager if we also use pytest funcargs (e.g., via the parametrize decorator; I don’t know whether it is even possible to combine mock.patch as a decorator with pytest funcargs in one test).
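As a quick standalone illustration of mock.patch as a context manager (time.time is just an arbitrary patch target chosen for this example):

```python
try:
    import mock  # the standalone library used in this article
except ImportError:
    from unittest import mock  # bundled with Python since 3.3

import time

with mock.patch('time.time', return_value=42.0) as time_mock:
    # Inside the block, time.time is replaced by a Mock ...
    assert time.time() == 42.0
    assert time_mock.call_count == 1

# ... and outside, the original function is restored automatically.
assert time.time() != 42.0
```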
# example_app/test/test_base.py
# Patch ZMQStream and start testing
with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
stream, port = zp.stream(**kwargs)
Finally, we can check the return values of our stream method and that it made the correct calls to create the stream:
# example_app/test/test_base.py
# Assert that the return values are correct
assert stream is zmqstream_mock.return_value
if isinstance(kwargs['addr'], tuple):
assert port == kwargs['addr'][1]
elif ':' in kwargs['addr']:
assert port == int(kwargs['addr'][-4:])
else:
assert port == sock_mock.bind_to_random_port.return_value
# Check that the socket was created correctly
assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
if kwargs['bind'] and ':' in kwargs['addr']:
assert sock_mock.bind.call_args == (
('tcp://%s' % kwargs['addr'],), {})
elif kwargs['bind']:
assert sock_mock.bind_to_random_port.call_args == (
('tcp://%s' % kwargs['addr'],), {})
else:
assert sock_mock.connect.call_args == (
('tcp://%s:%s' % kwargs['addr'],), {})
# Check creation of the stream
assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
assert zmqstream_mock.return_value.on_recv.call_args == (
(kwargs['callback'],), {})
# Check default subscription
if 'subscribe' in kwargs:
assert sock_mock.setsockopt.call_args == (
(zmq.SUBSCRIBE, kwargs['subscribe']), {})
Note: You may have noticed that I use assert my_mock.call_args == ... rather than my_mock.assert_called_with(...). The reason is simply that assert statements are highlighted but ordinary function calls are not. This makes it easier for me to find all assertions in a test.
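This style works because call_args compares equal to a (positional_args, keyword_args) tuple:

```python
from unittest import mock

m = mock.Mock()
m('ping', 1, flag=True)

# call_args is a 2-tuple of (positional args, keyword args):
assert m.call_args == (('ping', 1), {'flag': True})
# call_args_list collects one such tuple per call:
assert m.call_args_list == [(('ping', 1), {'flag': True})]
```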
MessageHandler
The MessageHandler base class has only one method, __call__, but I split the test for it into two methods: one that tests the JSON-loading functionality and one that checks whether the correct handler method is called:
# example_app/test/test_base.py
class TestMessageHandler(object):
"""Tests for :class:`base.TestMessageHandler`."""
@pytest.mark.parametrize(('idx', 'msg'), [
(-1, [23, b'["test", null]']),
(1, [23, b'["test", "spam"]', 42]),
(TypeError, [23, 42]),
(ValueError, [23, b'["test"]23spam']),
])
def test_call_json_load(self, idx, msg):
handler = mock.Mock()
mh = base.MessageHandler(idx if isinstance(idx, int) else -1)
mh.test = handler
if isinstance(idx, int):
mh(msg)
assert handler.call_count == 1
else:
pytest.raises(idx, mh, msg)
@pytest.mark.parametrize(('ok', 'msg'), [
(True, [23, b'["test", "spam"]', 42]),
(AttributeError, [23, b'["_test", "spam"]', 42]),
(TypeError, [23, b'["spam", "spam"]', 42]),
(AttributeError, [23, b'["eggs", "spam"]', 42]),
])
def test_call_get_handler(self, ok, msg):
handler = mock.Mock()
mh = base.MessageHandler(1)
mh.test = handler
mh.spam = 'spam'
if ok is True:
mh(msg)
assert handler.call_args == (
(msg[0], 'spam', msg[2]), {})
else:
pytest.raises(ok, mh, msg)
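For reference, these tests assume that MessageHandler.__call__ behaves roughly like the following sketch. This is reconstructed from the tests above, not the actual base.py:

```python
import json

class MessageHandler(object):
    """Sketch: JSON-load one message frame and dispatch to the
    method named after the message type."""
    def __init__(self, json_load=-1):
        self._json_load = json_load

    def __call__(self, msg):
        i = self._json_load
        if i < 0:
            i += len(msg)  # normalize a negative frame index
        # May raise TypeError (frame is not str/bytes) or
        # ValueError (frame is not valid JSON)
        msg_type, data = json.loads(msg[i])
        # Unknown message types raise an AttributeError here
        handler = getattr(self, msg_type)
        # Non-callable attributes raise a TypeError here
        handler(*(msg[:i] + [data] + msg[i + 1:]))
```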
PongProc
Testing the PongProc is not much different from testing its base class. pytest_funcarg__pp will instantiate a PongProc instance for each test that has a pp argument. The tests for setup, run and stop are easy to do. We create a few mocks and then ask them if the tested function called them correctly:
# example_app/test/test_pongproc.py
from zmq.utils import jsonapi as json
import mock, pytest, zmq
import pongproc
host, port = '127.0.0.1', 5678
def pytest_funcarg__pp(request):
"""Creates a PongProc instance."""
return pongproc.PongProc((host, port))
class TestPongProc(object):
"""Tests :class:`pongproc.PongProc`."""
def test_setup(self, pp):
def make_stream(*args, **kwargs):
stream = mock.Mock()
stream.type = args[0]
return stream, mock.Mock()
pp.stream = mock.Mock(side_effect=make_stream)
with mock.patch('base.ZmqProcess.setup') as setup_mock:
pp.setup()
assert setup_mock.call_count == 1
assert pp.stream.call_args_list == [
((zmq.REP, (host, port)), dict(bind=True)),
]
assert pp.rep_stream.type == zmq.REP
# Test if the message handler was configured correctly
rsh = pp.rep_stream.on_recv.call_args[0][0] # Get the msg handler
assert rsh._rep_stream == pp.rep_stream
assert rsh._stop == pp.stop
def test_run(self, pp):
pp.setup = mock.Mock()
pp.loop = mock.Mock()
pp.run()
assert pp.setup.call_count == 1
assert pp.loop.start.call_count == 1
def test_stop(self, pp):
pp.loop = mock.Mock()
pp.stop()
assert pp.loop.stop.call_count == 1
RepStreamHandler
Testing the actual message handler requires some mocks but is otherwise straightforward. A funcarg method creates an instance of the message handler for each test case, which we feed with a message. We then check if the application logic was called correctly and/or if a correct reply is sent:
# example_app/test/test_pongproc.py
def pytest_funcarg__rsh(request):
"""Creates a RepStreamHandler instance."""
return pongproc.RepStreamHandler(
rep_stream=mock.Mock(),
stop=mock.Mock(),
ping_handler=mock.Mock(spec_set=pongproc.PingHandler()))
class TestRepStreamHandler(object):
def test_ping(self, rsh):
msg = ['ping', 1]
retval = 'spam'
rsh._ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
rsh._ping_handler.make_pong.return_value = retval
rsh([json.dumps(msg)])
assert rsh._ping_handler.make_pong.call_args == ((msg[1],), {})
assert rsh._rep_stream.send_json.call_args == ((retval,), {})
def test_plzdiekthybye(self, rsh):
rsh([b'["plzdiekthxbye", null]'])
assert rsh._stop.call_count == 1
PingHandler
When we are done with all that network stuff, we can finally test the application logic. Easy-peasy in our case:
# example_app/test/test_pongproc.py
def pytest_funcarg__ph(request):
"""Creates a PingHandler instance."""
return pongproc.PingHandler()
class TestPingHandler(object):
def test_make_pong(self, ph):
ping_num = 23
ret = ph.make_pong(ping_num)
assert ret == ['pong', ping_num]
Summary
Thanks to the Mock library, unit testing PyZMQ apps is really not that hard and not much different from normal unit testing. However, all we know now is that our process should work in theory. We haven’t yet started it and sent real messages to it.
The next and final part of this series will show you how you can automate testing complete processes. Until then, you should get your test coverage up to 100% to protect yourself from nasty surprises when you start with process testing.