#!/usr/bin/python3.4

# Copyright (C) 2013  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import copy
import os
import sys
import signal
import socket
import threading

sys.path.append('/usr/lib64/python3.4/site-packages')
import bundy.dns
import bundy.log
from bundy.config import ModuleSpecError, ModuleCCSessionError
from bundy.log_messages.memmgr_messages import *
from bundy.server_common.bundy_server import BUNDYServer, BUNDYServerFatal
from bundy.server_common.datasrc_clients_mgr \
    import DataSrcClientsMgr, ConfigError
from bundy.memmgr.datasrc_info import DataSrcInfo, SegmentInfo
from bundy.memmgr.builder import MemorySegmentBuilder
import bundy.util.process
import bundy.util.traceback_handler

# Name of this module; it is used as the logger name below and handed to
# Memmgr.run() in main().
MODULE_NAME = 'memmgr'
# Message group used to address segment readers (see the 'members' RPC and
# the group_sendmsg() calls in Memmgr).
SEGMENT_READER_GROUP = 'SegmentReader'

# Logging is initialized at import time so that every definition below can
# use 'logger'.
# NOTE(review): buffer=True presumably defers log output until the logging
# configuration is available -- confirm against bundy.log.
bundy.log.init('bundy-memmgr', buffer=True)
logger = bundy.log.Logger(MODULE_NAME)

# NOTE(review): presumably adjusts the process name shown by the OS --
# confirm against bundy.util.process.
bundy.util.process.rename()

class ConfigError(Exception):
    """Raised when a Memmgr configuration parameter cannot be applied.

    Note that this module-level definition takes precedence over the
    'ConfigError' name imported from datasrc_clients_mgr; code that needs
    the latter must refer to it by its fully qualified name.

    """

class _LoadZoneError(Exception):
    """Internal error raised while parsing 'loadzone' command arguments."""

