#!/usr/bin/python3.4

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys; sys.path.append ('/usr/lib64/python3.4/site-packages')

"""This code implements the msgq daemon."""

import subprocess
import signal
import os
import socket
import sys
import struct
import errno
import time
import select
import random
import threading
import bundy.config.ccsession
from optparse import OptionParser, OptionValueError
import bundy.util.process
import bundy.util.traceback_handler
from bundy.cc.proto_defs import *
import bundy.log
from bundy.log_messages.msgq_messages import *

import bundy.cc

bundy.util.process.rename()

bundy.log.init("bundy-msgq", buffer=True)
# Logger that is used in the actual msgq handling - startup, shutdown and the
# poller thread.
logger = bundy.log.Logger("msgq")
# A separate copy for the master/config thread when the poller thread runs.
# We use a separate instance, since the logger itself doesn't have to be
# thread safe.
config_logger = bundy.log.Logger("msgq")
TRACE_START = logger.DBGLVL_START_SHUT
TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL

# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BUNDY version number (set in configure.ac).
VERSION = "bundy-msgq 20110127 (BUNDY 1.2.0)"

# If BUNDY_FROM_BUILD is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system
if "BUNDY_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["BUNDY_FROM_SOURCE"] + "/src/bin/msgq"
else:
    PREFIX = "/usr"
    DATAROOTDIR = "${prefix}/share"
    SPECFILE_PATH = "/usr/share/bundy".replace("${datarootdir}",
                                                  DATAROOTDIR). \
                                                  replace("${prefix}", PREFIX)
SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"

class MsgQReceiveError(Exception): pass

class MsgQRunningError(Exception): pass

class MsgQCloseOnReceive(Exception):
    """Exception raised when reading data from a socket results in 'shutdown'.

    This happens when msgq received 0-length data.  This class holds whether
    it happens in the middle of reading (i.e. after reading some) via
    partial_read parameter, which is set to True if and only if so.
    This will be used by an upper layer catching the exception to distinguish
    the severity of the event.

    """
    def __init__(self, reason, partial_read):
        self.partial_read = partial_read
        self.__reason = reason

    def __str__(self):
        return self.__reason

class SubscriptionManager:
    def __init__(self, cfgmgr_ready):
        """
        Initialize the subscription manager.
        parameters:
        * cfgmgr_ready: A callable object run once the config manager
            subscribes. This is a hackish solution, but we can't read
            the configuration sooner.
        """
        self.subscriptions = {}
        self.__cfgmgr_ready = cfgmgr_ready
        self.__cfgmgr_ready_called = False

    def subscribe(self, group, instance, socket):
        """Add a subscription."""
        target = ( group, instance )
        if target in self.subscriptions:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
            if socket not in self.subscriptions[target]:
                self.subscriptions[target].append(socket)
        else:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
            self.subscriptions[target] = [ socket ]
        if group == "ConfigManager" and not self.__cfgmgr_ready_called:
            logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
            self.__cfgmgr_ready_called = True
            self.__cfgmgr_ready()

    def unsubscribe(self, group, instance, socket):
        """Remove the socket from the one specific subscription."""
        target = ( group, instance )
        if target in self.subscriptions:
            if socket in self.subscriptions[target]:
                self.subscriptions[target].remove(socket)
                return True
        return False

    def unsubscribe_all(self, socket):
        """Remove the socket from all subscriptions."""
        removed_from = []
        for subs, socklist in self.subscriptions.items():
            if socket in socklist:
                socklist.remove(socket)
                removed_from.append(subs)
        return removed_from

    def find_sub(self, group, instance):
        """Return an array of sockets which want this specific group,
        instance."""
        target = (group, instance)
        if target in self.subscriptions:
            return self.subscriptions[target]
        else:
            return []

    def find(self, group, instance):
        """Return an array of sockets who should get something sent to
        this group, instance pair.  This includes wildcard subscriptions."""
        target = (group, instance)
        partone = self.find_sub(group, instance)
        parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
        return list(set(partone + parttwo))

