#!/bin/python

# -*- coding: utf-8 -*-

"""
oxd
~~~~~~~~

Main logging daemon for BlueOx events.

:copyright: (c) 2012 by Rhett Garber
:license: ISC, see LICENSE for more details.

"""
import argparse
import datetime
import errno
import os
import sys
import io
import logging
import signal
import struct
import time

import json
import zmq
import msgpack

from blueox import network

# How long (in milliseconds) we wait for network traffic before running our
# poll loop anyway.
POLL_LOOP_TIMEOUT_MS = 1000

# How often (in seconds) we poll our file streams. This is also the flush
# interval.
FILE_POLL_INTERVAL = 1.0

# How long (in seconds) an open file may sit idle before we close it.
FILE_IDLE_TIMEOUT = 60.0

# If we're asked to shut down before forwarding everything, we'll wait this
# long (in milliseconds).
LINGER_SHUTDOWN_MSECS = 5000

# Module-level logger shared by everything in this daemon.
log = logging.getLogger("blueox.d")

def setup_logging(options):
    """Configure root logging according to the requested verbosity.

    ``options.verbose`` is the list built up by repeated ``-v`` flags: an
    empty list selects WARNING, one entry selects INFO and two or more
    select DEBUG.
    """
    verbosity = len(options.verbose)
    level = {0: logging.WARNING, 1: logging.INFO}.get(verbosity, logging.DEBUG)

    # Everything goes straight to stdout. oxd is normally run under something
    # like upstart or supervisord that collects the output for us; an option
    # for another destination would be easy to add if anyone needed it.
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
        stream=sys.stdout,
    )

class LogFileStream(object):
    """Append-only log file for one event type, rotated by UTC day.

    The file lives at ``<log_path>/<YYYYMMDD>/<name>-<YYYYMMDD>.log`` and is
    opened lazily on the first write. ``poll()`` must be called regularly; it
    flushes pending data and closes the file when it has gone idle, has been
    removed from disk, or belongs to a previous day.
    """

    def __init__(self, log_path, name):
        self.name = name
        self.log_path = log_path

        # Timestamps (``time.time()`` values) of the most recent write, flush
        # and poll; ``None`` until the corresponding action first happens.
        self.last_write = None
        self.last_flush = None
        self.last_poll = None

        # Open file object and the path it was opened with, or ``None``.
        self.stream = None
        self.stream_filename = None

    def poll(self):
        """Perform periodic maintenance on the stream.

        Returns ``False`` if the stream was closed during this poll (the
        caller may then drop this object), ``True`` otherwise. Calls made less
        than ``FILE_POLL_INTERVAL`` seconds after the previous poll do nothing
        and return ``True``.
        """
        now = time.time()
        if self.last_poll and now - self.last_poll < FILE_POLL_INTERVAL:
            return True

        log.debug("Doing poll for file %s", self.name)
        self.last_poll = now

        if not self.stream:
            return True

        if now - self.last_write > FILE_IDLE_TIMEOUT:
            self.close()
            return False

        # Flush whenever something has been written since the last flush.
        # Polls are already rate limited above, so this makes
        # FILE_POLL_INTERVAL the effective flush interval even for a stream
        # that is being written to continuously.
        if self.last_flush is None or self.last_flush <= self.last_write:
            self.stream.flush()
            self.last_flush = now

        # The file was deleted or moved out from under us (e.g. by an external
        # rotation); close so the next write re-creates it.
        if not os.path.exists(self.stream_filename):
            self.close()
            return False

        # The UTC day rolled over; close so the next write opens the new file.
        if self.log_file_path() != self.stream_filename:
            self.close()
            return False

        return True

    def log_file_path(self):
        """Return the path this stream should be writing to right now (UTC)."""
        day_str = datetime.datetime.utcnow().strftime('%Y%m%d')
        return os.path.normpath(os.path.join(self.log_path + '/', "%s/%s-%s.log" % (day_str, self.name, day_str)))

    def open(self):
        """Open today's log file for appending, creating its directory."""
        self.stream_filename = self.log_file_path()
        directory = os.path.dirname(self.stream_filename)
        try:
            os.makedirs(directory)
        except OSError as e:
            # An already existing directory is the normal case. Anything else
            # is worth reporting; the open below will then raise the error.
            if not os.path.isdir(directory):
                log.warning("Unable to create directory %s: %r", directory, e)

        log.info("Opening file stream for %s: %s", self.name, self.stream_filename)
        self.stream = io.open(self.stream_filename, "ab")

        # Start the idle clock so poll() works even if nothing was written yet.
        if self.last_write is None:
            self.last_write = time.time()

    def write(self, write_time, data):
        """Append ``data`` (bytes) to the log, opening the file if needed.

        ``write_time`` is accepted for interface compatibility but is not
        used; the target file is chosen from the current UTC date.
        """
        self.last_write = time.time()
        if not self.stream:
            self.open()

        self.stream.write(data)

    def close(self):
        """Close the underlying file, if open. Safe to call repeatedly."""
        if self.stream:
            log.info("Closing stream for file %s", self.name)
            self.stream.close()
            self.stream = None
            self.stream_filename = None

