
Distributed Communicating Processes
***********************************

The discoro module provides an API for sending computation fragments
to remote processes (servers) to run distributed, concurrent,
communicating processes. Whereas "RCI" provides an API for creating
remote coroutines with pre-defined generator functions, discoro's API
provides a generic framework that clients can use to send different
computations to create coroutines at remote servers. There are three
components in discoro:


Node
====

The *discoronode* program (discoronode.py) starts server processes at
a node. The discoro scheduler uses these server processes to run
computations submitted by clients. By default, the program starts one
process for each available processor, so that CPU-intensive
computations can use all the processors efficiently. Each process runs
in its own address space, so computations running in one process don't
interfere with computations in another process on the same node.
However, multiple coroutines (computations) can be scheduled on one
process; for example, if computations perform I/O (such as
communicating with other computations or the client, or reading and
writing data), asyncoro can run another coroutine on that process that
is ready to execute. All coroutines running in one process share its
address space and run on one CPU; however, as asyncoro doesn't preempt
a running coroutine until it uses *yield*, there is no need to protect
critical sections with locks, as is done with threads. If computations
have no I/O, then the dispy project can be used instead.
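
The reason no locks are needed is that control changes hands only at
*yield*. The sketch below illustrates this with plain Python generators
and a hypothetical round-robin loop (*run_round_robin* and *worker* are
illustrative names, not asyncoro's scheduler or API): each
read-modify-write of the shared counter completes before the next
coroutine gets a chance to run.

```python
from collections import deque

counter = 0  # state shared by all coroutines in this (single) process


def worker(n):
    """Increment the shared counter n times, yielding between increments."""
    global counter
    for _ in range(n):
        # This read-modify-write cannot be interrupted: no other coroutine
        # runs until this one reaches the next yield.
        value = counter
        counter = value + 1
        yield  # voluntary switch point, analogous to yield in asyncoro


def run_round_robin(coros):
    """Hypothetical cooperative scheduler: resume each ready coroutine in turn."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            next(coro)
        except StopIteration:
            continue  # coroutine finished; drop it
        ready.append(coro)


run_round_robin([worker(1000) for _ in range(4)])
print(counter)  # 4000
```

With preemptive threads, the same unguarded read-modify-write could
lose updates; here the result is always 4000.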

The program takes the following options to customize how the
processes are started.

   * "-c" option specifies the number of instances of "discoronode()"
     to run, so that many processors are used to run
     compute-intensive tasks. If "-c" is a positive number, then that
     many instances of "discoronode()" are run (so that many
     processors can be used in parallel); if it is a negative number,
     then that many processors (out of those available) are left
     unused; and if it is "0" (the default), then all available
     processors are used.

   * "-i" or "--ip_addr" is the same as the *node* option to
     "AsynCoro" of "disasyncoro".

   * "--ext_ip_addr" is the same as the *ext_ip_addr* option to
     "AsynCoro" of "disasyncoro".

   * "-u" or "--udp_port" is the same as the *udp_port* option to
     "AsynCoro" of "disasyncoro".

   * "-n" or "--name" is the same as the *name* option to "AsynCoro"
     of "disasyncoro".

   * "--dest_path" is the same as the *dest_path* option to
     "AsynCoro" of "disasyncoro".

   * "--max_file_size" is the same as the *max_file_size* option to
     "AsynCoro" of "disasyncoro".

   * "-s" or "--secret" is the same as the *secret* option to
     "AsynCoro" of "disasyncoro".

   * "--certfile" is the same as the *certfile* option to "AsynCoro"
     of "disasyncoro".

   * "--keyfile" is the same as the *keyfile* option to "AsynCoro" of
     "disasyncoro".

   * "-d" or "--debug" option enables debug log messages.
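
The "-c" rule above can be expressed as a small function. This is a
sketch of the documented behavior, not discoronode's code;
*num_processes* is a hypothetical name, and clamping a large negative
value to at least one process is an assumption the text does not cover.

```python
import os


def num_processes(c=0, available=None):
    """Number of server processes to start for a given "-c" value (sketch)."""
    if available is None:
        available = os.cpu_count() or 1
    if c > 0:
        return c  # exactly c processes
    if c < 0:
        # Leave |c| processors unused; clamping to at least 1 is an assumption.
        return max(1, available + c)
    return available  # c == 0 (default): use every available processor


print(num_processes(0, 8), num_processes(3, 8), num_processes(-2, 8))  # 8 3 6
```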


Scheduler
=========

The discoro scheduler schedules computations and keeps track of
available nodes, processes, etc. Client computations, constructed with
"Computation", are sent to the scheduler. The scheduler queues the
client computations and processes them one at a time; each computation
keeps the scheduler until it closes. Even if the currently scheduled
computation is not using all the processes, the scheduler doesn't
schedule any other computation, so that one computation's state
doesn't interfere with another's. That is, the currently scheduled
computation reserves all the available processes.
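
The queuing policy described above can be modeled in a few lines. This
toy model (*FifoScheduler* is a hypothetical name, not discoro's
scheduler) shows that a second computation waits in submission order
even while processes are idle, and starts only when the first closes.

```python
from collections import deque


class FifoScheduler:
    """Toy model: one computation reserves all processes until it closes."""

    def __init__(self, processes):
        self.processes = list(processes)
        self.queue = deque()  # computations waiting, in submission order
        self.current = None   # computation that currently reserves everything

    def submit(self, name):
        self.queue.append(name)
        self._advance()

    def close(self, name):
        if name != self.current:
            raise ValueError(f"{name} is not the scheduled computation")
        self.current = None
        self._advance()

    def _advance(self):
        # Start the next computation only when none is running.
        if self.current is None and self.queue:
            self.current = self.queue.popleft()

    def reserved_by(self):
        # Every process belongs to the current computation, even idle ones.
        return {proc: self.current for proc in self.processes}


sched = FifoScheduler(["p1", "p2", "p3"])
sched.submit("A")
sched.submit("B")     # queued behind A, although processes may be idle
print(sched.current)  # A
sched.close("A")
print(sched.current)  # B
```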

The scheduler can be started either as a separate program or from
within the client program. If multiple client programs may schedule
computations simultaneously, the scheduler should be started as a
program on a node; the scheduler then schedules computations in the
order they are submitted.


Private Scheduler
-----------------

If only one client uses the nodes, then it is easier to start the
scheduler with:

   discoro.Scheduler()

in the client program before the computation is scheduled with
"compute.schedule()" (see below). There is no need to hold a reference
to the object created, as the client is not meant to use the
scheduler's methods directly; instead, the computation created with
"Computation" should be used to run remote coroutines. When the
scheduler starts, it discovers discoro nodes and processes, which may
take some time (depending on the network topology and the number of
nodes and processes, anywhere from less than a second to a few
seconds), so when the scheduler is started from the client program, it
may be easier to delay scheduling the computation. Alternatively,
*MonitorStatus* can be customized to run computations as and when a
node or process is discovered. Or, the scheduler can be started on a
node, as described below, and the computation submitted to it, even if
no other computations use the scheduler.


Shared Scheduler
----------------

If more than one client may use the scheduler at any time, the
scheduler should be started by running the discoro program
(discoro.py). The program takes the following options (the same as
*discoronode*, except for the option that controls starting
processes):

   * "-i" or "--ip_addr" is the same as the *node* option to
     "AsynCoro" of "disasyncoro".

   * "--ext_ip_addr" is the same as the *ext_ip_addr* option to
     "AsynCoro" of "disasyncoro".

   * "-u" or "--udp_port" is the same as the *udp_port* option to
     "AsynCoro" of "disasyncoro".

   * "-n" or "--name" is the same as the *name* option to "AsynCoro"
     of "disasyncoro".

   * "--dest_path" is the same as the *dest_path* option to
     "AsynCoro" of "disasyncoro".

   * "--max_file_size" is the same as the *max_file_size* option to
     "AsynCoro" of "disasyncoro".

   * "-s" or "--secret" is the same as the *secret* option to
     "AsynCoro" of "disasyncoro".

   * "--certfile" is the same as the *certfile* option to "AsynCoro"
     of "disasyncoro".

   * "--keyfile" is the same as the *keyfile* option to "AsynCoro" of
     "disasyncoro".

   * "-d" or "--debug" option enables debug log messages.


Computation
===========

The *discoro* module also provides an API for client programs to
package computation fragments with "Computation", send them to the
scheduler, and execute them at remote discoro nodes / processes.

class discoro.Computation(components, status_coro=None, timeout=MsgTimeout, pulse_interval=MinPulseInterval)

* *components* must be a list, each element of which can be a Python
  (generator) function, Python class, Python module, or path to a
  file. These computation fragments (components) are sent to discoro
  servers for execution. Once the computation is scheduled (i.e., the
  *schedule* method has finished), computations (generator functions)
  can be run on the nodes / processes. These computations can use the
  packaged components. If the generator functions run with the *run*
  methods below are listed in *components*, their code has already
  been distributed and is not sent each time; otherwise, each *run*
  method sends the code along with the arguments to the remote server.

* *status_coro*, if given, must be a coroutine. The discoro scheduler
  sends the status of remote processes, jobs executed, etc. to this
  coroutine as messages. These messages can be used to schedule
  computations. See the *MonitorStatus* template in the examples to
  extend / customize it.

* *timeout* is the maximum number of seconds to successfully transfer
  a message to/from servers or the client. If a message could not be
  sent or received within *timeout* seconds, the operation is aborted.
  The default value is *MsgTimeout*, as defined in the discoro module.

* *pulse_interval* is the interval, in seconds, at which pulse
  messages are exchanged among servers and the client to check for
  faults. If no messages are received within (5 * *pulse_interval*)
  seconds, the remote end is assumed to be faulty and it is closed.
  The default value is *MinPulseInterval*, as defined in the discoro
  module.
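
The fault-detection rule for *pulse_interval* amounts to a silence
timer. The sketch below (*PulseMonitor* is a hypothetical name, not
discoro's implementation) treats any received message as a sign of
life and reports a fault after more than 5 * *pulse_interval* seconds
of silence; whether the comparison is strict is an assumption. The
clock is injectable so the behavior can be shown without waiting.

```python
import time


class PulseMonitor:
    """Marks a remote end faulty after 5 * pulse_interval seconds of silence."""

    def __init__(self, pulse_interval, clock=time.monotonic):
        self.pulse_interval = pulse_interval
        self.clock = clock  # injectable for demonstration and testing
        self.last_heard = clock()

    def message_received(self):
        # Any message (pulse or otherwise) counts as a sign of life.
        self.last_heard = self.clock()

    def is_faulty(self):
        return self.clock() - self.last_heard > 5 * self.pulse_interval


now = [0.0]
mon = PulseMonitor(pulse_interval=10, clock=lambda: now[0])
now[0] = 45
print(mon.is_faulty())  # False: 45 s of silence is within 50 s
mon.message_received()
now[0] = 96
print(mon.is_faulty())  # True: 51 s since the last message
```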

The computation created as, for example, "compute =
Computation(...)", has the following methods:

     compute.schedule(location=None)

        Note: This method must be used with *yield* as "result =
          yield compute.schedule()".

        Schedule the computation for execution. If the scheduler is
        executing other computations, this will block until the
        scheduler has processed them (computations are processed in
        the order submitted). If successful, *result* will be 0.

        *location*, if not *None*, should be a Location instance
           referring to where the scheduler is running. If it is
           *None*, the scheduler must be running at a node in the
           local network; asyncoro will use name resolution to find
           the scheduler (a coroutine) with the registered name
           "discoro_scheduler".

     compute.run_at(where, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run_at(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at *where*; i.e., create a coroutine at a process
        with the given generator function and arguments. If the
        request is successful, *rcoro* will be a (remote) coroutine.

        If *where* is a string, it is assumed to be the IP address of
        a node, in which case the function is scheduled at that node
        on the process with the least load (i.e., the process with the
        fewest pending coroutines). If *where* is a Location instance,
        it is assumed to be a process location, in which case the
        function is scheduled at that process.

        *func* must be a generator function, as it is used to run a
        coroutine at the remote location.

        All the *run* methods below take a generator function and the
        arguments used to create coroutines at a remote server.
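
The way *run_at* interprets *where* can be sketched as a selection
function over a map of process loads. This is an illustration under
assumptions, not discoro's code: *Location* here is a hypothetical
namedtuple standing in for asyncoro's Location class, *pick_process* is
a hypothetical name, and ties are broken by dictionary order.

```python
from collections import namedtuple

# Hypothetical stand-in for asyncoro's Location: address and port of a process.
Location = namedtuple("Location", ["addr", "port"])


def pick_process(where, pending):
    """Choose the process for run_at; `pending` maps Location -> pending coroutines."""
    if isinstance(where, Location):
        if where not in pending:
            raise ValueError(f"unknown process {where}")
        return where  # explicit process location: use it as-is
    if isinstance(where, str):
        # Node IP address: the least-loaded process on that node.
        candidates = [loc for loc in pending if loc.addr == where]
        if not candidates:
            raise ValueError(f"no processes at node {where}")
        return min(candidates, key=lambda loc: pending[loc])
    raise TypeError("where must be a node address (str) or a Location")


pending = {
    Location("192.168.1.10", 51347): 3,
    Location("192.168.1.10", 51348): 1,
    Location("192.168.1.11", 51347): 0,
}
print(pick_process("192.168.1.10", pending))
# Location(addr='192.168.1.10', port=51348)
```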

     compute.run_each(where, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_each(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at each node or process. When finished, *rcoros* will
        be a list of (remote) coroutines.

        *where* is the same as in the case of *run_at*: if it is a
        string, the function is scheduled at every node, and if it is
        a Location instance, the function is scheduled at every
        process (on every node).
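
The two cases of *run_each* can be sketched as follows, reading the
text above as saying that only the type of *where* matters. The text
does not say which process is used on each node when *where* is a
string; picking the least-loaded one (as *run_nodes* describes) is an
assumption here. *Location* and *run_each_targets* are hypothetical
names, not discoro's API.

```python
from collections import namedtuple

# Hypothetical stand-in for asyncoro's Location: address and port of a process.
Location = namedtuple("Location", ["addr", "port"])


def run_each_targets(where, pending):
    """Return the processes run_each would use; `pending` maps Location -> load."""
    if isinstance(where, Location):
        # Location: every process on every node.
        return list(pending)
    if isinstance(where, str):
        # String: one process per node (least-loaded is an assumption).
        best = {}
        for loc, load in pending.items():
            if loc.addr not in best or load < pending[best[loc.addr]]:
                best[loc.addr] = loc
        return list(best.values())
    raise TypeError("where must be a str or a Location")


pending = {
    Location("192.168.1.10", 51347): 3,
    Location("192.168.1.10", 51348): 1,
    Location("192.168.1.11", 51347): 0,
}
print(len(run_each_targets("192.168.1.10", pending)))               # 2
print(len(run_each_targets(Location("192.168.1.10", 0), pending)))  # 3
```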

     compute.run(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at the least-loaded process on the least-loaded node.
        If the request is successful, *rcoro* will be a (remote)
        coroutine.
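
The two-level choice made by *run* (node first, then process) can pick
a different process than simply taking the globally least-loaded one.
The sketch below assumes a node's load is the total number of pending
coroutines across its processes; the text does not define node load,
and *Location* and *pick_for_run* are hypothetical names.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for asyncoro's Location: address and port of a process.
Location = namedtuple("Location", ["addr", "port"])


def pick_for_run(pending):
    """Least-loaded process on the least-loaded node (node load = sum, assumed)."""
    if not pending:
        raise ValueError("no processes available")
    node_load = defaultdict(int)
    for loc, load in pending.items():
        node_load[loc.addr] += load
    node = min(node_load, key=node_load.get)
    procs = [loc for loc in pending if loc.addr == node]
    return min(procs, key=lambda loc: pending[loc])


pending = {
    Location("192.168.1.10", 51347): 2,
    Location("192.168.1.10", 51348): 0,
    Location("192.168.1.11", 51347): 1,
    Location("192.168.1.11", 51348): 0,
}
print(pick_for_run(pending))  # Location(addr='192.168.1.11', port=51348)
```

Node 192.168.1.11 has total load 1 versus 2, so its idle process is
chosen even though an idle process also exists on the busier node.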

     compute.run_nodes(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_nodes(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at the least-loaded process on every node; i.e., the
        given function is executed at one process on each node. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.run_procs(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_procs(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at every process at every node. When finished,
        *rcoros* will be a list of (remote) coroutines.

     compute.run_node_procs(addr, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_node_procs(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at every process on the node with address *addr*.
        When finished, *rcoros* will be a list of (remote) coroutines.

     compute.nodes()

        Note: This method must be used with *yield* as "nodes =
          yield compute.nodes()".

        Get the list of addresses of nodes initialized for this
        computation.

     compute.procs()

        Note: This method must be used with *yield* as "procs =
          yield compute.procs()".

        Get the list of Location instances of processes initialized
        for this computation.

     compute.close()

        Note: This method must be used with *yield* as "yield
          compute.close()".

        Closes the computation (so the scheduler can execute the next
        computation in the queue).
