Handling incoming messages using Processor and Worker
=====================================================

We have two separated component that compose a Consumer in Zope:

    1. ``MultiProcessor`` which is waiting for message to come. Once it has one it pass it start a
       new thread and pass the message over to one of its worker.

    2. ``ConsumingWorker`` which recieve a message, register it into the transaction machinery and pass it to the Zope event subscribers.

Create a bar connection first::

    >>> from affinitic.zamqp.connection import BrokerConnection
    >>> from affinitic.zamqp.interfaces import IBrokerConnection
    >>> from zope.component import provideUtility

    >>> class DummyBarBrokerConnection(BrokerConnection):
    ...     id = 'bar'
    >>> conn = DummyBarBrokerConnection()
    >>> provideUtility(conn, IBrokerConnection, name='bar')

We fake an Application, a site, a database object and a db connection::

    >>> from zope.component import getGlobalSiteManager
    >>> class Site(object):
    ...     """
    ...     A fake site with a site manager
    ...     """
    ...
    ...     def getSiteManager(self):
    ...         return getGlobalSiteManager()

    >>> class Application(object):
    ...     """
    ...     A fake Application with one site
    ...     """
    ...
    ...     site1 = Site()

    >>> class Connection(object):
    ...     """
    ...     A fake connection to the root of a ZODB
    ...     """
    ...
    ...     def root(self):
    ...         return {'Application':Application()}

    >>> class FakeDb(object):
    ...     """
    ...     A fake database
    ...     """
    ...
    ...     def open(self):
    ...         return Connection()

    >>> db = FakeDb()

We create by hand a MultiProcessor. It is normally instantiated by the ``bootStrapSubscriber``
once the connection to the ZODB is ready after Zope startup.

We pass it the database connection, the name of the site that will call the subscribers and then
name of the connection that will be used to instantiate the ``ConsumerSet``::

    >>> from affinitic.zamqp.processor import MultiProcessor
    >>> processor = MultiProcessor(db, 'site1', 'bar')

The site related to this processor is the one created previously::

    >>> processor.getSite()
     <Site object ...>

We fake now a message ::

    >>> from affinitic.zamqp.interfaces import IMessage
    >>> from zope.interface import implements
    >>> class DummyAMQPMessage(object):
    ...     implements(IMessage)
    ...     _state = 'NEW'
    ...     delivery_info = {'delivery_tag': 999,
    ...                      'exchange': 'SuperExchange'}
    ...
    ...     def ack(self):
    ...         self._state = 'ACK'

    >>> from affinitic.zamqp.interfaces import IMessageWrapper
    >>> message = DummyAMQPMessage()

We wrap the message (just as the ``Consumer`` or ``ConsumerSet`` are doing)::

    >>> messageWrapper = IMessageWrapper(message)

We also mark the message wrapper with a marker interface so that subscribers will be able
to subscribe to this specific type of message::

    >>> from zope.interface import Interface, alsoProvides
    >>> class IDummyMessage(Interface):
    ...     """
    ...     Marker interface for dummy message
    ...     """
    >>> alsoProvides(messageWrapper, IDummyMessage)

To be able to get the output from the threads that will be created, we
need to setup some logging (which is thread safe)::

    >>> import logging
    >>> import StringIO
    >>> logger = logging.getLogger('affinitic.zamqp')
    >>> logger.setLevel(logging.DEBUG)
    >>> logger.handlers = []
    >>> stream = StringIO.StringIO()
    >>> h = logging.StreamHandler(stream)
    >>> logger.addHandler(h)

Process a message without subscription adapter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We notify a new incoming message (this is normally called by the processor loop)::

    >>> processor.notify_message('A Message', messageWrapper)

This is asynchronous so let's wait a bit for the ConsumerWorker to do its job

    >>> from time import sleep
    >>> sleep(1)

We read what happen thanks to the logging. And we see that no subscriber handled our message and
nobody acknowledged it::

    >>> print stream.getvalue()
    Thread MainThread is starting new thread ConsumerWork-0
    Notify new message 999 in exchange: SuperExchange
    Before commit Message 999 (status = 'NEW')
    Handled Message 999 (status = 'NEW')

Process a message with one subscription adapter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Now let's use a new message and define a subscriber to see what happens.

Reset the logging stream::

    >>> stream.seek(0)

Mark the message as being another one::

    >>> message.delivery_info['delivery_tag'] = 1000

Register a subscription adapter that log a dummy message and acknowledge the dummy message::

    >>> from zope.component import provideSubscriptionAdapter
    >>> def loggingSubscriber(message):
    ...     logger = logging.getLogger('affinitic.zamqp')
    ...     logger.debug('Subscriber handling message %s' % message)
    ...     message.ack()

    >>> from affinitic.zamqp.interfaces import IArrivedMessage
    >>> provideSubscriptionAdapter(loggingSubscriber, [IDummyMessage], IArrivedMessage)

We notify again new incoming message (this is normally called by the processor loop)::

    >>> processor.notify_message('A Message', messageWrapper)

This is asynchronous so let's wait a bit for the ConsumerWorker to do its job

    >>> sleep(1)

We read what happen thanks to the logging. And we see that our subscriber handled our message and
acknowledged it. You can see also that before transaction commit the message is still marked as not
being acknowledged::

    >>> print stream.getvalue()
    Thread MainThread is starting new thread ConsumerWork-0
    Notify new message 1000 in exchange: SuperExchange
    Subscriber handling message <affinitic.zamqp.message.MessageWrapper object ...>
    Before commit Message 1000 (status = 'NEW')
    Handled Message 1000 (status = 'ACK')

Process a message with one failing subscription adapter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Now let's see what happen if one of the subscription adapter is failing.

Reset the logging stream::

    >>> stream.seek(0)

Mark the message as being another one::

    >>> message.delivery_info['delivery_tag'] = 1001

Register a subscription adapter that just fails::

    >>> def failingSubscriber(message):
    ...     raise AttributeError('Fake error')

    >>> provideSubscriptionAdapter(failingSubscriber, [IDummyMessage], IArrivedMessage)

We notify again new incoming message (this is normally called by the processor loop)::

    >>> processor.notify_message('A Message', messageWrapper)

This is asynchronous so let's wait a bit for the ConsumerWorker to do its job

    >>> sleep(1)

We read what happen thanks to the logging. And we see that our subscriber handled our message and
failed. As the transaction was not commited the message is not acknowledged.

    >>> print stream.getvalue()
    Thread MainThread is starting new thread ConsumerWork-0
    Notify new message 1001 in exchange: SuperExchange
    Subscriber handling message <affinitic.zamqp.message.MessageWrapper object ...>
    Error while running job 1001 on exchange SuperExchange
    Fake error
    Traceback (most recent call last):
    ...
    AttributeError: Fake error