def collect_stats(stats, event):
    """Fold one received event into the running ``stats`` dict, in place.

    ``event`` must provide the keys ``'type'``, ``'end'`` and ``'host'``.
    Updates ``stats['last']`` (receive time), ``stats['lag']`` (receive time
    minus the event's ``'end'`` value) and records the event's ``'end'`` value
    as ``'last'`` under both ``stats['events'][type]`` and
    ``stats['hosts'][host]``.
    """
    event_type, sent_at, origin = event['type'], event['end'], event['host']

    stats['last'] = time.time()
    stats['lag'] = time.time() - sent_at

    # Per-type and per-host buckets are created on demand.
    for bucket, key in (('events', event_type), ('hosts', origin)):
        stats.setdefault(bucket, {}).setdefault(key, {})['last'] = sent_at

def build_stats(stats):
    """Return the stats payload for a STATUS reply.

    Currently a passthrough: the very same ``stats`` object is returned
    unchanged (no copy is made).
    """
    return stats

def main():
    """Run the oxd daemon until it is asked to shut down.

    Parses command-line options, binds the control (REP), collector (PULL)
    and streaming (PUB) sockets, optionally connects a forwarding (PUSH)
    socket, and then loops: each collected event is recorded in the stats,
    republished to stream subscribers, optionally forwarded and optionally
    appended to a per-type log file. Control requests (SOCK_STREAM,
    SOCK_COLLECT, STATUS, SHUTDOWN) are answered on the control socket.

    The loop ends on SIGTERM, Ctrl-C or a SHUTDOWN control request, after
    which all sockets and files are closed and the process exits with
    status 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', dest='verbose', action='append_const', const=True, default=list())

    parser.add_argument('--control', '-t', dest='control', action='store', default="127.0.0.1:3513")
    parser.add_argument('--collect', '-c', dest='collect', action='store', default="127.0.0.1:3514")
    parser.add_argument('--forward', '-r', dest='forward', action='store', default=None)

    parser.add_argument('--log-path', '-l', dest='log_path', action='store', default=None)

    options = parser.parse_args()

    setup_logging(options)

    # The run flag lives in a mutable container so the signal handler can
    # change it; a plain assignment inside the handler would only create a
    # handler-local variable and the loop would never stop.
    state = {'running': True}

    def handle_sigterm(signum, frame):
        log.info("Exiting")
        state['running'] = False

    signal.signal(signal.SIGTERM, handle_sigterm)

    if options.log_path:
        try:
            os.makedirs(options.log_path)
        except OSError as e:
            # An already existing directory is fine; report anything else.
            if not os.path.isdir(options.log_path):
                log.warning("Unable to create log path %s: %r", options.log_path, e)

    zmq_context = zmq.Context()
    poller = zmq.Poller()

    log.info("Initializing control port %s", options.control)
    control_sock = zmq_context.socket(zmq.REP)
    control_sock.bind("tcp://%s" % (options.control,))
    poller.register(control_sock, zmq.POLLIN)

    log.info("Initializing collector port %s", options.collect)
    collector_sock = zmq_context.socket(zmq.PULL)
    collector_sock.bind("tcp://%s" % options.collect)
    poller.register(collector_sock, zmq.POLLIN)

    # The streaming socket shares the control host but gets a random port,
    # which clients discover through the SOCK_STREAM control command.
    streamer_sock = zmq_context.socket(zmq.PUB)
    host, _ = options.control.split(':')
    streamer_sock_port = streamer_sock.bind_to_random_port("tcp://%s" % host, min_port=35000, max_port=36000)
    log.info("Streaming port bound to %s:%d", host, streamer_sock_port)

    forward_sock = None
    if options.forward:
        log.info("Inializing forwarding socket to %s", options.forward)
        forward_sock = zmq_context.socket(zmq.PUSH)
        forward_sock.linger = LINGER_SHUTDOWN_MSECS
        # Note we are not setting a HWM for our forwarder, this means we'll
        # just collect and store all messages in memory until our forwarder
        # becomes available. The SWAP option which would temporarily store on
        # disk was removed in zmq 3
        forward_sock.connect("tcp://%s" % options.forward)

    stats = {'last': None, 'lag': None, 'events': {}, 'hosts': {}}
    log_files = {}
    log.info("Starting IO Loop")
    while state['running']:
        log.debug("Poll")
        try:
            ready = dict(poller.poll(POLL_LOOP_TIMEOUT_MS))
        except (KeyboardInterrupt, SystemExit):
            break
        except zmq.ZMQError as e:
            if e.errno == errno.EINTR:
                # If this is from a SIGTERM, our handler has cleared the run
                # flag and the loop will exit gracefully.
                continue
            raise

        log.debug("Poller returned: %r", ready)

        # Give every open log file a chance to flush or close itself, and
        # drop the ones that closed.
        log_files = dict((name, stream) for name, stream in log_files.items() if stream.poll())

        if collector_sock in ready:
            try:
                event_meta, event_data = collector_sock.recv_multipart()
            except ValueError as e:
                # Sometimes clients can fail and corrupt these two-part sends.
                log.warning("Failed to recv from %r: %r", collector_sock, e)
                continue

            try:
                network.check_meta_version(event_meta)
            except ValueError:
                log.warning("Failed to decode event due to version mismatch")
                continue

            # See blueox.network for how this is packed.
            _, event_time, event_host, event_type = struct.unpack(network.META_STRUCT_FMT, event_meta)
            event = {'type': event_type, 'end': event_time, 'host': event_host}

            collect_stats(stats, event)

            # We may have subscribers to our streaming feed
            # Clients should expect two messages, the name of the channel and the actual channel data
            # This allows for subscribing to only certain prefixes of the type
            streamer_sock.send(event['type'], zmq.SNDMORE)
            streamer_sock.send(event_data)

            # If we are forwarding our data to another host, do so
            if forward_sock:
                try:
                    forward_sock.send(event_meta, zmq.NOBLOCK | zmq.SNDMORE)
                    forward_sock.send(event_data, zmq.NOBLOCK)
                except zmq.ZMQError as e:
                    log.error("Error forwarding event data, buffer possibly overflowed: %r", e)

            # We have been configured to log data to log files.
            if options.log_path:
                type_name = event['type']
                if type_name not in log_files:
                    log_files[type_name] = LogFileStream(options.log_path, type_name)

                log.debug("writing to %s", type_name)
                log_files[type_name].write(event['end'], event_data)

        # Checked independently of the collector so that a steady stream of
        # events cannot starve control requests (including SHUTDOWN).
        if control_sock in ready:
            control_data = control_sock.recv()
            try:
                request = msgpack.unpackb(control_data)
                command = request['cmd']
            except Exception as e:
                # A REP socket must answer every request before it can receive
                # again, and one bad request should not take the daemon down.
                log.warning("Ignoring malformed control request: %r", e)
                control_sock.send(msgpack.packb({'error': "Invalid Request"}))
                continue

            log.info("Received control request: %r", request)

            if command == "SOCK_STREAM":
                control_sock.send(msgpack.packb({'port': streamer_sock_port}))
            elif command == "SOCK_COLLECT":
                _, port = options.collect.split(':')
                control_sock.send(msgpack.packb({'port': int(port)}))
            elif command == 'STATUS':
                control_sock.send(msgpack.packb(build_stats(stats)))
            elif command == 'SHUTDOWN':
                control_sock.send(msgpack.packb({'ok': True}))
                state['running'] = False
            else:
                control_sock.send(msgpack.packb({'error': "Unknown Command"}))

    collector_sock.close(0)
    control_sock.close(0)
    streamer_sock.close(0)

    if forward_sock:
        # Closed with the linger configured above, so terminating the context
        # waits up to LINGER_SHUTDOWN_MSECS for queued events to be forwarded.
        forward_sock.close()

    for stream in log_files.values():
        stream.close()

    zmq_context.term()

    sys.exit(0)

# Start the daemon only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