class Memmgr(BUNDYServer):
    def __init__(self):
        """Create a manager with no configuration, readers or builder yet.

        All attributes set here are "protected" (single underscore) so that
        tests can inspect or tweak them; no other code should touch them
        directly.

        """
        BUNDYServer.__init__(self)

        # Builder thread bookkeeping.  The thread itself is started later by
        # _create_builder_thread(), which passes these two queues to the
        # MemorySegmentBuilder.
        self._builder_command_queue = []
        self._builder_response_queue = []
        self._builder_setup = False

        # Running configurable parameters.  Stays None until the initial
        # configuration is applied; after that it is a dict:
        # str=>config_value.
        self._config_params = None

        # Dict: string=>dict, mapping the 'lname' of each segment reader to
        # another dict, which maps segment info to the number of outstanding
        # segment_info_update messages for that reader.
        self._segment_readers = {}

        # The manager that keeps track of data source configuration.
        self._datasrc_clients_mgr = DataSrcClientsMgr(use_cache=True)

        # Current DataSrcInfo, and older generations that are still waiting
        # for cleanup (dict: genid(int)=>DataSrcInfo).
        self._datasrc_info = None
        self._old_datasrc_info = {}

    def _mod_command_handler(self, cmd, args):
        """Handle a module command.

        Recognized commands:
        - 'loadzone': delegated to __update_zone(); its answer is returned.
        - 'release_segments_ack': a reader acknowledged release of the
          segments of an old generation.  Always returns None.
        - 'segment_info_update_ack': a reader acknowledged a
          segment_info_update.  Returns None if processing raised an
          exception and False otherwise (generation mismatch, unknown
          reader, or normal completion).  Neither value results in an
          answer; they are distinguished only for testing purposes.

        Any other command gets an error answer.

        """
        if cmd == 'loadzone':
            return self.__update_zone(cmd, args)

        if cmd == 'release_segments_ack':
            gen_id = 'unknown'
            try:
                gen_id = args['generation-id']
                reader = args['reader']
                old_info = self._old_datasrc_info[gen_id]
                # cancel() returning a false value means nothing is left
                # to wait for in this generation, so it can be dropped.
                if not old_info.cancel(reader):
                    del self._old_datasrc_info[gen_id]
                logger.debug(logger.DBGLVL_TRACE_BASIC,
                             MEMMGR_DATASRC_RELEASESGMT_ACK, reader, gen_id)
            except Exception as ex:
                logger.error(MEMMGR_DATASRC_RELEASESGMT_ACK_FAIL, str(gen_id),
                             ex)
            return None

        if cmd != 'segment_info_update_ack':
            return bundy.config.create_answer(1, 'unknown command: ' + cmd)

        # We usually expect the following to succeed, as long as the sender
        # is implemented correctly.  But a buggy module could still send a
        # bogus message, and since we cannot raise an exception due to API
        # contract, we catch everything and log it.
        try:
            if self._datasrc_info.gen_id != args['generation-id']:
                return False
            ack_class = bundy.dns.RRClass(args['data-source-class'])
            ack_dsrc_name = args['data-source-name']
            info = self._datasrc_info.segment_info_map[(ack_class,
                                                        ack_dsrc_name)]
            reader = args['reader']
            if reader not in self._segment_readers:
                logger.info(MEMMGR_DATASRC_SGMTINFO_ACK_NOREADER, reader)
                return False
            outstanding = self._segment_readers[reader]
            outstanding[info] -= 1
            if outstanding[info] == 0:
                # All updates sent to this reader have been acknowledged.
                del outstanding[info]
                builder_cmd = info.sync_reader(reader)
                if builder_cmd is not None:
                    self._cmd_to_builder(builder_cmd)
        except Exception as ex:
            logger.error(MEMMGR_DATASRC_SGMTINFO_ACK_FAIL, str(ex))
            return None
        else:
            logger.debug(logger.DBGLVL_TRACE_BASIC, MEMMGR_SGMTINFO_ACK,
                         ack_dsrc_name, ack_class)
            return False

    def __update_zone(self, cmd, args):
        """Unified helper to load/update a zone.

        Parameters:
          cmd (str): 'loadzone' for a user-invoked command; any other value
            (in practice 'zone_updated') is treated as an update
            notification.
          args: command/notification parameters, parsed by
            __handle_loadzone_args().

        Returns an answer (as created by bundy.config.create_answer) for
        'loadzone', or when the arguments are invalid; returns None for a
        notification that was accepted or silently ignored.

        """
        try:
            dsrc_info, rrclass, dsrc_name, zone_name, sgmt_info = \
                self.__handle_loadzone_args(cmd, args)
            # If it's manually invoked, it will be helpful to consider
            # missing segment info an error; if it's update notification,
            # this is a normal condition for a 'local' segment, or (relatively
            # rarely) too old or new generation is given at the timing of
            # reconfiguring the data source.
            if sgmt_info is None:
                if cmd != 'loadzone':
                    return None
                # Note the trailing space after 'for': the two literals are
                # concatenated as-is.
                raise _LoadZoneError("no memory segment in '%s' for "
                                     "RR class %s" % (dsrc_name, rrclass))
        except _LoadZoneError as ex:
            # Note: basically this can only happen for user-invoked command;
            # otherwise it's internal bug.  So we simply say 'loadzone' here.
            logger.error(MEMMGR_LOADZONE_FAIL, str(ex))
            return bundy.config.create_answer(1, 'bad loadzone parameters: '
                                              + str(ex))

        # Queue the load on the segment; start_update() returns a builder
        # command only if one should be sent now.
        sgmt_info.add_event(('load', zone_name, dsrc_info, rrclass, dsrc_name))
        bcmd = sgmt_info.start_update()
        if bcmd is not None:
            self._cmd_to_builder(bcmd)

        if cmd == 'loadzone':
            logger.info(MEMMGR_LOADZONE, zone_name, rrclass, dsrc_name)
            return bundy.config.create_answer(0)
        logger.debug(logger.DBGLVL_TRACE_BASIC, MEMMGR_UPDATE_ZONE,
                     zone_name, rrclass, dsrc_name)
        return None

    def __handle_loadzone_args(self, cmd, args):
        """Parse loadzone args and return helpful error on failure.

        Parameters:
          cmd (str): 'loadzone' or 'zone_updated'.
          args (dict): must contain 'class', 'datasource' and 'origin';
            for 'zone_updated' it must also contain 'generation-id'.

        Returns a 5-tuple
          (datasrc_info, rrclass, dsrc_name, zone_name, sgmt_info).
        sgmt_info is None if the current generation has no segment for the
        given class and data source.  All five elements are None if a
        'zone_updated' notification refers to a generation other than the
        current one.

        Raises _LoadZoneError if the arguments are missing or malformed.

        """
        # The command may arrive without any parameters at all (args is
        # None) or with a non-dict value; report that as missing parameters
        # rather than letting a TypeError escape from the handler.
        if not isinstance(args, dict):
            raise _LoadZoneError('missing parameters')
        if ('class' not in args or 'datasource' not in args or
            'origin' not in args):
            raise _LoadZoneError('missing parameters')

        try:
            rrclass = bundy.dns.RRClass(args['class'])
        except bundy.dns.InvalidRRClass as ex:
            raise _LoadZoneError('bad class: ' + str(ex))
        try:
            zone_name = bundy.dns.Name(args['origin'])
        except Exception as ex: # there can be many types of error
            raise _LoadZoneError('bad zone origin:' + str(ex))
        dsrc_name = args['datasource']

        # Identify the generation of data sources to be updated.  In case of
        # an internal command from other modules, it's possible the
        # generation ID doesn't match that of memmgr, depending on the
        # timing of update and reconfiguration of data source.  Basically,
        # we can ignore this case: older generations won't be used, and
        # newer generations will be handled at the time of reconfiguration.
        # For load command invoked by a user, the latest generation will be
        # used.
        if cmd == 'zone_updated':
            genid = args.get('generation-id')
            if genid is None:
                raise _LoadZoneError('missing generation ID')
            if self._datasrc_info.gen_id != genid:
                logger.info(MEMMGR_UPDATEZONE_NOGENERATION,
                            genid, zone_name, rrclass, dsrc_name)
                return None, None, None, None, None
        try:
            sgmt_info = self._datasrc_info.segment_info_map[(rrclass,
                                                             dsrc_name)]
        except KeyError:
            sgmt_info = None    # not necessarily an error, maybe it's 'local'
        return self._datasrc_info, rrclass, dsrc_name, zone_name, sgmt_info

    def _config_handler(self, new_config):
        """Apply a configuration update; called via BUNDYServer.

        The parameter is assumed to be a valid dict conforming to the type
        specification of the spec file.  Failures while applying it are
        logged and reported through the returned answer rather than
        raised.

        Returns a success answer (code 0) or, if the update could not be
        applied, an error answer (code 1) describing the failure.

        """
        logger.debug(logger.DBGLVL_TRACE_BASIC, MEMMGR_CONFIG_UPDATE)

        # On the very first call, start from the latest full config data
        # (defaults possibly overridden by user config) rather than from
        # the given diff.  Later calls just apply the diff.
        first_time = self._config_params is None
        if first_time:
            new_config = self.mod_ccsession.get_full_config()

        try:
            self.__update_config(new_config)
        except Exception as ex:
            logger.error(MEMMGR_CONFIG_FAIL, ex)
            return bundy.config.create_answer(
                1, 'Memmgr failed to apply configuration updates: ' + str(ex))

        return bundy.config.create_answer(0)

    def __update_config(self, new_config):
        """Apply config changes to local attributes.

        Subroutine of _config_handler.  The new parameter set is built on a
        private copy and only installed at the very end, so either all
        changes apply or, if an error is found, the original configuration
        is left untouched.

        Raises ConfigError if 'mapped_file_dir' is given but is not a
        writable directory.

        """
        # Work on a scratch dict: empty on the first call, otherwise a deep
        # copy of what is currently in effect.
        current = self._config_params
        pending = {} if current is None else copy.deepcopy(current)

        mapped_dir = new_config.get('mapped_file_dir')
        if mapped_dir is not None:
            if not os.path.isdir(mapped_dir):
                raise ConfigError('mapped_file_dir is not a directory: ' +
                                  mapped_dir)
            if not os.access(mapped_dir, os.W_OK):
                raise ConfigError('mapped_file_dir is not writable: ' +
                                  mapped_dir)
            pending['mapped_file_dir'] = mapped_dir

        # Everything checked out; switch to the new configuration.
        self._config_params = pending

    def _cmd_to_builder(self, cmd):
        """Queue a command (a tuple) for the builder and wake it up.

        The command queue is modified, and waiters notified, while holding
        the builder condition variable.

        """
        assert isinstance(cmd, tuple)
        builder_cv = self._builder_cv
        with builder_cv:
            self._builder_command_queue.append(cmd)
            builder_cv.notify_all()

    def __notify_readers(self, rrclass, dsrc_name, sgmt_info, readers,
                         inuse_only=False):
        """Send a segment_info_update message to each of the given readers.

        For every message sent, the count of outstanding updates for that
        reader and segment (kept in self._segment_readers) is incremented.

        If 'inuse_only' is True, tell the readers to reset the segment
        only if they are currently using another segment.

        """
        reset_param = sgmt_info.get_reset_param(SegmentInfo.READER)
        update_params = {
            'data-source-class': str(rrclass),
            'data-source-name': dsrc_name,
            'segment-params': reset_param,
            'generation-id': sgmt_info.get_generation_id()
        }
        if inuse_only:
            update_params['inuse-only'] = True

        session = self.mod_ccsession._session
        for reader in readers:
            # The same parameter dict is reused; only the addressee changes.
            update_params['reader'] = reader
            message = bundy.config.create_command('segment_info_update',
                                                  update_params)
            session.group_sendmsg(message, SEGMENT_READER_GROUP, to=reader)

            outstanding = self._segment_readers[reader]
            outstanding[sgmt_info] = outstanding.get(sgmt_info, 0) + 1

    def _notify_from_builder(self):
        """Read and process the notifications from the builder thread.

        Registered by _create_builder_thread() as the read callback of
        self._master_sock.  It drains self._builder_response_queue (under
        self._builder_lock) and handles each notification in turn:

        - 'validate-completed' / 'load-completed': complete the
          corresponding operation on the segment info, forward any
          follow-up command to the builder, and notify old readers.
        - 'cancel-completed': ask the remaining readers of the canceled
          generation to release its segments, or drop the generation if
          there is none.

        Raises ValueError for any other notification name.

        """
        self._master_sock.recv(1) # Clear the wake-up data
        notifications = None
        with self._builder_lock:
            # Copy the notifications out and clear them from the
            # original list. We may not assign [] to
            # self._builder_response_queue to clear it, because there's
            # another reference to it from the other thread and it would
            # not keep the original list.
            notifications = self._builder_response_queue[:]
            del self._builder_response_queue[:]
        for notification in notifications:
            # The first element of each notification tuple is its name.
            notif_name = notification[0]
            if notif_name in ('validate-completed', 'load-completed'):
                (_, dsrc_info, rrclass, dsrc_name, result) = notification
                sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
                if notif_name == 'validate-completed':
                    cmd = sgmt_info.complete_validate(result)
                else:
                    cmd = sgmt_info.complete_update(result)

                # It may return another builder command on the same data source.
                # If it is so, we execute it too, before we start
                # synchronizing with the readers.
                if cmd is not None:
                    self._cmd_to_builder(cmd)

                # If there are readers of the old version of this segment,
                # notify them so they'll move to the new version of segment.
                # If this is on completion of validation, we'll only have them
                # actually reset the segment when they are currently using
                # another one, just so the memmgr can safely update the 'writer'
                # version of the segment; otherwise we'd rather tell them to
                # switch when the latest data are loaded.
                old_readers = sgmt_info.get_old_readers()
                inuse_only = not sgmt_info.loaded()
                if old_readers:
                    self.__notify_readers(rrclass, dsrc_name, sgmt_info,
                                          old_readers, inuse_only=inuse_only)
                logger.debug(logger.DBGLVL_TRACE_BASIC,
                             MEMMGR_COMMAND_COMPLETED,
                             notif_name.replace('-completed', ''), dsrc_name,
                             rrclass, 'without' if cmd is None else 'with',
                             len(old_readers))
            elif notif_name == 'cancel-completed':
                # Ask all possible readers to release the segments of the
                # canceled generation.
                _, dsrc_info = notification
                readers = dsrc_info.cancel(None)
                prms = {'generation-id': dsrc_info.gen_id}
                for reader in readers:
                    prms['reader'] = reader
                    rcmd = bundy.config.create_command('release_segments', prms)
                    self.mod_ccsession._session.group_sendmsg(
                        rcmd, SEGMENT_READER_GROUP, to=reader)
                if not readers: # cleanup completed
                    del self._old_datasrc_info[dsrc_info.gen_id]
            else:
                raise ValueError('Unknown notification name: ' + notif_name)

    def _create_builder_thread(self):
        """Create the builder, its communication channels, and start it.

        This is a "private" method, but defined as if it were "protected"
        so tests can override it.  It shouldn't be overridden for other
        purposes.

        """
        # Responses from the builder thread arrive on this socket pair; the
        # master end is watched so that _notify_from_builder() gets called.
        master_end, builder_end = socket.socketpair(socket.AF_UNIX,
                                                    socket.SOCK_STREAM)
        self._master_sock = master_end
        self._builder_sock = builder_end
        self.watch_fileno(master_end, rcallback=self._notify_from_builder)

        # See the documentation for MemorySegmentBuilder on how the lock,
        # condition variable and queues are used.
        self._builder_lock = threading.Lock()
        self._builder_cv = threading.Condition(lock=self._builder_lock)

        builder = MemorySegmentBuilder(builder_end,
                                       self._builder_cv,
                                       self._builder_command_queue,
                                       self._builder_response_queue)
        self._builder = builder
        self._builder_thread = threading.Thread(target=builder.run)
        self._builder_thread.start()

        # From now on __shutdown_builder_thread() has something to stop.
        self._builder_setup = True

    def __shutdown_builder_thread(self):
        """Stop the builder thread and close its sockets, if it was set up.

        Does nothing if the builder was never created (as in some unit
        tests) or has already been shut down.

        """
        if self._builder_setup:
            self._builder_setup = False

            # This makes the MemorySegmentBuilder exit its main loop, which
            # should make the builder thread joinable.
            self._cmd_to_builder(('shutdown',))
            self._builder_thread.join()

            for sock in (self._master_sock, self._builder_sock):
                sock.close()

    def _zone_update_notification(self, ev_name, params):
        """Callback of zone update notification.

        Only the 'zone_updated' event is acted on; it is passed on to
        __update_zone().  Any other event is ignored.

        Defined as 'protected' only for testing purposes.  Shouldn't be
        called directly otherwise.

        """
        if ev_name == 'zone_updated':
            self.__update_zone(ev_name, params)

    def _reader_notification(self, ev_name, params):
        """Callback of module membership notification.

        Only 'subscribed'/'unsubscribed' events for the segment reader
        group are processed; everything else is ignored.  Errors raised
        while updating the reader state are logged, not propagated.

        Defined as 'protected' only for testing purposes.  Shouldn't be
        called directly otherwise.

        """
        if ev_name not in ('subscribed', 'unsubscribed'):
            return
        # The 'group' param must be included for (un)subscribe events.
        if params['group'] != SEGMENT_READER_GROUP:
            return

        reader = params['client']
        logger.info(MEMMGR_CCMEMBER_NOTIFY, ev_name, reader)
        try:
            self.__update_reader(ev_name, reader)
        except Exception as ex:
            # An unexpected error, but propagating it would kill memmgr,
            # and it is not necessarily a defect of memmgr itself, so we
            # keep running.
            logger.error(MEMMGR_CCMEMBER_NOTIFY_FAIL, ex)

    def __update_reader(self, ev_name, reader):
        """Helper of _reader_notification, handling notifications in detail.

        'subscribed' registers a new reader with every segment of the
        current generation and tells it about the segments that are already
        loaded.  Any other event name is treated as 'unsubscribed': the
        reader is removed from the current segments and from old
        generations still waiting for cleanup.  Duplicate notifications are
        only logged.

        """
        if ev_name == 'subscribed':
            if reader in self._segment_readers:
                logger.info(MEMMGR_CCMEMBER_DUP_SUBSCRIBED, reader)
                return
            self._segment_readers[reader] = {}
            segment_map = self._datasrc_info.segment_info_map
            for key, sgmt_info in segment_map.items():
                sgmt_info.add_reader(reader)
                if not sgmt_info.loaded():
                    continue
                # A reader segment has been loaded, so the new reader
                # should be told about it.
                rrclass, dsrc_name = key
                self.__notify_readers(rrclass, dsrc_name, sgmt_info,
                                      [reader])
            return

        # Must be 'unsubscribed'.
        if reader not in self._segment_readers:
            logger.warn(MEMMGR_CCMEMBER_DUP_UNSUBSCRIBED, reader)
            return
        del self._segment_readers[reader]

        # Iterate over a snapshot, since entries may be deleted on the way.
        for old_info in list(self._old_datasrc_info.values()):
            if not old_info.cancel(reader):
                del self._old_datasrc_info[old_info.gen_id]

        for sgmt_info in self._datasrc_info.segment_info_map.values():
            builder_cmd = sgmt_info.remove_reader(reader)
            if builder_cmd is not None:
                self._cmd_to_builder(builder_cmd)

    def _setup_module(self):
        """Module specific initialization for BUNDYServer.

        Starts the builder thread, subscribes to zone update and membership
        notifications, collects the initial set of segment readers, and
        registers the handler for the 'data_sources' remote configuration.

        Raises BUNDYServerFatal if the initial configuration had failed, if
        the data sources are not initialized after registering the remote
        configuration, or if a ModuleSpecError/ModuleCCSessionError is
        raised during setup.

        """
        try:
            # If self._config_params is None, it means the initial call to
            # __update_config() failed.  We cannot fail at that point due to
            # API contract, but this is the time to terminate.
            if self._config_params is None:
                raise BUNDYServerFatal('memmgr failed in ' +
                                        'initial configuration')

            # Now is the time to start the builder thread;
            # _datasrc_config_handler will expect it to be running.
            self._create_builder_thread()

            # subscribe to the group to be notified of updates to zones.
            self.mod_ccsession.subscribe_notification(
                'ZoneUpdateListener', self._zone_update_notification)

            # populate and maintain segment readers.  We first subscribe to
            # the membership notification group, and then get the set of
            # readers at this point.  This ordering is critical; otherwise
            # we could miss a new reader that appears immediately after we
            # populate the initial list.
            self.mod_ccsession.subscribe_notification('cc_members',
                                                      self._reader_notification)
            # This rpc_call should either return a list or raise an exception.
            readers = self.mod_ccsession.rpc_call(
                'members', 'Msgq', params={'group': SEGMENT_READER_GROUP})
            for r in readers:
                self._segment_readers[r] = {}

            # memmgr isn't usable if data source is not configured, and
            # as long as cfgmgr is ready there's no timing issue.  So we
            # immediately shut it down if it's missing.  See ddns.py.in
            # about exceptions to catch.
            self.mod_ccsession.add_remote_config_by_name(
                'data_sources', self._datasrc_config_handler)
            # NOTE(review): this check relies on _datasrc_config_handler
            # having been invoked as part of the registration above --
            # confirm against add_remote_config_by_name.
            if self._datasrc_info is None:
                raise BUNDYServerFatal('failed to initialize data sources')
        except (ModuleSpecError, ModuleCCSessionError) as ex:
            logger.error(MEMMGR_NO_DATASRC_CONF, ex)
            raise BUNDYServerFatal('failed to setup memmgr module')

    def _shutdown_module(self):
        """Module specific finalization: stop the builder thread (if any)."""
        self.__shutdown_builder_thread()

    def _datasrc_config_handler(self, new_config, config_data):
        """Callback of data_sources configuration update.

        Builds a new generation of DataSrcInfo, retires the previous one
        (if any) and starts initializing the new segments.

        This method must be exception free, so the expected exception (a
        data source ConfigError) is caught and logged internally;
        unexpected ones should mean a programming error and will terminate
        the program.

        """
        try:
            self._datasrc_clients_mgr.reconfigure(new_config, config_data)
            gen_id, clients_map = self._datasrc_clients_mgr.get_clients_map()
            fresh_info = DataSrcInfo(gen_id, clients_map, self._config_params)

            # Cancel any outstanding operations running in the builder for
            # the now-deprecated generation of segments, and remember that
            # generation until its cleanup completes.
            retired_info = self._datasrc_info
            if retired_info is not None:
                self._cmd_to_builder(('cancel', retired_info))
                self._old_datasrc_info[retired_info.gen_id] = retired_info

            self._datasrc_info = fresh_info
            self._init_segments(fresh_info)

            # Full datasrc reconfig will be rare, so it is worth logging at
            # the info level.
            logger.info(MEMMGR_DATASRC_RECONFIGURED, gen_id)
        except bundy.server_common.datasrc_clients_mgr.ConfigError as ex:
            logger.error(MEMMGR_DATASRC_CONFIG_ERROR, ex)

    def _init_segments(self, datasrc_info):
        """Register readers and schedule initial work for every segment.

        For each segment of the given DataSrcInfo: add all known readers,
        send the builder a 'validate' command for the reader segment, and
        queue on the segment a 'validate' of the writer segment followed by
        the initial full 'load'.

        """
        for key, sgmt_info in datasrc_info.segment_info_map.items():
            rrclass, dsrc_name = key
            target = (datasrc_info, rrclass, dsrc_name)

            for reader in self._segment_readers:
                sgmt_info.add_reader(reader)

            # Initiate validating memory segments; the command for the
            # reader segment goes to the builder right away.
            rvalidate_arg, wvalidate_arg = sgmt_info.start_validate()
            self._cmd_to_builder(('validate',) + target + (rvalidate_arg,))

            # Schedule additional initial commands: validating the writer
            # segment, followed by the initial full load (zone name None).
            sgmt_info.add_event(('validate',) + target + (wvalidate_arg,))
            sgmt_info.add_event(('load', None) + target)

            assert sgmt_info.get_state() == SegmentInfo.R_VALIDATING

def main():
    """Run a Memmgr instance and exit with the value returned by run()."""
    sys.exit(Memmgr().run(MODULE_NAME))

# Script entry point: main() is run through the project's traceback handler.
if __name__ == '__main__':
    bundy.util.traceback_handler.traceback_handler(main)