class MsgQ:
    """Message Queue class."""
    # did we find a better way to do this?
    SOCKET_FILE = os.path.join("/var",
                               "bundy",
                               "msgq_socket").replace("${prefix}",
                                                      "/usr")

    def __init__(self, socket_file=None, verbose=False):
        """Initialize the MsgQ master.

        The socket_file specifies the path to the UNIX domain socket
        that the msgq process listens on. If it is None, the
        environment variable BUNDY_MSGQ_SOCKET_FILE is used. If that
        is not set, it will default to
        /var/bundy/msg_socket.
        If verbose is True, then the MsgQ reports
        what it is doing.
        """

        if socket_file is None:
            if "BUNDY_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BUNDY_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = self.SOCKET_FILE
        else:
            self.socket_file = socket_file

        self.verbose = verbose
        self.runnable = False
        self.listen_socket = False
        self.sockets = {}
        self.connection_counter = random.random()
        self.hostname = socket.gethostname()
        self.subs = SubscriptionManager(self.cfgmgr_ready)
        self.lnames = {}
        self.fd_to_lname = {}
        self.sendbuffs = {}
        self.running = False
        self.__cfgmgr_ready = None
        self.__cfgmgr_ready_cond = threading.Condition()
        # A lock used when the message queue does anything more complicated.
        # It is mostly a safety measure, the threads doing so should be mostly
        # independent, and the one with config session should be read only,
        # but with threads, one never knows. We use threads for concurrency,
        # not for performance, so we use wide lock scopes to be on the safe
        # side.
        self.__lock = threading.Lock()
        self._session = None
        self.__poller_sock = None

    def members_notify(self, event, params):
        """
        Thin wrapper around ccs's notify. Send a notification about change
        of some list that can be requested by the members command.

        The event is one of:
        - connected (client connected to MsgQ)
        - disconected (client disconnected from MsgQ)
        - subscribed (client subscribed to a group)
        - unsubscribed (client unsubscribed from a group)

        The params is dict containing:
        - client: The lname of the client in question.
        - group (for 'subscribed' and 'unsubscribed' events):
          The group the client subscribed or unsubscribed from.

        The notification occurs after the event, so client a subscribing for
        notifications will get a notification about its own subscription, but
        will not get a notification when it unsubscribes.
        """
        # Due to the interaction between threads (and fear it might influence
        # sending stuff), we test this method in msgq_run_test, instead of
        # mocking the ccs.
        if self._session: # Don't send before we have started up
            self._session.notify('cc_members', event, params)

    def cfgmgr_ready(self, ready=True):
        """Notify that the config manager is either subscribed, or
           that the msgq is shutting down and it won't connect, but
           anybody waiting for it should stop anyway.

           The ready parameter signifies if the config manager is subscribed.

           This method can be called multiple times, but second and any
           following call is simply ignored. This means the "abort" version
           of the call can be used on any stop unconditionally, even when
           the config manager already connected.
        """
        with self.__cfgmgr_ready_cond:
            if self.__cfgmgr_ready is not None:
                # This is a second call to this method. In that case it does
                # nothing.
                return
            self.__cfgmgr_ready = ready
            self.__cfgmgr_ready_cond.notify_all()

    def wait_cfgmgr(self):
        """Wait for msgq to subscribe.

           When this returns, the config manager is either subscribed, or
           msgq gave up waiting for it. Success is signified by the return
           value.
        """
        with self.__cfgmgr_ready_cond:
            # Wait until it either aborts or subscribes
            while self.__cfgmgr_ready is None:
                self.__cfgmgr_ready_cond.wait()
            return self.__cfgmgr_ready

    def setup_listener(self):
        """Set up the listener socket.  Internal function."""
        logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)

        if os.path.exists(self.socket_file):
            # Rather than just blindly removing the socket file, attempt to
            # connect to the existing socket to see if there is an existing
            # msgq running. Only if that fails do we remove the file and
            # attempt to create a new socket.
            existing_msgq = None
            try:
                existing_msgq = bundy.cc.Session(self.socket_file)
            except bundy.cc.session.SessionError:
                existing_msgq = None

            if existing_msgq:
                existing_msgq.close()
                logger.fatal(MSGQ_ALREADY_RUNNING)
                raise MsgQRunningError("bundy-msgq already running")

            os.remove(self.socket_file)

        try:
            self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.listen_socket.bind(self.socket_file)
            self.listen_socket.listen(1024)
        except Exception as e:
            # remove the file again if something goes wrong
            # (note this is a catch-all, but we reraise it)
            if os.path.exists(self.socket_file):
                os.remove(self.socket_file)
            self.listen_socket.close()
            logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
            raise e

    def setup_signalsock(self):
        """Create a socket pair used to signal when we want to finish.
           Using a socket is easy and thread/signal safe way to signal
           the termination.
        """
        # The __poller_sock will be the end in the poller. When it is
        # closed, we should shut down.
        (self.__poller_sock, self.__control_sock) = socket.socketpair()

    def setup(self):
        """Configure listener socket, polling, etc.
           Raises a socket.error if the socket_file cannot be
           created.
        """

        self.setup_signalsock()
        self.setup_listener()

        logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);

        self.runnable = True

    def process_accept(self):
        """Process an accept on the listening socket."""
        newsocket, ipaddr = self.listen_socket.accept()
        # TODO: When we have logging, we might want
        # to add a debug message here that a new connection
        # was made
        self.register_socket(newsocket)

    def register_socket(self, newsocket):
        """
        Internal function to insert a socket. Used by process_accept and some
        tests.
        """
        self.sockets[newsocket.fileno()] = newsocket
        lname = self.newlname()
        self.lnames[lname] = newsocket
        self.fd_to_lname[newsocket.fileno()] = lname

        logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
                     lname)

        self.members_notify('connected', {'client': lname})

    def kill_socket(self, fd, sock):
        """Fully close down the socket."""

        unsubscribed_from = self.subs.unsubscribe_all(sock)
        lname = self.fd_to_lname[fd]
        del self.fd_to_lname[fd]
        del self.lnames[lname]
        sock.close()
        del self.sockets[fd]
        if fd in self.sendbuffs:
            del self.sendbuffs[fd]
        logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
        # Filter out just the groups.
        unsubscribed_from_groups = set(map(lambda x: x[0], unsubscribed_from))
        for group in unsubscribed_from_groups:
            self.members_notify('unsubscribed', {
                                    'client': lname,
                                    'group': group
                                })
        self.members_notify('disconnected', {'client': lname})

    def __getbytes(self, fd, sock, length, continued):
        """Get exactly the requested bytes, or raise an exception if
           EOF.

           continued is set to True if this method is called to complete
           already read data.
           """
        received = b''
        while len(received) < length:
            try:
                data = sock.recv(length - len(received))

            except socket.error as err:
                # This case includes ECONNRESET, which seems to happen when
                # the remote client has closed its socket at some subtle
                # timing (it should normally result in receiving empty data).
                # Since we didn't figure out how exactly that could happen,
                # we treat it just like other really-unexpected socket errors.
                raise MsgQReceiveError(str(err))
            if len(data) == 0:
                raise MsgQCloseOnReceive("EOF", continued)
            received += data
            continued = True
        return received

    def read_packet(self, fd, sock):
        """Read a correctly formatted packet.  Will raise exceptions if
           something fails."""
        lengths = self.__getbytes(fd, sock, 6, False)
        overall_length, routing_length = struct.unpack(">IH", lengths)
        if overall_length < 2:
            raise MsgQReceiveError("overall_length < 2")
        overall_length -= 2
        if routing_length > overall_length:
            raise MsgQReceiveError("routing_length > overall_length")
        if routing_length == 0:
            raise MsgQReceiveError("routing_length == 0")
        data_length = overall_length - routing_length
        # probably need to sanity check lengths here...
        routing = self.__getbytes(fd, sock, routing_length, True)
        if data_length > 0:
            data = self.__getbytes(fd, sock, data_length, True)
        else:
            data = None
        return (routing, data)

    def process_packet(self, fd, sock):
        """Process one packet."""
        try:
            routing, data = self.read_packet(fd, sock)
        except (MsgQReceiveError, MsgQCloseOnReceive) as err:
            # If it's MsgQCloseOnReceive and that happens without reading
            # any data, it basically means the remote client has closed the
            # socket, so we log it as debug information.  Otherwise, it's
            # a somewhat unexpected event, so we consider it an "error".
            if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
                logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
            else:
                logger.error(MSGQ_RECV_ERROR, fd, err)
            self.kill_socket(fd, sock)
            return

        try:
            routingmsg = bundy.cc.message.from_wire(routing)
        except DecodeError as err:
            self.kill_socket(fd, sock)
            logger.error(MSGQ_HDR_DECODE_ERROR, fd, err)
            return

        self.process_command(fd, sock, routingmsg, data)

    def process_command(self, fd, sock, routing, data):
        """Process a single command.  This will split out into one of the
           other functions."""
        logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
        cmd = routing[CC_HEADER_TYPE]
        if cmd == CC_COMMAND_SEND:
            self.process_command_send(sock, routing, data)
        elif cmd == CC_COMMAND_SUBSCRIBE:
            self.process_command_subscribe(sock, routing, data)
        elif cmd == CC_COMMAND_UNSUBSCRIBE:
            self.process_command_unsubscribe(sock, routing, data)
        elif cmd == CC_COMMAND_GET_LNAME:
            self.process_command_getlname(sock, routing, data)
        elif cmd == CC_COMMAND_PING:
            # Command for testing purposes
            self.process_command_ping(sock, routing, data)
        elif cmd == CC_COMMAND_STOP:
            self.stop()
        else:
            logger.error(MSGQ_INVALID_CMD, cmd)

    def preparemsg(self, env, msg = None):
        if type(env) == dict:
            env = bundy.cc.message.to_wire(env)
        if type(msg) == dict:
            msg = bundy.cc.message.to_wire(msg)
        length = 2 + len(env);
        if msg:
            length += len(msg)
        ret = struct.pack("!IH", length, len(env))
        ret += env
        if msg:
            ret += msg
        return ret

    def sendmsg(self, sock, env, msg = None):
        self.send_prepared_msg(sock, self.preparemsg(env, msg))

    def _send_data(self, sock, data):
        """
        Send a piece of data to the given socket.  This method is
        essentially "private" to MsgQ, but defined as if it were "protected"
        for easier access from tests.

        Parameters:
        sock: The socket to send to
        data: The list of bytes to send
        Returns:
        An integer or None. If an integer (which can be 0), it signals
        the number of bytes sent. If None, the socket appears to have
        been closed on the other end, and it has been killed on this
        side too.
        """
        try:
            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
            # on some OSes
            sock.setblocking(0)
            return sock.send(data)
        except socket.error as e:
            if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
                return 0
            elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
                # EPIPE happens if the remote module has terminated by the time
                # of this send; its severity can vary, but in many cases it
                # shouldn't be critical, so we log it separately as a warning.
                if e.errno == errno.EPIPE:
                    logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
                else:
                    logger.error(MSGQ_SEND_ERROR, sock.fileno(),
                                 errno.errorcode[e.errno])
                self.kill_socket(sock.fileno(), sock)
                return None
            else:
                raise e
        finally:
            # And set it back again
            sock.setblocking(1)

    def send_prepared_msg(self, sock, msg):
        '''
        Add a message to the queue. If there's nothing waiting, try
        to send it right away.

        Return if the socket is still alive. It can return false if the
        socket dies (for example due to EPIPE in the attempt to send).
        Returning true does not guarantee the message will be delivered,
        but returning false means it won't.
        '''
        # Try to send the data, but only if there's nothing waiting
        fileno = sock.fileno()
        if fileno in self.sendbuffs:
            amount_sent = 0
        else:
            amount_sent = self._send_data(sock, msg)
            if amount_sent is None:
                # Socket has been killed, drop the send
                return False

        # Still something to send, add it to outgoing queue
        if amount_sent < len(msg):
            now = time.clock()
            # Append it to buffer (but check the data go away)
            if fileno in self.sendbuffs:
                (last_sent, buff) = self.sendbuffs[fileno]
                tdelta = now - last_sent
                if tdelta > 0.1:
                    logger.error(MSGQ_SOCKET_TIMEOUT_ERROR, fileno, tdelta)
                    self.kill_socket(fileno, sock)
                    return False
                buff += msg
            else:
                buff = msg[amount_sent:]
                last_sent = now
            self.sendbuffs[fileno] = (last_sent, buff)
        return True

    def _process_write(self, fileno):
        # Try to send some data from the buffer
        (_, msg) = self.sendbuffs[fileno]
        sock = self.sockets[fileno]
        amount_sent = self._send_data(sock, msg)
        if amount_sent is not None:
            # Keep the rest
            msg = msg[amount_sent:]
            if len(msg) == 0:
                # If there's no more, stop requesting for write availability
                del self.sendbuffs[fileno]
            else:
                self.sendbuffs[fileno] = (time.clock(), msg)

    def newlname(self):
        """Generate a unique connection identifier for this socket.
        This is done by using an increasing counter and the current
        time."""
        self.connection_counter += 1
        return "%x_%x@%s" % (time.time(), self.connection_counter,
                             self.hostname)

    def process_command_ping(self, sock, routing, data):
        self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)

    def process_command_getlname(self, sock, routing, data):
        lname = [ k for k, v in self.lnames.items() if v == sock ][0]
        self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
                     { CC_PAYLOAD_LNAME : lname })

    def process_command_send(self, sock, routing, data):
        group = routing[CC_HEADER_GROUP]
        instance = routing[CC_HEADER_INSTANCE]
        to = routing[CC_HEADER_TO]
        if group == None or instance == None:
            # FIXME: Should we log them instead?
            return  # ignore invalid packets entirely

        if to == CC_TO_WILDCARD:
            sockets = self.subs.find(group, instance)
        else:
            if to in self.lnames:
                sockets = [ self.lnames[to] ]
            else:
                sockets = []

        msg = self.preparemsg(routing, data)

        if sock in sockets:
            # Don't bounce to self
            sockets.remove(sock)

        has_recipient = False
        for socket in sockets:
            if self.send_prepared_msg(socket, msg):
                has_recipient = True
        if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
            CC_HEADER_REPLY not in routing:
            # We have no recipients. But the sender insists on a reply
            # (and the message isn't a reply itself). We need to send
            # an error to satisfy the request, since there's nobody
            # else who can.
            #
            # We omit the replies on purpose. The recipient might generate
            # the response by copying and mangling the header of incoming
            # message (just like we do below) and would include the want_answer
            # by accident. And we want to avoid loops of errors. Also, it
            # is unclear if the knowledge of undeliverable reply would be
            # of any use to the sender, and it should be much rarer situation.

            # The real errors would be positive, 1 most probably. We use
            # negative errors for delivery errors to distinguish them a
            # little. We probably should have a way to provide more data
            # in the error message.
            payload = bundy.config.ccsession.create_answer(CC_REPLY_NO_RECPT,
                                                         "No such recipient")
            # We create the header based on the current one. But we don't
            # want to mangle it for the caller, so we get a copy. A shallow
            # one should be enough, we modify the dict only.
            header = routing.copy()
            header[CC_HEADER_REPLY] = routing[CC_HEADER_SEQ]
            # Dummy lname not assigned to clients
            header[CC_HEADER_FROM] = "msgq"
            header[CC_HEADER_TO] = routing[CC_HEADER_FROM]
            # We keep the seq as it is. We don't need to track the message
            # and we will not confuse the sender. The sender would use an
            # unique id for each message, so we won't return one twice to it.
            errmsg = self.preparemsg(header, payload)
            # Send it back.
            self.send_prepared_msg(sock, errmsg)

    def process_command_subscribe(self, sock, routing, data):
        group = routing[CC_HEADER_GROUP]
        instance = routing[CC_HEADER_INSTANCE]
        if group == None or instance == None:
            return  # ignore invalid packets entirely
        self.subs.subscribe(group, instance, sock)
        lname = self.fd_to_lname[sock.fileno()]
        self.members_notify('subscribed',
                            {
                                'client': lname,
                                'group': group
                            })

    def process_command_unsubscribe(self, sock, routing, data):
        group = routing[CC_HEADER_GROUP]
        instance = routing[CC_HEADER_INSTANCE]
        if group == None or instance == None:
            return  # ignore invalid packets entirely
        if self.subs.unsubscribe(group, instance, sock):
            lname = self.fd_to_lname[sock.fileno()]
            self.members_notify('unsubscribed',
                                {
                                    'client': lname,
                                    'group': group
                                })

    def run(self):
        """Process messages.  Forever.  Mostly."""
        self.running = True

        self.run_select()

    def run_select(self):
        while self.running:
            reads = list(self.fd_to_lname.keys())
            if self.listen_socket.fileno() != -1: # Skip in tests
                reads.append(self.listen_socket.fileno())
            if self.__poller_sock and self.__poller_sock.fileno() != -1:
                reads.append(self.__poller_sock.fileno())
            writes = list(self.sendbuffs.keys())
            (read_ready, write_ready) = ([], [])
            try:
                (read_ready, write_ready, _) = select.select(reads, writes,
                                                             []);
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    continue # Just try it again if interrupted.
                else:
                    logger.fatal(MSGQ_SELECT_ERROR, err)
                    break
            with self.__lock:
                write_ready = set(write_ready)
                for fd in read_ready:
                    # Do only one operation per loop iteration on the given fd.
                    # It could be possible to perform both, but it may have
                    # undesired side effects in special situations (like, if the
                    # read closes the socket).
                    if fd in write_ready:
                        write_ready.remove(fd)
                    if fd == self.listen_socket.fileno():
                        self.process_accept()
                    elif self.__poller_sock and fd == \
                        self.__poller_sock.fileno():
                        # The signal socket. We should terminate now.
                        self.running = False
                        break
                    else:
                        self.process_packet(fd, self.sockets[fd])
                for fd in write_ready:
                    self._process_write(fd)

    def stop(self):
        # Signal it should terminate.
        self.__control_sock.close()
        self.__control_sock = None
        # Abort anything waiting on the condition, just to make sure it's not
        # blocked forever
        self.cfgmgr_ready(False)

    def cleanup_signalsock(self):
        """Close the signal sockets. We could do it directly in shutdown,
           but this part is reused in tests.
        """
        if self.__poller_sock:
            self.__poller_sock.close()
            self.__poller_sock = None
        if self.__control_sock:
            self.__control_sock.close()
            self.__control_sock = None

    def shutdown(self):
        """Stop the MsgQ master."""
        logger.debug(TRACE_START, MSGQ_SHUTDOWN)
        self.listen_socket.close()
        self.cleanup_signalsock()
        # Close all the sockets too. In real life, there should be none now,
        # as Msgq should be the last one. But some tests don't adhere to this
        # and create a new Msgq for each test, which led to huge socket leaks.
        # Some other threads put some other things in instead of sockets, so
        # we catch whatever exceptions there we can. This should be safe,
        # because in real operation, we will terminate now anyway, implicitly
        # closing anything anyway.
        for sock in self.sockets.values():
            try:
                sock.close()
            except Exception:
                pass
        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)

    def config_handler(self, new_config):
        """The configuration handler (run in a separate thread).
           Not tested, currently effectively empty.
        """
        config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)

        with self.__lock:
            if not self.running:
                return

            # TODO: Any config handling goes here.

            return bundy.config.create_answer(0)

    def command_handler(self, command, args):
        """The command handler (run in a separate thread)."""
        config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)

        with self.__lock:
            if not self.running:
                return

            # TODO: Who does validation? The ModuleCCSession or must we?

            if command == 'members':
                # List all members of MsgQ or of a group.
                if args is None:
                    args = {}
                group = args.get('group')
                if group:
                    return bundy.config.create_answer(0,
                        list(map(lambda sock: self.fd_to_lname[sock.fileno()],
                                 self.subs.find(group, ''))))
                else:
                    return bundy.config.create_answer(0,
                                                    list(self.lnames.keys()))

            config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
            return bundy.config.create_answer(1, 'unknown command: ' + command)

