Stefan Scherfke

Designing and Testing PyZMQ Applications – Part 3

The third and last part of this series is again just about testing. While the previous article focused on unit testing, this one will be about testing complete PyZMQ processes. This even involves some magic!

Once you’ve made sure that your message dispatching and application logic works fine, you can actually start sending real messages to your process and checking real replies. This can be done for single processes—I call this process testing—and for your complete application (system testing).

When you test a single process, you create sockets that mimic all processes the tested process communicates with. When you do a system test, you only mimic a client or just invoke your program from the command line and check its output (e.g., what it prints to stdout and stderr, or results written to a database).

I’ll start with process testing, which is a bit more generalizable than system testing.

Process Testing

The biggest problem I ran into when I started testing processes was that I often made blocking calls to recv methods; these halted my tests and gave me no output about what actually went wrong. Though you can make them non-blocking by passing zmq.NOBLOCK as an extra argument, this doesn’t solve your problems. You would then need very precise timing and many time.sleep(x) calls, because recv instantly raises an error if there is nothing to be received.

My solution for this was to wrap PyZMQ sockets and add a timeout to their send and recv methods. The following wrapper will try to receive something for one second and raise an exception if that fails. There’s also a simple wrapper for methods like connect or bind, but it’s really not that interesting, so I’ll omit it here.

# test/support.py
import time

import zmq


def get_wrapped_fwd(func):
    """
    Returns a wrapper that tries to call *func* multiple times in non-blocking
    mode before raising a :class:`zmq.ZMQError`.

    """
    def forwarder(*args, **kwargs):
        # 100 tries * 0.01 second == 1 second
        for i in range(100):
            try:
                rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
                return rep

            except zmq.ZMQError:
                time.sleep(0.01)

        # We should not get here, so raise an error.
        msg = 'Could not %s message.' % func.__name__[:4]
        raise zmq.ZMQError(msg)

    return forwarder
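The retry idea itself can be sketched without ZMQ at all. The names retry and flaky below are made up for this illustration and are not part of the original support module:

```python
import time

def retry(func, tries=100, delay=0.01, exc=Exception):
    """Call *func* until it succeeds or *tries* attempts are used up."""
    for _ in range(tries):
        try:
            return func()
        except exc:
            time.sleep(delay)
    raise exc('Could not complete call.')

# A function that fails twice before it succeeds:
calls = []
def flaky():
    calls.append(None)
    if len(calls) < 3:
        raise ValueError('not ready yet')
    return 'ok'

result = retry(flaky, tries=10, delay=0, exc=ValueError)
```

The wrapper in the support module does exactly this, with zmq.ZMQError as the exception and a fixed one-second budget.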

This wrapper is now used to create a TestSocket class with the desired behavior:

# test/support.py

class TestSocket(object):
    """
    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
    will be called multiple times in non-blocking mode before a
    :class:`zmq.ZMQError` is raised.

    """
    def __init__(self, context, sock_type):
        self._context = context

        sock = context.socket(sock_type)
        self._sock = sock

        forwards = [  # These methods can simply be forwarded
            sock.bind,
            sock.bind_to_random_port,
            sock.connect,
            sock.close,
            sock.setsockopt,
        ]
        wrapped_fwd = [  # These methods are wrapped with a for loop
            sock.recv,
            sock.recv_json,
            sock.recv_multipart,
            sock.recv_unicode,
            sock.send,
            sock.send_json,
            sock.send_multipart,
            sock.send_unicode,
        ]

        for func in forwards:
            setattr(self, func.__name__, get_forwarder(func))

        for func in wrapped_fwd:
            setattr(self, func.__name__, get_wrapped_fwd(func))
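The plain get_forwarder used for the first group is the wrapper the article omits; presumably it looks roughly like this minimal sketch (an assumption, since the original code is not shown):

```python
def get_forwarder(func):
    """Return a wrapper that simply forwards all arguments to *func*."""
    def forwarder(*args, **kwargs):
        return func(*args, **kwargs)
    return forwarder
```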

In order to reuse the same ports for all test methods, you need to cleanly close all sockets after each test. To handle method-level setup/teardown in pytest, you implement a setup_method and a teardown_method. In the setup method, you create one or more TestSocket instances that mimic other processes, and you also start the process to be tested:

# test/process/test_pongproc.py
import pytest
import zmq

from test.support import ProcessTest, make_sock
import pongproc


host = '127.0.0.1'
port = 5678


class TestPongProc(ProcessTest):
    """Communication tests for the PongProc process."""

    def setup_method(self, method):
        """
        Creates and starts a PongProc process and sets up sockets to
        communicate with it.

        """
        self.context = zmq.Context()

        # make_sock creates and connects a TestSocket that we will use to
        # mimic the Ping process
        self.req_sock = make_sock(self.context, zmq.REQ,
                                  connect=(host, port))

        self.pp = pongproc.PongProc((host, port))
        self.pp.start()

    def teardown_method(self, method):
        """
        Sends a kill message to the pp and waits for the process to terminate.

        """
        # Send a stop message to the pong process and wait until it joins
        self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
        self.pp.join()

        self.req_sock.close()

You may have noticed that our test class inherits ProcessTest. This class and some helpers in a conftest.py allow us to use some magic that improves the readability of the actual test:

# test/process/test_pongproc.py

    def test_ping(self):
        """Tests a ping-pong sequence."""
        yield ('send', self.req_sock, [], ['ping', 1])

        reply = yield ('recv', self.req_sock)
        assert reply == [['pong', 1]]

You can just yield send or recv events from your test case! When you yield a send, the test machinery tries to send a message via the specified socket. When you yield a receive, ProcessTest tries to receive something from the socket and sends its result back to your test function, so that you can easily compare the reply with the expected result.

The example above is roughly equivalent to the following code:

self.req_sock.send_multipart([] + [json.dumps(['ping', 1])])

reply = self.req_sock.recv_multipart()
reply[-1] = json.loads(reply[-1])
assert reply == [['pong', 1]]

So how does this work? By default, if pytest finds a test function that is a generator, it assumes that it generates further test functions. Hence, our first step is to override this behavior. We can do this in a conftest.py file in the test/process/ directory by implementing a pytest_pycollect_makeitem function. In this case, we collect generator functions like normal functions:

# test/process/conftest.py
from inspect import isfunction, isgeneratorfunction


def pytest_pycollect_makeitem(collector, name, obj):
    """
    Collects all instance methods that are generators and returns them as
    normal function items.

    """
    if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
        if isfunction(obj) or isgeneratorfunction(obj):
            return collector._genfunctions(name, obj)

Now, we need to tell pytest how to run a test on the collected generator functions. This can be done by implementing pytest_runtest_call. If the object we are going to test (item.obj) is a generator function, we call the run method of the object’s instance (item.obj.__self__.run) and pass the generator function to it. If the test item contains a normal function, we run the default test.

# test/process/conftest.py

def pytest_runtest_call(item):
    """
    Passes the test generator (``item.obj``) to the ``run()`` method of the
    generator's instance. This method should be inherited from
    :class:`test.support.ProcessTest`.

    """
    if isgeneratorfunction(item.obj):
        item.obj.__self__.run(item.obj)
    else:  # Normal test execution for normal instance methods
        item.runtest()

But wait—we didn’t implement a run method in our test case! So it must be inherited from ProcessTest. Let’s take a look at it:

# test/support.py
class ProcessTest(object):
    """
    Base class for process tests. It offers basic actions for sending and
    receiving messages and implements the *run* method that handles the
    actual test generators.

    """

    def run(self, testfunc):
        """
        Iterates over the *testfunc* generator and executes all actions it
        yields. Results will be sent back into the generator.

        :param testfunc: A generator function that yields tuples containing
                an action keyword, which should be a function of this or
                the inheriting class (like ``send`` or ``recv``) and additional
                parameters that will be passed to that function, e.g.:
                ``('send', socket_obj, ['header'], 'body')``
        :type testfunc:  generatorfunction

        """
        item_gen = testfunc()
        item = next(item_gen)

        def throw_err(skip_levels=0):
            """
            Throws the last error to *item_gen* and skips *skip_levels* in
            the traceback to point to the line that yielded the last event.

            """
            etype, evalue, tb = sys.exc_info()
            for i in range(skip_levels):
                tb = tb.tb_next
            item_gen.throw(etype, evalue, tb)

        try:
            while True:
                try:
                    # Call the event handler and pass the args,
                    # e.g., self.send(socket_obj, header, body)
                    ret = getattr(self, item[0])(*item[1:])

                    # Send the results back to the test and get the next item
                    item = item_gen.send(ret)

                except zmq.ZMQError:
                    throw_err(3)  # PyZMQ could not send/recv
                except AssertionError:
                    throw_err(1)  # Error in the test
        except StopIteration:
            pass

The run method simply iterates over all events our testfunc generates and calls a method with the name of the event (e.g., send or recv). The return value is sent back into the generator. If an error occurs, the exception’s traceback is modified to point to the line of code that yielded the corresponding event rather than to the run method itself.
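Stripped of ZMQ and the traceback handling, the core of this loop can be demonstrated with plain Python. MiniRunner and its handlers are made up for this sketch:

```python
class MiniRunner:
    """Drives a test generator the way ProcessTest.run() does."""
    def __init__(self):
        self.log = []

    def run(self, testfunc):
        item_gen = testfunc()
        item = next(item_gen)
        try:
            while True:
                # Dispatch to the handler named in the event tuple ...
                ret = getattr(self, item[0])(*item[1:])
                # ... and send its result back into the generator.
                item = item_gen.send(ret)
        except StopIteration:
            pass

    def send(self, value):
        self.log.append(value)

    def recv(self):
        return ['pong', 1]

runner = MiniRunner()

def test_ping():
    yield ('send', ['ping', 1])
    reply = yield ('recv',)
    assert reply == ['pong', 1]

runner.run(test_ping)
```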

The methods send and recv roughly do the same as the snippet I showed you above:

# test/support.py

    def send(self, socket, header, body, extra_data=[]):
        """
        JSON-encodes *body*, concatenates it with *header*, appends
        *extra_data* and sends it as multipart message over *socket*.

        *header* and *extra_data* should be lists containing byte objects or
        objects implementing the buffer interface (like NumPy arrays).

        """
        socket.send_multipart(header + [json.dumps(body)] + extra_data)

    def recv(self, socket, json_load_index=-1):
        """
        Receives and returns a multipart message from *socket* and tries to
        JSON-decode the item at position *json_load_index* (defaults to ``-1``;
        the last element in the list). The original byte string will be
        replaced by the loaded object. Set *json_load_index* to ``None`` to get
        the original, unchanged message.

        """
        msg = socket.recv_multipart()
        if json_load_index is not None:
            msg[json_load_index] = json.loads(msg[json_load_index])
        return msg
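The framing convention these two methods implement can be checked in isolation, without any sockets; this sketch uses plain lists in place of multipart messages:

```python
import json

# What send() puts on the wire: header frames + the JSON-encoded body
header = ['some-header']
body = ['pong', 1]
wire_msg = header + [json.dumps(body)]

# What recv() hands back: the last frame decoded again
msg = list(wire_msg)
msg[-1] = json.loads(msg[-1])
```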

You can even add your own event handler to your test class. I used this, for example, to add a log event that checks if a PyZMQ log handler sent the expected log messages:

def log(self, substr=''):
    """
    Receives a message and asserts that it is a log message and that
    *substr* is in that message.

    Usage:
        yield ('log', 'Ai iz in ur log mesage')

    """
    msg = self.log_sock.recv_json()
    assert msg[0] == 'log_message'
    assert substr in msg[1]

What if your process starts further subprocesses?

In some cases, the process you are about to test starts additional subprocesses that you don’t want to test. Even worse, these processes might communicate via sockets bound to random ports. And EVEN WORSE, the process you are testing might rely on catching a KeyboardInterrupt to send stop messages to child processes or to clean something up!

The last problem is quite easy to solve: you just send a SIGINT to your process from the test:

import os, signal

def teardown_method(self, method):
    os.kill(self.my_proc.pid, signal.SIGINT)
    self.my_proc.join()

    # Now you can close the test sockets
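Here is a self-contained sketch of this idea; the worker function is hypothetical, and on POSIX systems the SIGINT arrives in the child as a KeyboardInterrupt:

```python
import multiprocessing
import os
import signal
import time

def worker(queue):
    """Loops forever until it gets a KeyboardInterrupt, then cleans up."""
    try:
        while True:
            time.sleep(0.05)
    except KeyboardInterrupt:
        queue.put('cleaned up')  # stand-in for the real cleanup code

queue = multiprocessing.Queue()
proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()
time.sleep(0.2)  # give the child time to enter its try block

os.kill(proc.pid, signal.SIGINT)
proc.join()
```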

If you don’t want to start a certain subprocess, you can just mock it. Imagine you have two processes, a.A and b.B, where A starts B. Then you just mock B before starting A:

with mock.patch('b.B'):
    self.a = A()
    self.a.start()

Imagine now, that A binds a socket to a random port and uses that socket to communicate with B. If you want to mock B in your tests, you need that port number in order to connect to it and send messages to A.

But how can you get that number? When A creates B, A is already running in its own process, so a simple attribute access won’t work. Seeding the random number generator would only work if you did that directly in A while it’s running, but doing that just for the tests is not such a good idea, and it may not work reliably on all systems and Python versions.

However, A must pass its port number to B, so that B can connect to A. Thus, we can create a mock for B that will send us the port number via a queue <http://docs.python.org/py3k/library/multiprocessing#exchanging-objects-between-processes>:

class ProcMock(mock.Mock):
    """
    This mock returns itself when called, so it acts as both the
    process’s class and its instance object.

    """
    def __init__(self):
        super().__init__()
        self.queue = multiprocessing.Queue()

    def __call__(self, port):
        """Will be called when A instantiates B and passes its port number."""
        self.queue.put(port)
        return self

    def start(self):
        return  # Just make sure the method exists and returns nothing

    def join(self):
        return  # Just make sure the method exists and returns nothing


class TestA(ProcessTest):

    def setup_method(self, method):

        b_mock = ProcMock()
        with mock.patch('b.B', new=b_mock):
            self.a = A()
            self.a.start()

        # Get the port A is listening on
        port = b_mock.queue.get()

        # ...
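Decoupled from ZMQ and real subprocesses, the trick looks like this. The start_a function below is a made-up stand-in for A’s startup logic, and a plain queue.Queue replaces the multiprocessing queue since everything runs in one process here:

```python
import queue
from unittest import mock

class ProcMock(mock.Mock):
    """Acts as both the process class and its instance object."""
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def __call__(self, port):
        # Called when "A" instantiates "B" and passes its port number.
        self.queue.put(port)
        return self

    def start(self):
        return

    def join(self):
        return

def start_a(b_class):
    """Stand-in for A: binds to a "random" port and starts B with it."""
    port = 5678  # in real code: sock.bind_to_random_port(...)
    b = b_class(port)
    b.start()
    return b

b_mock = ProcMock()
b = start_a(b_mock)
port = b_mock.queue.get()
```

Because ProcMock returns itself from __call__, the same object serves as the class A imports and the instance A starts and joins.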

As you’ve seen, process testing is really not as simple as unit testing. But I always found bugs with it that my unit tests couldn’t detect. If you cover all communication sequences for a process in a process test, you can be pretty sure that it will also work flawlessly in the final application.

System Testing

If your application consists of more than one process, you still need to test whether all processes work nicely together. This is something you cannot simulate reliably with a process test, just as unit tests can’t replace process tests.

Writing a good system test is very application-specific and can, depending on the complexity of your application, be very hard or very easy. Fortunately, the latter is the case for our ping-pong app. We just start it and copy its output to a file. If the output is not what we expected, we modify the file accordingly. In our test, we can now simply invoke our program again, capture its output and compare it to the contents of the file we created before:

# test/system/test_pongproc.py
import os.path
import subprocess

import pytest


def test_pongproc():
    filename = os.path.join('test', 'data', 'pongproc.out')
    expected = open(filename).read()

    output = subprocess.check_output(['python', 'pongproc.py'],
                                     universal_newlines=True)

    assert output == expected

If your application was a server, another way of doing the system test would be to emulate a client that speaks with it. Your system test would then be very similar to your process tests, except that you only mimic the client and not all processes your main process communicates with.

Other applications (like, for instance, simulations) might create a database containing collected data. Here, you might check if these results match your expectations.

Of course you can also combine these possibilities or do something completely different …

“My Tests Are Now Running sooo Slow!”

System and process tests often run much slower than simple unit tests, so you may want to skip them most of the time. Pytest allows you to mark a test with a given name. You can then (de)select tests based on their mark when you invoke pytest.

To mark a module as, for example, a process test, just put the line pytestmark = pytest.mark.process somewhere in it. Likewise, you can add pytestmark = pytest.mark.system to mark a module as a system test.

You can now deselect process and system tests:

$ py.test -m "not (process or system)"

You can put this into a pytest.ini as a default setting. To override this again, use 1 or True as the selection expression:

$ py.test -m 1
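Such a default might look like this in a pytest.ini; the addopts line is a sketch, so adjust the expression to your own marks:

```ini
# pytest.ini
[pytest]
addopts = -m "not (process or system)"
```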

Summary

Process and system testing were the last two topics I wanted to cover in this series. Compared to simple unit tests, they require a bit more effort, but I think they are definitely worth the extra work: because they are much more realistic than unit tests can be, they give you a lot more confidence that your program actually works.

In the end, these articles became much longer and more elaborate than I originally planned. However, I hope they provided a good overview of how to design and test applications with PyZMQ, and that you now run into far fewer problems than I did when I first started working with PyZMQ.