Bitbucket stopped rendering README.txt for Python projects [resolved]

Until recently, Bitbucket nicely rendered reStructuredText (reST) formatted README.txt files for Python projects. This made total sense, because PyPI requires the project’s description in reST and most people put it into README.txt files.

However, for some reason the Bitbucket guys removed that feature and it seems that they don’t intend to bring it back. Instead, you are urged to rename your files from *.txt to *.rst.

Maybe some feedback would help to bring that feature back …

Update: Bitbucket fixed that issue. :-)

SimPy 3 Preview

SimPy is a process-based and event-driven simulation framework written in pure Python. It can also be used for multi-agent systems and other eventloop-based applications.

After several months of work and various iterations of SimPy’s new API, we can finally present a preview with the most important features working.

Here is a simple example:

>>> import simpy
>>>
>>> def clock(env):
...     while True:
...         print(env.now)
...         yield env.timeout(1)
...
>>> env = simpy.Environment()
>>> env.start(clock(env))
Process(clock)
>>> simpy.simulate(env, until=3)
0
1
2

You can find the full announcement at Google+ and on our mailing list.

The code is at Bitbucket, the documentation at Read the Docs.

django-sphinxdoc 1.1

Most Python projects use Sphinx for their documentation. And many (most?) Python powered websites use Django as framework.

So there might be some people who use both Sphinx and Django. If you belong to this group and want to integrate the documentation of your projects into your Django powered website, django-sphinxdoc might be the app you’re searching for.

Django-sphinxdoc can build and import your Sphinx documentation and provides views for browsing and searching it. You can see django-sphinxdoc in action by reading its documentation.

What’s new in this version?

  • [NEW] Support static and download files.
  • [NEW] Additional context to search view so that project information is available in the template.
  • [CHANGE] Updated some templates.
  • [FIX] Fixed a bug with the updatedoc command and ~ in paths.
  • [FIX] Include all module index files.
  • [FIX] Improved indexing behaviour.
  • [FIX] Improved behaviour when building the docs.

You can find django-sphinxdoc in the Cheese Shop or at Bitbucket.

Check Python site-packages for Updates

A while ago, I found a nice little script called check_for_updates.py which uses PIP to check your installed Python packages for updates. However, it didn’t work under Python 3, so I ported it myself:

"""
Use pip to get a list of local packages to check against one or more package
indexes for updated versions.

"""
try:
    from cStringIO import StringIO
    import xmlrpclib
except ImportError:
    from io import StringIO
    import xmlrpc.client as xmlrpclib

from distutils.version import StrictVersion, LooseVersion
import sys

import pip


def get_local_packages():
    """
    Call pip's freeze -l

    returns a list of package_name, version tuples

    """
    sys.stdout = mystdout = StringIO()
    pip.main(['freeze', '-l'])
    sys.stdout = sys.__stdout__

    pkgs = mystdout.getvalue().split('\n')
    return [p.split('==') for p in pkgs]


def find_current_version(package, index_urls=None):
    """
    Using the XMLRPC method available for PyPI, get the most recent version
    of <package> from each of the index_urls and figure out which one (if any)
    is higher

    Returns a tuple of the index with the higher version and the version it has

    """
    if index_urls is None:
        index_urls = ['http://pypi.python.org/pypi']
    cur_version = '0'
    cur_index = ''
    for index_url in index_urls:
        pypi = xmlrpclib.ServerProxy(index_url, xmlrpclib.Transport())
        pypi_hits = pypi.package_releases(package)
        if len(pypi_hits) > 0:
            if compare_versions(pypi_hits[0], cur_version):
                cur_version = pypi_hits[0]
                cur_index = index_url

    return cur_index, cur_version


def compare_versions(version1, version2):
    """
    Compare 2 versions, starting with StrictVersion, and falling back on
    LooseVersion. Returns ``True`` if *version1* is greater than *version2*.

    """
    try:
        return StrictVersion(version1) > StrictVersion(version2)
    # in case of abnormal version number, fall back to LooseVersion
    except ValueError:
        return LooseVersion(version1) > LooseVersion(version2)


def output_line(pkg_name, new_version, old_version, index_url):
    """Output the line showing the formatted information."""
    msg = '%(bd)s%(pkg_name)s%(nm)s (%(new)s) via %(index)s. Currently %(old)s.'
    params = {
        'bd': BOLD,
        'nm': NORMAL,
        'pkg_name': pkg_name,
        'new': new_version,
        'old': old_version,
        'index': index_url,
    }
    print(msg % params)


NEWER = lambda x, y: compare_versions(str(x), y) == 1


if __name__ == '__main__':
    import curses
    curses.setupterm()
    CLEAR_SCREEN = curses.tigetstr('clear').decode('utf-8')
    BOLD = curses.tigetstr('bold').decode('utf-8')
    NORMAL = curses.tigetstr('sgr0').decode('utf-8')

    if len(sys.argv) > 1:
        indexes = sys.argv[1:]
    else:
        indexes = ['http://pypi.python.org/pypi']
    print(CLEAR_SCREEN + BOLD + 'Packages with newer versions:' + NORMAL)
    print('')

    for pkg in get_local_packages():
        # pip outputs a single 0 at the end of the list. Ignore it.
        if len(pkg) < 2:
            continue

        index, current_version = find_current_version(pkg[0], index_urls=indexes)
        if current_version and NEWER(str(current_version), pkg[1]):
            output_line(pkg[0], current_version, pkg[1], index)

Just execute it from anywhere using a Python interpreter whose packages you'd like to check (e.g., python3 check_for_updates.py or from within a virtualenv). You can then use pip install -U <packagename> (or pip-3.2 for Python 3) to update the packages.

Update: The yolk package does the same via yolk -U. It doesn’t support Python 3, though. I wonder when PIP will get that functionality.

A Simple Web Bot with Requests and BeautifulSoup

Today I helped a colleague debug a web bot written in Java. Since I haven’t really worked with Java for a few years, I thought it would be easier for me to reproduce (and solve) the problem with Requests and BeautifulSoup. (I’ve actually been looking for an opportunity to try out Requests for a while, since I’ve heard so many good things about it.)

And what can I say—I was blown away by how easy it was to implement a simple web bot that filled out a form and grabbed a huge table of data for me.

I cannot show you exactly that web bot, but here’s how you could search my website for “Python” and get the headings of the resulting posts:

from BeautifulSoup import BeautifulSoup
import requests

url = 'http://stefan.sofa-rockers.org/search/?q=%(q)s'
payload = {
    'q': 'Python',
}
r = requests.get(url % payload)

soup = BeautifulSoup(r.text)
titles = [h2.text for h2 in soup.findAll('h2', attrs={'class': 'post_title'})]

for t in titles:
    print(t)
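
If you want to try the extraction step without a network connection or any third-party package, here is a minimal sketch that uses only the standard library’s html.parser instead of BeautifulSoup. The HTML snippet and the TitleParser class are made up for illustration; only the h2 tag and the post_title class are taken from the example above.

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    """Collects the text of every <h2 class="post_title"> element."""

    def __init__(self):
        super().__init__()
        self.titles = []
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) tuples
        if tag == 'h2' and ('class', 'post_title') in attrs:
            self._in_title = True
            self.titles.append('')

    def handle_endtag(self, tag):
        if tag == 'h2':
            self._in_title = False

    def handle_data(self, data):
        # Text inside nested tags (like links) is collected as well
        if self._in_title:
            self.titles[-1] += data


# Hypothetical search result page, standing in for r.text
html = """
<h2 class="post_title"><a href="/1/">SimPy 3 Preview</a></h2>
<h2 class="other">Not a post</h2>
<h2 class="post_title">django-sphinxdoc 1.1</h2>
"""

parser = TitleParser()
parser.feed(html)

for t in parser.titles:
    print(t)
```

This is noticeably more code than the BeautifulSoup one-liner, which is a good argument for using BeautifulSoup whenever it is available.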

Designing and Testing PyZMQ Applications – Part 3

The third and last part of this series is again just about testing. While the previous article focused on unit testing, this one will be about testing complete PyZMQ processes. This even involves some magic!

Once you’ve made sure that your message dispatching and application logic work fine, you can actually start sending real messages to your process and checking real replies. This can be done for single processes (I call this process testing) and for your complete application (system testing).

When you test a single process, you create sockets that mimic all processes the tested process communicates with. When you do a system test, you only mimic a client or just invoke your program from the command line and check its output (e.g., what it prints to stdout and stderr or results written to a database).

I’ll start with process testing, which is a bit more generalizable than system testing.

Process Testing

The biggest problem I ran into when I started testing processes was that I often made blocking calls to recv methods, and these halted my tests and gave me no output about what actually went wrong. Though you can make them non-blocking by passing zmq.NOBLOCK as an extra argument, this doesn’t solve the problem. You would then need very precise timing and many time.sleep(x) calls, because recv will instantly raise an error if there is nothing to be received.

My solution for this was to wrap PyZMQ sockets and add a timeout to their send and recv methods. The following wrapper will try to receive something for one second and raise an exception if that failed. There’s also a simple wrapper for methods like connect or bind, but it’s really not that interesting, so I’ll omit it here.

# test/support.py
import time

import zmq


def get_wrapped_fwd(func):
    """
    Returns a wrapper that tries to call *func* multiple times in non-blocking
    mode before raising a :class:`zmq.ZMQError`.

    """
    def forwarder(*args, **kwargs):
        # 100 tries * 0.01 second == 1 second
        for i in range(100):
            try:
                rep = func(*args, flags=zmq.NOBLOCK, **kwargs)
                return rep

            except zmq.ZMQError:
                time.sleep(0.01)

        # We should not get here, so raise an error.
        msg = 'Could not %s message.' % func.__name__[:4]
        raise zmq.ZMQError(msg)

    return forwarder
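
To see the retry idea in isolation, here is a minimal sketch that runs without PyZMQ. The WouldBlock exception stands in for zmq.ZMQError, and FlakyReceiver is a made-up object whose recv fails a few times before it returns something; with_retries and its tries and delay parameters are my own names, not part of test/support.py.

```python
import time


class WouldBlock(Exception):
    """Stand-in for zmq.ZMQError (hypothetical, so no PyZMQ is needed)."""


def with_retries(func, tries=100, delay=0.01):
    """Returns a wrapper that calls *func* up to *tries* times."""
    def forwarder(*args, **kwargs):
        for _ in range(tries):
            try:
                return func(*args, **kwargs)
            except WouldBlock:
                time.sleep(delay)

        # Every attempt failed, so give up with an error
        raise WouldBlock('Gave up after %d tries.' % tries)

    return forwarder


class FlakyReceiver:
    """Fake socket whose recv() fails *fail_times* times, then succeeds."""

    def __init__(self, fail_times):
        self.calls = 0
        self.fail_times = fail_times

    def recv(self):
        self.calls += 1
        if self.calls <= self.fail_times:
            raise WouldBlock('nothing to receive yet')
        return b'pong'


receiver = FlakyReceiver(fail_times=3)
recv = with_retries(receiver.recv, tries=10, delay=0.001)
result = recv()
print(result, 'after', receiver.calls, 'calls')
```

The total timeout is simply tries * delay, so a test that waits for a message that never arrives fails after a bounded time instead of hanging the test runner.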

This wrapper is now used to create a TestSocket class with the desired behavior:

# test/support.py

class TestSocket(object):
    """
    Wraps ZMQ :class:`~zmq.core.socket.Socket`. All *recv* and *send* methods
    will be called multiple times in non-blocking mode before a
    :class:`zmq.ZMQError` is raised.

    """
    def __init__(self, context, sock_type):
        self._context = context

        sock = context.socket(sock_type)
        self._sock = sock

        forwards = [  # These methods can simply be forwarded
            sock.bind,
            sock.bind_to_random_port,
            sock.connect,
            sock.close,
            sock.setsockopt,
        ]
        wrapped_fwd = [  # These methods are wrapped with a for loop
            sock.recv,
            sock.recv_json,
            sock.recv_multipart,
            sock.recv_unicode,
            sock.send,
            sock.send_json,
            sock.send_multipart,
            sock.send_unicode,
        ]

        for func in forwards:
            setattr(self, func.__name__, get_forwarder(func))

        for func in wrapped_fwd:
            setattr(self, func.__name__, get_wrapped_fwd(func))

In order to reuse the same ports for all test methods, you need to cleanly close all sockets after each test. To handle method level setup/teardown in pytest, you need to implement a setup_method and a teardown_method. In the setup method, you create one or more TestSocket instances that mimic other processes and you also start the process to be tested:

# test/process/test_pongproc.py
import pytest
import zmq

from test.support import ProcessTest, make_sock
import pongproc


host = '127.0.0.1'
port = 5678


class TestPongProc(ProcessTest):
    """Communication test for the PongProc process."""

    def setup_method(self, method):
        """
        Creates and starts a PongProc process and sets up sockets to
        communicate with it.

        """
        self.context = zmq.Context()

        # make_sock creates and connects a TestSocket that we will use to
        # mimic the Ping process
        self.req_sock = make_sock(self.context, zmq.REQ,
                                  connect=(host, port))

        self.pp = pongproc.PongProc((host, port))
        self.pp.start()

    def teardown_method(self, method):
        """
        Sends a kill message to the pp and waits for the process to terminate.

        """
        # Send a stop message to the pong process and wait until it joins
        self.req_sock.send_multipart([b'["plzdiekthxbye", null]'])
        self.pp.join()

        self.req_sock.close()

You may have noticed that our test class inherits from ProcessTest. This class and some helpers in a conftest.py allow us to use some magic that improves the readability of the actual test:

# test/process/test_pongproc.py

    def test_ping(self):
        """Tests a ping-pong sequence."""
        yield ('send', self.req_sock, [], ['ping', 1])

        reply = yield ('recv', self.req_sock)
        assert reply == [['pong', 1]]

You can just yield send or recv events from your test case! When you yield a send, the test machinery tries to send a message via the specified socket. When you yield a receive, ProcessTest tries to receive something from the socket and sends its result back to your test function, so that you can easily compare the reply with the expected result.

The example above is roughly equivalent to the following code:

self.req_sock.send_multipart([] + [json.dumps(['ping', 1])])

reply = self.req_sock.recv_multipart()
reply[-1] = json.loads(reply[-1])
assert reply == [['pong', 1]]

So how does this work? By default, if pytest finds a test function that is a generator, it assumes that it generates further test functions. Hence, our first step is to override this behavior. We can do this in a conftest.py file in the test/process/ directory by implementing a pytest_pycollect_makeitem function. In this case, we collect generator functions like normal functions:

# test/process/conftest.py
from inspect import isfunction, isgeneratorfunction


def pytest_pycollect_makeitem(collector, name, obj):
    """
    Collects all instance methods that are generators and returns them as
    normal function items.

    """
    if collector.funcnamefilter(name) and hasattr(obj, '__call__'):
        if isfunction(obj) or isgeneratorfunction(obj):
            return collector._genfunctions(name, obj)

Now, we need to tell pytest how to run a test on the collected generator functions. This can be done by implementing pytest_runtest_call. If the object we are going to test (item.obj) is a generator function, we call the run method of the object’s instance (item.obj.__self__.run) and pass the generator function to it. If the test item contains a normal function, we run the default test.

# test/process/conftest.py

def pytest_runtest_call(item):
    """
    Passes the test generator (``item.obj``) to the ``run()`` method of the
    generator's instance. This method should be inherited from
    :class:`test.support.ProcessTest`.

    """
    if isgeneratorfunction(item.obj):
        item.obj.__self__.run(item.obj)
    else:  # Normal test execution for normal instance methods
        item.runtest()

But wait—we didn’t implement a run method in our test case! So it must be inherited from ProcessTest. Let’s take a look at it:

# test/support.py
class ProcessTest(object):
    """
    Base class for process tests. It offers basic actions for sending and
    receiving messages and implements the *run* method that handles the
    actual test generators.

    """

    def run(self, testfunc):
        """
        Iterates over the *testfunc* generator and executes all actions it
        yields. Results will be sent back into the generator.

        :param testfunc: A generator function that yields tuples containing
                an action keyword, which should be a function of this or
                the inheriting class (like ``send`` or ``recv``) and additional
                parameters that will be passed to that function, e.g.:
                ``('send', socket_obj, ['header'], 'body')``
        :type testfunc:  generatorfunction

        """
        item_gen = testfunc()
        item = next(item_gen)

        def throw_err(skip_levels=0):
            """
            Throws the last error to *item_gen* and skips *skip_levels* in
            the traceback to point to the line that yielded the last event.

            """
            etype, evalue, tb = sys.exc_info()
            for i in range(skip_levels):
                tb = tb.tb_next
            item_gen.throw(etype, evalue, tb)

        try:
            while True:
                try:
                    # Call the event handler and pass the args,
                    # e.g., self.send(socket_obj, header, body)
                    ret = getattr(self, item[0])(*item[1:])

                    # Send the results back to the test and get the next item
                    item = item_gen.send(ret)

                except zmq.ZMQError:
                    throw_err(3)  # PyZMQ could not send/recv
                except AssertionError:
                    throw_err(1)  # Error in the test
        except StopIteration:
            pass

The run method simply iterates over all events our testfunc generates and calls a method with the name of the event (e.g., send or recv). Its return value is sent back into the generator. If an error occurs, the exception’s traceback is modified to point to the line of code that yielded the corresponding event and not to the run method itself.
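
If you want to play with the generator mechanics without PyZMQ or pytest, the following stripped-down sketch may help. It leaves out the traceback handling, and FakeSocket, MiniProcessTest and PingCase are names I made up for this illustration; the fake socket simply answers every ping with a pong in memory.

```python
import json


class FakeSocket:
    """In-memory stand-in for a REQ socket connected to a pong process."""

    def __init__(self):
        self._replies = []

    def send_multipart(self, parts):
        kind, count = json.loads(parts[-1])
        # Pretend that the peer answers every message with a pong
        self._replies.append([json.dumps(['pong', count])])

    def recv_multipart(self):
        return self._replies.pop(0)


class MiniProcessTest:
    """Reduced version of the run/send/recv idea (no error handling)."""

    def send(self, socket, header, body):
        socket.send_multipart(header + [json.dumps(body)])

    def recv(self, socket):
        msg = socket.recv_multipart()
        msg[-1] = json.loads(msg[-1])
        return msg

    def run(self, testfunc):
        gen = testfunc()
        try:
            item = next(gen)
            while True:
                # Look up the handler by name and call it with the args
                result = getattr(self, item[0])(*item[1:])
                # Send the result into the generator, get the next event
                item = gen.send(result)
        except StopIteration:
            pass


class PingCase(MiniProcessTest):
    def __init__(self):
        self.sock = FakeSocket()
        self.replies = []

    def test_ping(self):
        yield ('send', self.sock, [], ['ping', 1])

        reply = yield ('recv', self.sock)
        self.replies.append(reply)
        assert reply == [['pong', 1]]


case = PingCase()
case.run(case.test_ping)
print(case.replies)
```

The key point is gen.send(result): it resumes the test at the yield expression, so whatever the handler returned becomes the value of reply inside the test.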

The methods send and recv roughly do the same as the snippet I showed you above:

# test/support.py

    def send(self, socket, header, body, extra_data=[]):
        """
        JSON-encodes *body*, concatenates it with *header*, appends
        *extra_data* and sends it as multipart message over *socket*.

        *header* and *extra_data* should be lists containing byte objects or
        objects implementing the buffer interface (like NumPy arrays).

        """
        socket.send_multipart(header + [json.dumps(body)] + extra_data)

    def recv(self, socket, json_load_index=-1):
        """
        Receives and returns a multipart message from *socket* and tries to
        JSON-decode the item at position *json_load_index* (defaults to ``-1``;
        the last element in the list). The original byte string will be
        replaced by the loaded object. Set *json_load_index* to ``None`` to get
        the original, unchanged message.

        """
        msg = socket.recv_multipart()
        if json_load_index is not None:
            msg[json_load_index] = json.loads(msg[json_load_index])
        return msg

You can even add your own event handler to your test class. I used this, for example, to add a log event that checks if a PyZMQ log handler sent the expected log messages:

def log(self, substr=''):
    """
    Receives a message and asserts, that it is a log message and that
    *substr* is in that message.

    Usage:
        yield ('log', 'Ai iz in ur log mesage')

    """
    msg = self.log_sock.recv_json()
    assert msg[0] == 'log_message'
    assert substr in msg[1]

What if your process starts further subprocesses?

In some cases, the process you are about to test starts additional subprocesses that you don’t want to test. Even worse, these processes might communicate via sockets bound to random ports. And EVEN WORSE, the process you are testing might depend on catching a KeyboardInterrupt to send stop messages to child processes or to clean something up!

The last problem is quite easy to solve: You just send a SIGINT to your process from the test:

import os, signal

def teardown_method(self, method):
    os.kill(self.my_proc.pid, signal.SIGINT)
    self.my_proc.join()

    # Now you can close the test sockets

If you don’t want to start a certain subprocess, you can just mock it. Imagine, you have two processes a.A and b.B, where A starts B, then you just mock B before starting A:

with mock.patch('b.B'):
    self.a = A()
    self.a.start()

Imagine now, that A binds a socket to a random port and uses that socket to communicate with B. If you want to mock B in your tests, you need that port number in order to connect to it and send messages to A.

But how can you get that number? When A creates B, it already runs in its own process, so a simple attribute access won’t work. Setting a random seed would only work if you did that directly in A when it’s already running. But doing that just for the tests is not such a good idea. It also may not work reliably on all systems and Python versions.

However, A must pass its port number to B so that B can connect to A. Thus, we can create a mock for B that will send us that port number via a queue <http://docs.python.org/py3k/library/multiprocessing#exchanging-objects-between-processes>:

class ProcMock(mock.Mock):
    """
    This mock returns itself when called, so it acts like both, the
    process’ class and instance object.

    """
    def __init__(self):
        super().__init__()
        self.queue = multiprocessing.Queue()

    def __call__(self, port):
        """Will be called when A instantiates B and passes its port number."""
        self.queue.put(port)
        return self

    def start(self):
        return  # Just make sure the method exists and returns nothing

    def join(self):
        return  # Just make sure the method exists and returns nothing


class TestA(ProcessTest):

    def setup_method(self):

        b_mock = ProcMock()
        with mock.patch('b.B', new=b_mock):
            self.a = A()
            self.a.start()

        # Get the port A is listening on
        port = b_mock.queue.get()

        # ...

As you’ve seen, process testing is really not as simple as unit testing. But I always found bugs with it that my unit tests couldn’t detect. If you cover all communication sequences for a process in a process test, you can be pretty sure that it will also work flawlessly in the final application.

System Testing

If your application consists of more than one process, you still need to test whether all processes work nicely together or not. This is something you cannot simulate reliably with process tests, just as unit tests can’t replace process tests.

Writing a good system test is very application-specific and can, depending on the complexity of your application, be very hard or very easy. Fortunately, the latter is the case for our ping-pong app. We just start it and copy its output to a file. If the output is not what we expected, we modify the file accordingly. In our test, we can now simply invoke our program again, capture its output and compare it to the contents of the file we created before:

# test/system/test_pongproc.py
import os.path
import subprocess

import pytest


def test_pongproc():
    filename = os.path.join('test', 'data', 'pongproc.out')
    expected = open(filename).read()

    output = subprocess.check_output(['python', 'pongproc.py'],
                                     universal_newlines=True)

    assert output == expected
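
If you don’t have the ping-pong app at hand, you can try the same capture-and-compare idea with any command. The following sketch uses an inline Python one-liner in place of pongproc.py and a temporary directory in place of test/data; the helper names run_and_capture and matches_expected and the printed text are assumptions made for this example only.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_and_capture(args):
    """Runs a command and returns its stdout as text."""
    return subprocess.check_output(args, universal_newlines=True)


def matches_expected(args, expected_file):
    """Compares the command's output with the contents of a file."""
    expected = Path(expected_file).read_text()
    return run_and_capture(args) == expected


# Stand-in for ['python', 'pongproc.py']
command = [sys.executable, '-c', "print('ping 0'); print('pong 0')"]

with tempfile.TemporaryDirectory() as tmp:
    expected_file = Path(tmp) / 'pongproc.out'

    # First run: record the output once (and review it by hand)
    expected_file.write_text(run_and_capture(command))
    same = matches_expected(command, expected_file)

    # A changed expectation makes the comparison fail
    expected_file.write_text('something else\n')
    different = matches_expected(command, expected_file)

print(same, different)
```

Using sys.executable rather than a bare python makes sure the subprocess runs with the same interpreter (and virtualenv) as the test itself.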

If your application was a server, another way of doing the system test would be to emulate a client that speaks with it. Your system test would then be very similar to your process tests, except that you only mimic the client and not all processes your main process communicates with.

Other applications (like, for instance, simulations) might create a database containing collected data. Here, you might check if these results match your expectations.

Of course you can also combine these possibilities or do something completely different …

“My Tests Are Now Running Sooo Slow!”

System and process tests often run much slower than simple unit tests, so you may want to skip them most of the time. Pytest allows you to mark a test with a given name. You can then (de)select tests based on their mark when you invoke pytest.

To mark a module as, e.g., a process test, just put a line pytestmark = pytest.mark.process somewhere in it. Likewise, you can add a pytestmark = pytest.mark.system to mark a module as a system test.

You can now deselect process and system tests:

$ py.test -m "not (process or system)"

You can put this into a pytest.ini as a default setting. To override this again, use 1 or True as selection expression:

$ py.test -m 1
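
As a sketch, the default setting mentioned above could look like this in a pytest.ini, using pytest’s addopts option to prepend command line arguments to every run. I assume here that the file lives in the project’s root directory:

```ini
# pytest.ini
[pytest]
addopts = -m "not (process or system)"
```

An -m given on the command line comes after the one from addopts and therefore wins, which is why the override shown above works.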

Summary

Process and system testing were the last two topics I wanted to cover in this series. Compared to simple unit tests, they require a bit more effort. I think they are definitely worth the extra work since they give you a lot more confidence that your program actually works, because they are much more realistic than unit tests can be.

In the end, these articles became much longer and more elaborate than I originally planned them to be. However, I hope they provided a good overview of how to design and test applications with PyZMQ, and I hope that you now run into far fewer problems than I did when I first started working with PyZMQ.

Designing and Testing PyZMQ Applications – Part 2

This is the second part of the series Designing and Testing PyZMQ Applications. In the first part, I wrote about designing a PyZMQ application, so this time it’s all about (unit) testing (remember, if it’s not tested, it’s broken). I also updated the repository for this article with the new code examples.

My favorite testing tools are pytest by Holger Krekel and Mock by Michael Foord. Pytest is particularly awesome because of its re-evaluation of assert statements. If your test contains an assert spam == 'eggs' and the assert fails, pytest re-evaluates it and prints the value of spam. Really helpful and you don’t need any boilerplate code for that. Mock is really nice for mocking external dependencies and asserting that your code called them in the correct way.

If you cloned the repository for this article, just run py.test from its root directory:

$ pip install pytest mock
...
Successfully installed pytest mock
Cleaning up...
$ py.test
=================== test session starts ====================
platform darwin -- Python 3.2.2 -- pytest-2.2.3
collected 11 items

example_app/test/test_base.py ....
example_app/test/test_pongproc.py .......

================ 11 passed in 0.12 seconds =================

Unit Testing

The probability that PyZMQ works correctly is very high. The probability that your code will call a PyZMQ function in such a way that it blocks forever and halts your test runner is also very high. Therefore, it’s a good idea to mock everything PyZMQ-related for your unit tests. And since your application logic might also not be implemented when you start testing your process, you should mock that, too.

What you’ll actually end up testing is the following:

  • Does your message handler call your application logic in the right way given a certain input message?
  • Does your message handler create and send the correct reply based on the return value of your application logic?

ZmqProcess

Let’s start with ZmqProcess again. After all, everything else depends on it. Testing its setup method is easy. We just check that it creates a context and a loop:

# example_app/test/test_zmqproc.py
from zmq.eventloop import ioloop
import mock
import pytest
import zmq

import base


class TestZmqProcess(object):
    """Tests for :class:`base.ZmqProcess`."""

    def test_setup(self):
        zp = base.ZmqProcess()
        zp.setup()

        assert isinstance(zp.context, zmq.Context)
        assert isinstance(zp.loop, ioloop.IOLoop)

Testing stream is more complicated. We need to test if it can handle various address formats, if it creates or binds correctly and if it performs a default subscription for SUB sockets.

Pytest 2.2 introduced a parametrize decorator that helps you call a test multiple times with varying inputs. You just define one or more arguments for your test function and a list of values for these arguments. For test_stream, I only need a kwargs parameter containing the parameters for the stream call:

# example_app/test/test_zmqproc.py

    @pytest.mark.parametrize('kwargs', [
        dict(sock_type=23, addr='127.0.0.1:1234', bind=True,
              callback=mock.Mock()),
        dict(sock_type=23, addr='127.0.0.1', bind=True,
              callback=mock.Mock()),
        dict(sock_type=zmq.SUB, addr=('localhost', 1234), bind=False,
              callback=mock.Mock(), subscribe=b'ohai'),
    ])
    def test_stream(self, kwargs):

The next step is to create an instance of ZmqProcess and patch some of its attributes. We also need to set a defined return value for the socket’s bind_to_random_port method:

# example_app/test/test_zmqproc.py

        zp = base.ZmqProcess()

        # Patch the ZmqProcess instance
        zp.context = mock.Mock(spec_set=zmq.Context)
        zp.loop = mock.Mock(spec_set=ioloop.IOLoop)
        sock_mock = zp.context.socket.return_value
        sock_mock.bind_to_random_port.return_value = 42

For the actual test, we also need to patch ZMQStream. Although mock.patch could work as a function decorator, we need to use it as a context manager if we also use pytest funcargs (e.g., via the parametrize decorator; I don’t know whether it’s even possible to use both mock.patch as a decorator and pytest funcargs in one test).

# example_app/test/test_zmqproc.py

        # Patch ZMQStream and start testing
        with mock.patch('zmq.eventloop.zmqstream.ZMQStream') as zmqstream_mock:
            stream, port = zp.stream(**kwargs)

Finally, we can check the return values of our stream method and that it made the correct calls to create the stream:

# example_app/test/test_zmqproc.py

            # Assert that the return values are correct
            assert stream is zmqstream_mock.return_value
            if isinstance(kwargs['addr'], tuple):
                assert port == kwargs['addr'][1]
            elif ':' in kwargs['addr']:
                assert port == int(kwargs['addr'][-4:])
            else:
                assert port == sock_mock.bind_to_random_port.return_value

            # Check that the socket was created correctly
            assert zp.context.socket.call_args == ((kwargs['sock_type'],), {})
            if kwargs['bind'] and ':' in kwargs['addr']:
                assert sock_mock.bind.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            elif kwargs['bind']:
                assert sock_mock.bind_to_random_port.call_args == (
                        ('tcp://%s' % kwargs['addr'],), {})
            else:
                assert sock_mock.connect.call_args == (
                        ('tcp://%s:%s' % kwargs['addr'],), {})

            # Check creation of the stream
            assert zmqstream_mock.call_args == ((sock_mock, zp.loop), {})
            assert zmqstream_mock.return_value.on_recv.call_args == (
                    (kwargs['callback'],), {})

            # Check default subscription
            if 'subscribe' in kwargs:
                assert sock_mock.setsockopt.call_args == (
                        (zmq.SUBSCRIBE, kwargs['subscribe']), {})

Note: You may have noticed that I use assert my_mock.call_args == ... rather than my_mock.assert_called_with(...). The reason is simply that assert statements are highlighted but ordinary function calls are not. This makes it easier for me to find all assertions in a test.
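
To make the comparison of both styles concrete, here is a small self-contained sketch. It uses unittest.mock, the standard library version of the Mock package (available since Python 3.3), and a made-up send_ping function instead of the real ZmqProcess code:

```python
from unittest import mock


def send_ping(sock, count):
    """Hypothetical function under test: sends a two-part ping message."""
    sock.send_multipart([b'ping', str(count).encode()])


sock = mock.Mock()
send_ping(sock, 1)

# Style used in this article: compare call_args with an (args, kwargs) tuple
assert sock.send_multipart.call_args == (([b'ping', b'1'],), {})

# Equivalent check with the helper method; raises AssertionError on mismatch
sock.send_multipart.assert_called_with([b'ping', b'1'])
```

Both checks look at the most recent call only. If the order of several calls matters, call_args_list can be compared with a list of such tuples in the same way.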

MessageHandler

The MessageHandler base class has only one method, __call__, but I split the test for it into two methods: one that tests the JSON-loading functionality and one that checks if the correct handler method is called:

# example_app/test/test_base.py

class TestMessageHandler(object):
    """Tests for :class:`base.MessageHandler`."""

    @pytest.mark.parametrize(('idx', 'msg'), [
        (-1, [23, b'["test", null]']),
        (1, [23, b'["test", "spam"]', 42]),
        (TypeError, [23, 42]),
        (ValueError, [23, b'["test"]23spam']),
    ])
    def test_call_json_load(self, idx, msg):
        handler = mock.Mock()
        mh = base.MessageHandler(idx if isinstance(idx, int) else -1)
        mh.test = handler

        if isinstance(idx, int):
            mh(msg)
            assert handler.call_count == 1
        else:
            pytest.raises(idx, mh, msg)

    @pytest.mark.parametrize(('ok', 'msg'), [
        (True, [23, b'["test", "spam"]', 42]),
        (AttributeError, [23, b'["_test", "spam"]', 42]),
        (TypeError, [23, b'["spam", "spam"]', 42]),
        (AttributeError, [23, b'["eggs", "spam"]', 42]),
    ])
    def test_call_get_handler(self, ok, msg):
        handler = mock.Mock()
        mh = base.MessageHandler(1)
        mh.test = handler
        mh.spam = 'spam'

        if ok is True:
            mh(msg)
            assert handler.call_args == (
                    (msg[0], 'spam', msg[2]), {})
        else:
            pytest.raises(ok, mh, msg)

PongProc

Testing the PongProc is not much different from testing its base class. pytest_funcarg__pp will instantiate a PongProc instance for each test that has a pp argument. The tests for setup, run and stop are easy to do. We create a few mocks and then ask them if the tested function called them correctly:

# example_app/test/test_pongproc.py
from zmq.utils import jsonapi as json
import mock, pytest, zmq

import pongproc

host, port = '127.0.0.1', 5678

def pytest_funcarg__pp(request):
    """Creates a PongProc instance."""
    return pongproc.PongProc((host, port))


class TestPongProc(object):
    """Tests :class:`pongproc.PongProc`."""

    def test_setup(self, pp):
        def make_stream(*args, **kwargs):
            stream = mock.Mock()
            stream.type = args[0]
            return stream, mock.Mock()
        pp.stream = mock.Mock(side_effect=make_stream)

        with mock.patch('base.ZmqProcess.setup') as setup_mock:
            pp.setup()
            assert setup_mock.call_count == 1

        assert pp.stream.call_args_list == [
            ((zmq.REP, (host, port)), dict(bind=True)),
        ]
        assert pp.rep_stream.type == zmq.REP

        # Test if the message handler was configured correctly
        rsh = pp.rep_stream.on_recv.call_args[0][0]  # Get the msg handler
        assert rsh._rep_stream == pp.rep_stream
        assert rsh._stop == pp.stop

    def test_run(self, pp):
        pp.setup = mock.Mock()
        pp.loop = mock.Mock()

        pp.run()

        assert pp.setup.call_count == 1
        assert pp.loop.start.call_count == 1

    def test_stop(self, pp):
        pp.loop = mock.Mock()
        pp.stop()
        assert pp.loop.stop.call_count == 1

RepStreamHandler

Testing the actual message handler requires some mocks, but is otherwise straightforward. A funcarg function creates an instance of the message handler for each test case, which we then feed with a message. We then check if the application logic was called correctly and/or if a correct reply is sent:

# example_app/test/test_pongproc.py

def pytest_funcarg__rsh(request):
    """Creates a RepStreamHandler instance."""
    return pongproc.RepStreamHandler(
            rep_stream=mock.Mock(),
            stop=mock.Mock(),
            ping_handler=mock.Mock(spec_set=pongproc.PingHandler()))


class TestRepStreamHandler(object):
    def test_ping(self, rsh):
        msg = ['ping', 1]
        retval = 'spam'
        rsh._ping_handler = mock.Mock(spec_set=pongproc.PingHandler)
        rsh._ping_handler.make_pong.return_value = retval

        rsh([json.dumps(msg)])

        assert rsh._ping_handler.make_pong.call_args == ((msg[1],), {})
        assert rsh._rep_stream.send_json.call_args == ((retval,), {})

    def test_plzdiekthxbye(self, rsh):
        rsh([b'["plzdiekthxbye", null]'])
        assert rsh._stop.call_count == 1

PingHandler

When we are done with all that network stuff, we can finally test the application logic. Easy-peasy in our case:

# example_app/test/test_pongproc.py

def pytest_funcarg__ph(request):
    """Creates a PingHandler instance."""
    return pongproc.PingHandler()

class TestPingHandler(object):
    def test_make_pong(self, ph):
        ping_num = 23
        ret = ph.make_pong(ping_num)
        assert ret == ['pong', ping_num]

Summary

Thanks to the Mock library, unit testing PyZMQ apps is really not that hard and not much different from normal unit testing. However, all we know so far is that our process should work in theory. We haven’t yet started it and sent real messages to it.

The next and final part of this series will show you how you can automate testing complete processes. Until then, you should get your test coverage up to 100% to protect yourself from nasty surprises when you start with process testing.

Designing and Testing PyZMQ Applications – Part 1

ZeroMQ (or ØMQ or ZMQ) is an intelligent messaging framework and is described as “sockets on steroids”. That is, they look like normal TCP sockets but actually work as you’d expect sockets to work. PyZMQ adds even more convenience to them, which makes it a really good choice if you want to implement a distributed application. Another big plus for ØMQ is that you can integrate sub-systems written in C, Java or any other language ØMQ supports (and there are a lot of them).

If you’ve never heard of ØMQ before, I recommend reading ZeroMQ an Introduction by Nicholas Piël before you go on with this article.

The ØMQ Guide and PyZMQ’s documentation are really good, so you can easily get started. However, when we began to implement a larger application with it (a distributed simulation framework), several questions arose which were not covered by the documentation:

  • What’s the best way to design our application?
  • How can we keep it readable, flexible and maintainable?
  • How do we test it?

I didn’t find anything like a best-practice article that answered my questions. So in this series of articles, I’m going to talk about what I’ve learned during the last months. I’m not a PyZMQ expert (yet ;-)), but what I’ve done so far works quite well and I have never had more tests in a project than I do now.

You’ll find the source for the examples at Bitbucket. They are written in Python 3.2 and tested under Mac OS X Lion, Ubuntu 11.10 and Windows 7 (64 bit in each case). If you have any suggestions or improvements, please fork me or just leave a comment.

In this first article, I’m going to talk a bit about how you could generally design your application to be flexible, maintainable and testable. The second part will be about unit testing and finally, I’ll cover process and system testing.

Comparison of Different Approaches

There are basically three possible ways to implement a PyZMQ application: one that’s easy but limited in practical use, one that’s more flexible but not really pythonic, and one that needs a bit more setup but is flexible and pythonic.

All three examples feature a simple ping process and a pong process with varying complexity. I use multiprocessing to run the pong process, because that’s what you should usually do in real PyZMQ applications (you don’t want to use threads and if both processes are running on the same machine, there’s no need to invoke both of them separately).

All of the examples will have the following output:

(zmq)$ python blocking_recv.py
Pong got request: ping 0
Ping got reply: pong 0
...
Pong got request: ping 4
Ping got reply: pong 4

Let’s start with the easy one first. You just use one of the socket’s recv methods in a loop:

# blocking_recv.py
import multiprocessing
import zmq


addr = 'tcp://127.0.0.1:5678'


def ping():
    """Sends ping requests and waits for replies."""
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.bind(addr)

    for i in range(5):
        sock.send_unicode('ping %s' % i)
        rep = sock.recv_unicode()  # This blocks until we get something
        print('Ping got reply:', rep)


def pong():
    """Waits for ping requests and replies with a pong."""
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.connect(addr)

    for i in range(5):
        req = sock.recv_unicode()  # This also blocks
        print('Pong got request:', req)
        sock.send_unicode('pong %s' % i)


if __name__ == '__main__':
    pong_proc = multiprocessing.Process(target=pong)
    pong_proc.start()

    ping()

    pong_proc.join()

So this is very easy and not that much code. The problem is that it only works well if your process uses just one socket. Unfortunately, in larger applications that is rarely the case.

A way to handle multiple sockets per process is polling. In addition to your context and socket(s), you need a poller. You also have to tell it which events on which socket you are going to poll:

# polling.py
def pong():
    """Waits for ping requests and replies with a pong."""
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.bind(addr)

    # Create a poller and register the events we want to poll
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN|zmq.POLLOUT)

    for i in range(10):
        # Get all sockets that can do something
        socks = dict(poller.poll())

        # Check if we can receive something
        if sock in socks and socks[sock] == zmq.POLLIN:
            req = sock.recv_unicode()
            print('Pong got request:', req)

        # Check if we can send something
        if sock in socks and socks[sock] == zmq.POLLOUT:
            sock.send_unicode('pong %s' % (i // 2))

    poller.unregister(sock)

As you can see, our pong function got pretty ugly. You need 10 iterations to do five ping-pongs, because in each iteration you can either receive or send. And each socket you add to your process adds two more if-statements. You could improve that design if you created a base class wrapping the polling loop and just registered sockets and callbacks in an inheriting class.
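Such a wrapper around the polling loop could look roughly like the following sketch. The class name PollingLoop and its methods are made up for illustration; the poller passed in is assumed to behave like zmq.Poller, that is, to offer register(socket, flags) and poll(timeout) returning a list of (socket, event) pairs.

```python
POLLIN = 1  # same numeric value as zmq.POLLIN


class PollingLoop:
    """Wraps a poller and dispatches readable sockets to callbacks."""

    def __init__(self, poller):
        # *poller* is assumed to behave like zmq.Poller (see lead-in).
        self._poller = poller
        self._callbacks = {}
        self._running = False

    def register(self, sock, callback):
        """Call *callback(sock)* whenever *sock* has data to read."""
        self._poller.register(sock, POLLIN)
        self._callbacks[sock] = callback

    def stop(self):
        """Ask the loop to exit after the current round of events."""
        self._running = False

    def run(self, timeout=None):
        """Poll until stop() is called, e.g. from one of the callbacks."""
        self._running = True
        while self._running:
            for sock, event in self._poller.poll(timeout):
                if event & POLLIN:
                    self._callbacks[sock](sock)
```

In a real process you would pass in a zmq.Poller() instance, and each callback would typically call sock.recv_multipart() and send its reply. This is essentially what the event loop in the next example does for you.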

That brings us to our final example. PyZMQ comes with an adapted Tornado event loop that handles the polling and works with ZMQStreams, which wrap sockets and add some functionality:

# eventloop.py
from zmq.eventloop import ioloop, zmqstream


class Pong(multiprocessing.Process):
    """Waits for ping requests and replies with a pong."""
    def __init__(self):
        super().__init__()
        self.loop = None
        self.stream = None
        self.i = 0

    def run(self):
        """
        Initializes the event loop, creates the sockets/streams and
        starts the (blocking) loop.

        """
        context = zmq.Context()
        self.loop = ioloop.IOLoop.instance()  # This is the event loop

        sock = context.socket(zmq.REP)
        sock.bind(addr)
        # We need to create a stream from our socket and
        # register a callback for recv events.
        self.stream = zmqstream.ZMQStream(sock, self.loop)
        self.stream.on_recv(self.handle_ping)

        # Start the loop. It runs until we stop it.
        self.loop.start()

    def handle_ping(self, msg):
        """Handles ping requests and sends back a pong."""
        # msg is a list of byte objects
        req = msg[0].decode()
        print('Pong got request:', req)
        self.stream.send_unicode('pong %s' % self.i)

        # We’ll stop the loop after 5 pings
        self.i += 1
        if self.i == 5:
            self.stream.flush()
            self.loop.stop()

This adds even more boilerplate code, but it will pay off if you use more sockets, and most of that stuff in run() can be put into a base class. Another drawback is that the IOLoop only uses recv_multipart(), so you always get a list of byte strings which you have to decode or deserialize on your own. However, you can use all the send methods the socket offers (like send_unicode() or send_json()). You can also stop the loop from within a message handler.
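Decoding the message parts yourself is not much work. Here is a minimal sketch, assuming the payload is JSON in one of the parts; decode_parts is a hypothetical helper and not part of PyZMQ.

```python
import json


def decode_parts(msg, json_index=-1):
    """Return a copy of *msg* with the part at *json_index* JSON-decoded."""
    parts = list(msg)
    parts[json_index] = json.loads(parts[json_index].decode('utf-8'))
    return parts


# A single-part message, as an on_recv callback would receive it
raw = [b'["ping", 3]']
msg_type, count = decode_parts(raw)[-1]
```

The MessageHandler base class shown later in this article does the same thing, just wrapped in a callable.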

In the next sections, I’ll discuss how you could implement a PyZMQ process that uses the event loop.

Communication Design

Before you start to implement anything, you should think about what kind of processes you need in your application and which messages they exchange. You should also decide what kind of message format and serialization you want to use.

PyZMQ has built-in support for Unicode (send sends plain C strings which map to Python byte objects, so there’s a separate method to send Unicode strings), JSON and Pickle.

JSON is nice, because it’s fast and lets you integrate processes written in other languages into your application. It’s also a bit safer, because you cannot receive arbitrary objects as with pickle. The most straightforward syntax for JSON messages is to let them be triples [msg_type, args, kwargs], where msg_type maps to a method name and args and kwargs get passed as positional and keyword arguments.
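As a rough sketch of how such a triple could be dispatched, using only the standard library's json module: the Calculator class and the dispatch function are made-up names for illustration and do not appear in the example application.

```python
import json


class Calculator:
    """Hypothetical application object; each public method is a message type."""

    def add(self, a, b, scale=1):
        return (a + b) * scale


def dispatch(target, raw):
    """Decode a [msg_type, args, kwargs] triple and call the matching method."""
    msg_type, args, kwargs = json.loads(raw)
    if msg_type.startswith('_'):
        # Never expose private attributes to remote callers
        raise AttributeError('%s starts with an "_"' % msg_type)
    return getattr(target, msg_type)(*args, **kwargs)


raw = json.dumps(['add', [1, 2], {'scale': 10}])
result = dispatch(Calculator(), raw)
```

The examples in this article use a simpler pair [msg_type, data], but the idea is the same.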

I strongly recommend documenting each chain of messages your application sends to perform a certain task. I do this with fancy PowerPoint graphics and with even fancier ASCII art in Sphinx. Here is how I would document our ping-pong:

Sending pings
-------------

* If the ping process sends a *ping*, the pong process responds with a
  *pong*.
* The number of pings (and pongs) is counted. The current ping count is
  sent with each message.

::

    PingProc      PongProc
     [REQ] ---1--> [REP]
           <--2---


    1 IN : ['ping, count']
    1 OUT: ['ping, count']

    2 IN : ['pong, count']
    2 OUT: ['pong, count']

First, I write some bullet points that explain how the processes behave and why they behave this way. This is followed by some kind of sequence diagram that shows which process sends which message when, and over which socket type. Finally, I write down what the messages look like. # IN is what you would pass to send_multipart and # OUT is what is received on the other side by recv_multipart. If one of the participating sockets is a ROUTER or DEALER, IN and OUT will differ (though that’s not the case in this example). Everything in single quotation marks (') represents a JSON serialized list.

If our pong process used a ROUTER socket instead of the REP socket, it would look like this:

1 IN : ['ping, count']
1 OUT: [ping_uuid, '', 'ping, count']

2 IN : [ping_uuid, '', 'pong, count']
2 OUT: ['pong, count']

This seems like a lot of tedious work, but trust me, it really helps a lot when you need to change something a few weeks later!
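To illustrate why documenting IN and OUT separately is useful, here is a small sketch of how a handler behind a ROUTER socket might treat the extra frames. The helper names and the identity b'ping-uuid' are invented for this example; the frame layout follows the ROUTER listing above.

```python
import json


def split_envelope(frames):
    """Split ROUTER-style frames into (envelope, payload).

    The envelope is everything up to and including the empty delimiter
    frame; the payload is the JSON-decoded last frame.
    """
    envelope, body = frames[:-1], frames[-1]
    return envelope, json.loads(body.decode('utf-8'))


def make_reply(envelope, payload):
    """Prefix the JSON-encoded *payload* with the request's envelope."""
    return envelope + [json.dumps(payload).encode('utf-8')]


# "1 OUT" as seen by the ROUTER socket
request = [b'ping-uuid', b'', b'["ping", 1]']
envelope, (msg_type, count) = split_envelope(request)

# "2 IN": what the pong process would pass to send_multipart
reply = make_reply(envelope, ['pong', count])
```

Because the payload is always the last frame, the same code also works for a plain REP socket, where the envelope is simply empty.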

Application Design

In the examples above, the Pong process was responsible for setting everything up, for receiving/sending messages and for the actual application logic (counting incoming pings and creating a pong).

Obviously, this is not a very good design (at least if your application gets more complex than our little ping-pong example). What we can do about this is to put most of that nasty setup stuff into a base class which all your processes can inherit from, separate message handling and (de)serialization from it and finally put all the actual application logic into a separate (PyZMQ-independent) class. This will result in a three-level architecture:

  1. The lowest tier will contain the entry point of the process, set everything up and start the event loop. A common base class provides utilities for creating sockets/streams and setting everything up.
  2. The second level is message handling and (de)serialization. A base class performs the (de)serialization and error handling. A message handler inherits this class and implements a method for each message type that should be handled.
  3. The third level will be the application logic and completely PyZMQ-agnostic.

Base classes should be defined for the first two tiers to reduce redundant code in multiple processes or message handlers. The following figure shows the five classes our process is going to consist of:

The architecture of our pong process.

The refactored PongProc now consists of three layers. The main class PongProc inherits ZmqProcess. Every stream gets a MessageHandler. In our example it’s just RepStreamHandler. Finally, you can have one or more classes containing the (PyZMQ-agnostic) application logic. In our example, it’s called PingHandler, because it handles incoming pings.

ZmqProcess – The Base Class for all Processes

The base class basically implements two things:

  • a setup method that creates a context and a loop
  • a stream factory method for streams with an on_recv callback. It creates a socket and can connect/bind it to a given address or bind it to a random port (that’s why it returns the port number in addition to the stream itself).

It also inherits multiprocessing.Process so that it is easier to spawn it as a sub-process. Of course, you can also just call its run() method from your main().

# example_app/base.py
import multiprocessing

from zmq.eventloop import ioloop, zmqstream
import zmq


class ZmqProcess(multiprocessing.Process):
    """
    This is the base for all processes and offers utility functions
    for setup and creating new streams.

    """
    def __init__(self):
        super().__init__()

        self.context = None
        """The ØMQ :class:`~zmq.Context` instance."""

        self.loop = None
        """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""

    def setup(self):
        """
        Creates a :attr:`context` and an event :attr:`loop` for the process.

        """
        self.context = zmq.Context()
        self.loop = ioloop.IOLoop.instance()

    def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
        """
        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.

        :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
        :param addr: Address to bind or connect to formatted as *host:port*,
                *(host, port)* or *host* (bind to random port).
                If *bind* is ``True``, *host* may be:

                - the wild-card ``*``, meaning all available interfaces,
                - the primary IPv4 address assigned to the interface, in its
                numeric representation or
                - the interface name as defined by the operating system.

                If *bind* is ``False``, *host* may be:

                - the DNS name of the peer or
                - the IPv4 address of the peer, in its numeric representation.

                If *addr* is just a host name without a port and *bind* is
                ``True``, the socket will be bound to a random port.
        :param bind: Binds to *addr* if ``True`` or tries to connect to it
                otherwise.
        :param callback: A callback for
                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
        :param subscribe: Subscription pattern for *SUB* sockets, optional,
                defaults to ``b''``.
        :returns: A tuple containing the stream and the port number.

        """
        sock = self.context.socket(sock_type)

        # addr may be 'host:port' or ('host', port)
        if isinstance(addr, str):
            addr = addr.split(':')
        host, port = addr if len(addr) == 2 else (addr[0], None)

        # Bind/connect the socket
        if bind:
            if port:
                sock.bind('tcp://%s:%s' % (host, port))
            else:
                port = sock.bind_to_random_port('tcp://%s' % host)
        else:
            sock.connect('tcp://%s:%s' % (host, port))

        # Add a default subscription for SUB sockets
        if sock_type == zmq.SUB:
            sock.setsockopt(zmq.SUBSCRIBE, subscribe)

        # Create the stream and add the callback
        stream = zmqstream.ZMQStream(sock, self.loop)
        if callback:
            stream.on_recv(callback)

        return stream, int(port)
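The address handling in stream() can be tried out in isolation. The following sketch pulls that logic into a standalone function; parse_addr is a hypothetical name and not part of the example application.

```python
def parse_addr(addr):
    """Split *addr* into (host, port); port is None if it was omitted.

    Accepts 'host:port', ('host', port) or just 'host'.
    """
    if isinstance(addr, str):
        addr = addr.split(':')
    host, port = addr if len(addr) == 2 else (addr[0], None)
    return host, (int(port) if port is not None else None)


with_port = parse_addr('127.0.0.1:5678')
from_tuple = parse_addr(('127.0.0.1', 5678))
host_only = parse_addr('127.0.0.1')
```

A missing port only makes sense together with bind=True, where the socket is bound to a random port. Note that this simple split on ':' assumes IPv4 addresses or host names.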

PongProc – The Actual Process

The PongProc inherits ZmqProcess and is the main class for our process. It creates the streams, starts the event loop and dispatches all messages to the appropriate handlers:

# example_app/pongproc.py
import zmq

import base


host = '127.0.0.1'
port = 5678


class PongProc(base.ZmqProcess):
    """
    Main process for the Ponger. It handles ping requests and sends back
    a pong.

    """
    def __init__(self, bind_addr):
        super().__init__()

        self.bind_addr = bind_addr
        self.rep_stream = None
        self.ping_handler = PingHandler()

    def setup(self):
        """Sets up PyZMQ and creates all streams."""
        super().setup()

        # Create the stream and add the message handler
        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True)
        self.rep_stream.on_recv(RepStreamHandler(self.rep_stream, self.stop,
                                                 self.ping_handler))

    def run(self):
        """Sets up everything and starts the event loop."""
        self.setup()
        self.loop.start()

    def stop(self):
        """Stops the event loop."""
        self.loop.stop()

If you are going to start this process as a sub-process via start, make sure everything you instantiate in __init__ is pickle-able, or it won’t work on Windows. Linux and Mac OS X use fork to create a sub-process; fork just makes a copy of the main process and gives it a new process ID. On Windows, there is no fork, so the context of your main process is pickled and sent to the sub-process.
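A quick way to spot problematic attributes is to try pickling them yourself. This is only a rough sketch: the classes are stand-ins for real processes, a threading.Lock stands in for an unpicklable resource such as a context or socket, and unpicklable_attrs is a hypothetical helper that checks each instance attribute on its own.

```python
import pickle
import threading


class EagerProc:
    """Creates an unpicklable resource in __init__ (problematic)."""

    def __init__(self):
        self.lock = threading.Lock()  # stands in for a context or socket


class LazyProc:
    """Only stores plain data in __init__; resources are created in setup()."""

    def __init__(self, bind_addr):
        self.bind_addr = bind_addr
        self.lock = None

    def setup(self):
        self.lock = threading.Lock()


def unpicklable_attrs(obj):
    """Return the names of instance attributes that cannot be pickled."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(name)
    return bad


eager_problems = unpicklable_attrs(EagerProc())
lazy_problems = unpicklable_attrs(LazyProc(('127.0.0.1', 5678)))
```

LazyProc follows the same pattern as PongProc: __init__ only remembers the address, and everything else is created later in setup().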

In setup, call super().setup() before you create a stream or you won’t have a loop instance for them. We call setup from run, because the context must be created within the new system process, which wouldn’t be the case if we called setup from __init__.

The stop method is not really necessary in this example, but it can be used to send stop messages to sub-processes when the main process terminates and to do other kinds of clean-up. You can also execute it if you catch a KeyboardInterrupt after calling run.
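Cleaning up on Ctrl+C could look like the following sketch. FakeProc is a stand-in for PongProc whose run() simulates the interrupt, and run_until_interrupted is a made-up helper name.

```python
class FakeProc:
    """Stand-in for PongProc; run() simulates the user pressing Ctrl+C."""

    def __init__(self):
        self.stopped = False

    def run(self):
        raise KeyboardInterrupt

    def stop(self):
        self.stopped = True


def run_until_interrupted(proc):
    """Run *proc* in the current process and clean up on Ctrl+C."""
    try:
        proc.run()
    except KeyboardInterrupt:
        proc.stop()


proc = FakeProc()
run_until_interrupted(proc)
```

With a real PongProc, run() would block in the event loop until the interrupt arrives.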

MessageHandler — The Base Class for Message Handlers

A PyZMQ message handler can be any callable that accepts one argument—the list of message parts as byte objects. Hence, our MessageHandler class needs to implement __call__:

# example_app/base.py

from zmq.utils import jsonapi as json


class MessageHandler(object):
    """
    Base class for message handlers for a :class:`ZmqProcess`.

    Inheriting classes only need to implement a handler function for each
    message type.

    """
    def __init__(self, json_load=-1):
        self._json_load = json_load

    def __call__(self, msg):
        """
        Gets called when a message is received by the stream this handler is
        registered with. *msg* is a list as returned by
        :meth:`zmq.core.socket.Socket.recv_multipart`.

        """
        # Try to JSON-decode the index "self._json_load" of the message
        i = self._json_load
        msg_type, data = json.loads(msg[i])
        msg[i] = data

        # Get the actual message handler and call it
        if msg_type.startswith('_'):
            raise AttributeError('%s starts with an "_"' % msg_type)

        getattr(self, msg_type)(*msg)

As you can see, it’s quite simple. It just tries to JSON-load the message part at the index defined by self._json_load. We defined earlier that the first element of the JSON-encoded message defines the message type (e.g., ping). If an attribute of the same name exists in the inheriting class, it is called with the remainder of the message.

You can also add logging or additional security measures here, but that is not necessary for our example.
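One possible shape for such additions is sketched below. It is a variant of the class above, not part of the example application: the standard library's json module stands in for zmq.utils.jsonapi, and the allowed attribute, the logger name and EchoHandler are assumptions made for this illustration.

```python
import json
import logging

log = logging.getLogger('example_app')


class LoggingMessageHandler:
    """Variant of MessageHandler with logging and an explicit allowlist."""

    allowed = frozenset()  # subclasses list the message types they accept

    def __init__(self, json_load=-1):
        self._json_load = json_load

    def __call__(self, msg):
        i = self._json_load
        msg_type, data = json.loads(msg[i])
        msg[i] = data

        # Only dispatch message types that were explicitly allowed
        if msg_type not in self.allowed:
            log.warning('Rejected message type %r', msg_type)
            raise AttributeError('%s is not an allowed message type' % msg_type)

        log.debug('Dispatching %r with %r', msg_type, msg)
        getattr(self, msg_type)(*msg)


class EchoHandler(LoggingMessageHandler):
    allowed = frozenset({'echo'})

    def __init__(self):
        super().__init__()
        self.seen = []

    def echo(self, data):
        self.seen.append(data)


handler = EchoHandler()
handler([b'["echo", "spam"]'])
```

An allowlist is stricter than the underscore check: attributes such as seen can no longer be addressed by a message at all.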

RepStreamHandler — The Concrete Message Handler

This class inherits the MessageHandler I just showed you and is used in PongProc.setup. It defines a handler method for ping messages and the plzdiekthxbye stop message. In its __init__ it receives references to the rep_stream, PongProc’s stop method and to the ping_handler, our actual application logic:

# example_app/pongproc.py

class RepStreamHandler(base.MessageHandler):
    """Handles messages arriving at the PongProc’s REP stream."""
    def __init__(self, rep_stream, stop, ping_handler):
        super().__init__()
        self._rep_stream = rep_stream
        self._stop = stop
        self._ping_handler = ping_handler

    def ping(self, data):
        """Send back a pong."""
        rep = self._ping_handler.make_pong(data)
        self._rep_stream.send_json(rep)

    def plzdiekthxbye(self, data):
        """Just calls :meth:`PongProc.stop`."""
        self._stop()

PingHandler – The Application Logic

The PingHandler contains the actual application logic (which is not much, in this example). The make_pong method just gets the number of pings sent with the ping message and creates a new pong message. The serialization is done by the RepStreamHandler (via send_json), so our handler does not depend on PyZMQ:

# example_app/pongproc.py

class PingHandler(object):

    def make_pong(self, num_pings):
        """Creates and returns a pong message."""
        print('Pong got request number %s' % num_pings)

        return ['pong', num_pings]

Summary

Okay, that’s it for now. I showed you three ways to use PyZMQ. If you have a very simple process with only one socket, you can easily use its blocking recv methods. If you need more than one socket, I recommend using the event loop. And polling … you don’t want to use that.

If you decide to use PyZMQ’s event loop, you should separate the application logic from all the PyZMQ stuff (like creating streams, sending/receiving messages and dispatching them). If your application consists of more than one process (which is usually the case), you should also create a base class with shared functionality for them.

In the next part, I’m going to talk about how you can test your application.

Book review: NumPy 1.5 Beginner’s Guide

I recently got the chance to review the book NumPy 1.5 Beginner’s Guide by Ivan Idris and published by Packt Publishing.

It covers many aspects of NumPy and also introduces SciPy as well as Matplotlib. The author includes a lot of examples and exercises and also shows the effects of some not-so-easy-to-understand functions using matplotlib graphs.

The book is easy to read, so you should make fast progress in learning NumPy. Overall, it’s a good read for NumPy beginners. Advanced NumPy users, who just want to look up how specific things work, are better off with NumPy’s documentation, though.

Tea Timer 1.8

Since everything seems to work nicely, I’ve just uploaded the final Tea Timer 1.8 release.

Unfortunately, I wasn’t able to submit the new version to apple.com. Seems like they don’t accept new widgets anymore …

However, here is the list of changes for this release:

  • [NEW] Growl 1.3 support
  • [NEW] Pink background
  • [CHANGE] Alarms are now called asynchronously. The voice and Growl message appear at the same time now (instead of one after the other).
  • [CHANGE] The list of alarm sounds and voices is now created dynamically and thus contains Lion’s new voices.
  • [CHANGE] Editing the fields for the timer target (e.g. “Tea”) or “ready in” during a countdown will no longer reset the countdown. However, editing the countdown time during an active countdown will reset the countdown accordingly.
  • [FIX] Fixed a bug with keep alarming and sticky Growl messages. You won’t get flooded by them anymore. Instead, there will be only one sticky Growl message when keep alarming is activated.