def signal_handler(msgq, signal, frame):
    if msgq:
        msgq.stop()

def main():
    def check_port(option, opt_str, value, parser):
        """Function to insure that the port we are passed is actually
        a valid port number. Used by OptionParser() on startup."""
        intval = int(value)
        if (intval < 0) or (intval > 65535):
            raise OptionValueError("%s requires a port number (0-65535)" %
                                   opt_str)
        parser.values.msgq_port = intval

    # Parse any command-line options.
    parser = OptionParser(version=VERSION)
    # TODO: Should we remove the option?
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the msgq daemon will use")
    (options, args) = parser.parse_args()

    # Announce startup.
    logger.debug(TRACE_START, MSGQ_START, VERSION)

    msgq = MsgQ(options.msgq_socket_file, options.verbose)

    signal.signal(signal.SIGTERM,
                  lambda signal, frame: signal_handler(msgq, signal, frame))

    # Ignore SIGPIPE. We handle errors writing to children using try in the
    # appropriate places and the delivery of a signal is very heavy handed.
    # Windows doesn't support SIGPIPE so don't try it there.
    if not sys.platform.startswith('win'):
        signal.signal(signal.SIGPIPE, signal.SIG_IGN)

    try:
        msgq.setup()
    except Exception as e:
        logger.fatal(MSGQ_START_FAIL, e)
        sys.exit(1)

    # We run the processing in a separate thread. This is because we want to
    # connect to the msgq ourself. But the cc library is unfortunately blocking
    # in many places and waiting for the processing part to answer, it would
    # deadlock.
    poller_thread = threading.Thread(target=msgq.run)
    poller_thread.daemon = True
    try:
        poller_thread.start()
        if msgq.wait_cfgmgr():
            # Once we get the config manager, we can read our own config.
            session = bundy.config.ModuleCCSession(SPECFILE_LOCATION,
                                                 msgq.config_handler,
                                                 msgq.command_handler,
                                                 None, True,
                                                 msgq.socket_file)
            msgq._session = session
            session.start()
            # And we create a thread that'll just wait for commands and
            # handle them. We don't terminate the thread, we set it to
            # daemon. Once the main thread terminates, it'll just die.
            def run_session():
                while True:
                    # As the check_command has internal mutex that is shared
                    # with sending part (which includes notify). So we don't
                    # want to hold it long-term and block using select.
                    fileno = session.get_socket().fileno()
                    try:
                        (reads, _, _) = select.select([fileno], [], [])
                    except select.error as se:
                        if se.args[0] != errno.EINTR:
                            raise
                    session.check_command(True)
            background_thread = threading.Thread(target=run_session)
            background_thread.daemon = True
            background_thread.start()
        poller_thread.join()
    except KeyboardInterrupt:
        pass

    msgq.shutdown()

    logger.info(MSGQ_EXITING)

if __name__ == "__main__":
    bundy.util.traceback_handler.traceback_handler(main)
