
Distributed Communicating Processes
***********************************

The discoro module provides an API for sending computation fragments
to remote processes (servers) for executing distributed communicating
processes (local and remote coroutines). Whereas "RCI" provides an API
for creating remote coroutines with pre-defined generator functions,
discoro's API provides a generic framework that clients can use to
send different computations to create coroutines at remote servers.
There are three components in discoro: the *discoronode.py* program,
which should be running on each of the nodes that run the coroutines
for the clients; *DispyScheduler*, which schedules clients, manages
nodes, remote coroutines etc.; and the API for clients to create
computations, schedule them with the scheduler, run remote coroutines
etc.


Node / Processes
================

*discoronode* program (discoronode.py) is used to start server
processes at a node. These server processes are used by discoro
scheduler to run computations submitted by clients. The program, by
default, starts one process for each processor available so that CPU
intensive computations can utilize all the processors efficiently.
Each process runs with its own address space so computations running
in one process don't interfere with computations in another process on
the same node. However, multiple coroutines can be scheduled on one
process so that if computations have I/O (such as communicating with
other computations / client, or reading/writing data), asyncoro can
run another coroutine on that process that is ready to execute. All
coroutines running in one process share the address space and run on
one CPU; however, as asyncoro doesn't pre-empt a running coroutine
until *yield* is used, there is no need to protect critical sections
with locks, as is done with threads. If computations have no I/O, then
the dispy project can be used instead.
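
The point about *yield* can be illustrated without asyncoro at all.
The following is a minimal sketch using plain Python generators; the
names "worker" and "run_round_robin" are hypothetical and the tiny
round-robin loop only stands in for a real coroutine scheduler. It
shows that a read-modify-write on shared data needs no lock as long
as there is no *yield* in the middle of it.

```python
from collections import deque


def worker(name, shared, rounds, log):
    """Generator standing in for a coroutine; it gives up control only at 'yield'."""
    for _ in range(rounds):
        # Read-modify-write on shared state. No other generator can run
        # between these two statements, as there is no 'yield' between them.
        current = shared["count"]
        shared["count"] = current + 1
        log.append(name)
        yield  # the only point where another generator may be resumed


def run_round_robin(generators):
    """Hypothetical minimal scheduler: resume each generator in turn until all finish."""
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # this generator is finished; drop it
        ready.append(gen)


shared = {"count": 0}
log = []
run_round_robin([worker("a", shared, 3, log), worker("b", shared, 3, log)])
print(shared["count"])  # 6
print(log)              # ['a', 'b', 'a', 'b', 'a', 'b']
```

If a *yield* were placed between reading and writing the counter,
updates could be lost, which is the case that would need locking.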

The program takes following options to customize how the processes are
started.

   * "-c" option specifies number of instances of "discoronode()" to
     run, so that many processors can be used in parallel for compute
     intensive tasks. If it is a positive number, that many instances
     are run; if it is a negative number, that many of the available
     processors are not used; and if it is "0" (default value), all
     available processors are used.

   * "-i" or "--ip_addr" is same as *node* option to "AsynCoro" of
     "disasyncoro".

   * "--ext_ip_addr" is same as *ext_ip_addr* option to "AsynCoro"
     of "disasyncoro".

   * "-u" or "--udp_port" is same as *udp_port* option to "AsynCoro"
     of "disasyncoro".

   * "-n" or "--name" is same as *name* option to "AsynCoro" of
     "disasyncoro".

   * "--dest_path" is same as *dest_path* option to "AsynCoro" of
     "disasyncoro".

   * "--max_file_size" is same as *max_file_size* option to
     "AsynCoro" of "disasyncoro".

   * "-s" or "--secret" is same as *secret* option to "AsynCoro" of
     "disasyncoro".

   * "--certfile" is same as *certfile* option to "AsynCoro" of
     "disasyncoro".

   * "--keyfile" is same as *keyfile* option to "AsynCoro" of
     "disasyncoro".

   * "-d" or "--debug" option enables debug log messages.
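
The rule for the "-c" option above can be written down as a small
function. This is only a sketch of the rule as described; the helper
name "server_process_count" and its *available* parameter are
hypothetical, and how the program treats values outside the number of
available processors is not covered here.

```python
def server_process_count(c, available):
    """Return how many server processes to start for a given "-c" value.

    Hypothetical helper: 'available' is the number of processors on the node.
    """
    if c > 0:
        return c              # run exactly that many instances
    if c < 0:
        return available + c  # leave abs(c) processors unused
    return available          # 0 (default): use all available processors


for c in (0, 2, -1):
    print(c, server_process_count(c, available=4))
```

On a real node, *available* could be taken from
"multiprocessing.cpu_count()".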


Scheduler
=========

The discoro scheduler schedules computations and keeps track of
available nodes, processes etc. Client computations, constructed with
"Computation", are sent to the scheduler. The scheduler queues the
client computations and processes them one at a time, each until that
computation closes. Even if the currently scheduled computation is not
utilizing all the processes, the scheduler doesn't schedule any other
computation, so that one computation's state doesn't interfere with
another's. That is, the currently scheduled computation reserves all
the available processes.

Scheduler can be started either as a separate program, or from within
the client program. If multiple client programs can schedule
computations simultaneously, scheduler should be started as a program
on a node; then the scheduler will schedule them in the order
submitted.
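
The queueing behaviour described above can be modelled in a few
lines. The class below is a hypothetical in-memory model, not the
discoro scheduler itself; it only shows that computations are taken
in the order submitted and that the next one starts when the current
one closes.

```python
from collections import deque


class ToyScheduler:
    """Hypothetical model of the queueing rule; not the discoro scheduler."""

    def __init__(self):
        self.pending = deque()  # computations waiting, in submission order
        self.current = None     # the one computation that holds all processes

    def submit(self, name):
        self.pending.append(name)
        self._advance()

    def close(self, name):
        # Only the currently scheduled computation can be closed.
        if name != self.current:
            raise ValueError("%s is not the scheduled computation" % name)
        self.current = None
        self._advance()

    def _advance(self):
        # Start the next computation only when nothing is scheduled.
        if self.current is None and self.pending:
            self.current = self.pending.popleft()


sched = ToyScheduler()
for client in ("A", "B", "C"):
    sched.submit(client)

order = []
while sched.current is not None:
    order.append(sched.current)
    sched.close(sched.current)
print(order)  # ['A', 'B', 'C']
```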


Private Scheduler
-----------------

If only one client uses the nodes, then it is easier to start the
scheduler with:

   discoro.Scheduler()

in the client program before computation is scheduled with
"compute.schedule()" (see below). There is no need to hold a reference
to the object created, as the methods of the scheduler are not meant
to be used directly by the client; instead, the computation created
with "Computation" should be used to run remote coroutines. When the
scheduler starts, it detects discoro nodes and processes, which may
take from less than a second to a few seconds, depending on network
topology and the number of nodes and processes. If the scheduler is
started from the client program, it may therefore be easier to delay
scheduling the computation. Alternately, *MonitorStatus* can be
customized to run computations as and when a node or process is
discovered. Or, the scheduler can be started on a node, as described
below, and the computation submitted to it, even if no other
computations use that scheduler.


Shared Scheduler
----------------

If more than one client may use the scheduler at any time, scheduler
should be started by running the discoro program (discoro.py). The
program takes following options (same as *discoronode*, except for
starting processes):

   * "-i" or "--ip_addr" is same as *node* option to "AsynCoro" of
     "disasyncoro".

   * "--ext_ip_addr" is same as *ext_ip_addr* option to "AsynCoro"
     of "disasyncoro".

   * "-u" or "--udp_port" is same as *udp_port* option to "AsynCoro"
     of "disasyncoro".

   * "-n" or "--name" is same as *name* option to "AsynCoro" of
     "disasyncoro".

   * "--dest_path" is same as *dest_path* option to "AsynCoro" of
     "disasyncoro".

   * "--max_file_size" is same as *max_file_size* option to
     "AsynCoro" of "disasyncoro".

   * "-s" or "--secret" is same as *secret* option to "AsynCoro" of
     "disasyncoro".

   * "--certfile" is same as *certfile* option to "AsynCoro" of
     "disasyncoro".

   * "--keyfile" is same as *keyfile* option to "AsynCoro" of
     "disasyncoro".

   * "-d" or "--debug" option enables debug log messages.


Computation
===========

*discoro* module also provides API for client programs to package
computation fragments with "Computation", send to scheduler, and
execute runs at remote discoro nodes / processes.

class discoro.Computation(components, status_coro=None, timeout=MsgTimeout, pulse_interval=MinPulseInterval)

* *components* must be a list, each element of which can be a Python
  function, class, module, or path to a file. These computation
  fragments are sent to discoro servers for execution. Once the
  computation is scheduled (i.e., *schedule* method is finished),
  generator functions can be run on the nodes / processes. These runs
  can use the components packaged. If the generator functions run are
  listed in *components*, then the code for those functions would've
  already been distributed, so only the names of those functions are
  sent along with the arguments; otherwise, each *run* method sends
  the code for the functions along with the arguments to the remote
  server.

* *status_coro*, if given, must be a coroutine. The discoro scheduler
  then sends status of remote processes, jobs executed etc., to this
  coroutine as messages. These messages can be used to schedule runs.
  See *MonitorStatus* template in the examples to expand / customize
  it.

* *timeout* is maximum number of seconds to successfully transfer
  messages to/from servers or client. If a message could not be sent
  or received within timeout, the operation is aborted. Default value
  is as defined by *MsgTimeout* in discoro module.

* *pulse_interval* is the interval, in seconds, at which pulse
  messages are exchanged among servers and client to check for faults.
  If no messages are received within (5 * *pulse_interval*), the
  remote end is assumed to be faulty and it is closed. Default value
  is *MinPulseInterval* as defined in discoro module.

  The computation created as, for example, "compute =
  Computation(...)", has following methods:

     compute.schedule(location=None)

        Note: This method must be used with *yield* as "result =
          yield compute.schedule()".

        Schedule computation for execution.  If scheduler is executing
        other computations, this will block until scheduler processes
        them (computations are processed in the order submitted). If
        successful, *result* will be 0.

        *location*, if not *None*, should be a Location instance
        referring to where the scheduler is running. If it is *None*,
        scheduler must be running at a node in local network; asyncoro
        will use name resolution to find the scheduler (a coroutine)
        with the registered name "discoro_scheduler".

     Note: All the *run* methods below take generator function and
       arguments used to create coroutines at a remote server. If the
       generator function used in a *run* method is one of the
       *components* used to create computation, the code for the
       function is not transferred to the servers (thus a bit more
       efficient); otherwise, the code is also transferred to the
       servers to create the coroutine. The latter approach can be
       used to create remote coroutines with dynamically generated
       functions (by transferring all necessary dependencies with
       *components*).

     compute.run_at(where, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run_at(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at *where*; i.e., create a coroutine at a process
        with the given generator function and arguments. If the
        request is successful, *rcoro* will be a (remote) coroutine.

        If *where* is a string, it is assumed to be IP address of a
        node, in which case the function is scheduled at that node on
        a process with least load (i.e., process with least number of
        pending coroutines). If *where* is a Location instance, it is
        assumed to be process location in which case the function is
        scheduled at that process.

        *func* must be generator function, as it is used to run
        coroutine at remote location.

     compute.run_each(where, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_each(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at each node or process. When finished, *rcoros* will
        be list of (remote) coroutines.

        *where* is same as in the case of *run_at*: If it is string,
        the function is scheduled at every node and if it is a
        Location instance, the function is scheduled at every process
        (on every node).

     compute.run(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at a process with least load at a node with least
        load. If the request is successful, *rcoro* will be a (remote)
        coroutine.

     compute.run_nodes(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_nodes(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at a process with least load at every node; i.e.,
        given function is executed at one process at each node. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.run_procs(func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_procs(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at every process at every node. When finished,
        *rcoros* will be a list of (remote) coroutines.

     compute.run_node_procs(addr, func, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_node_procs(...)".

        Run given generator function *func* with arguments *args* and
        *kwargs* at every process at given node at *addr*. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.nodes()

        Note: This method must be used with *yield* as "nodes =
          yield compute.nodes(...)".

        Get list of addresses of nodes initialized for this
        computation.

     compute.procs()

        Note: This method must be used with *yield* as "procs =
          yield compute.procs(...)".

        Get list of Location instances of processes initialized for
        this computation.

     compute.close()

        Note: This method must be used with *yield* as "yield
          compute.close()".

        Closes computation (so scheduler can execute next computation
        in the queue).
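
The "least load" placement used by *run* and *run_at* above can be
sketched as follows. This is an illustration only: the function name
"pick_process", the layout of the *nodes* dictionary and the
definition of a node's load as the sum of pending coroutines over its
processes are all assumptions made for the sketch, not the
scheduler's implementation.

```python
def pick_process(nodes, where=None):
    """Pick (node address, process name) with the fewest pending coroutines.

    'nodes' maps node address -> {process name: number of pending coroutines}.
    The data layout and the node load (sum over its processes) are
    assumptions made for this sketch.
    """
    if where is None:
        # As with run(): first choose the node with least load ...
        where = min(nodes, key=lambda addr: sum(nodes[addr].values()))
    procs = nodes[where]
    # ... then, as with run_at(address, ...), the process with least load.
    proc = min(procs, key=procs.get)
    return where, proc


nodes = {
    "192.168.1.5": {"proc-1": 3, "proc-2": 1},
    "192.168.1.6": {"proc-1": 0, "proc-2": 2},
}
print(pick_process(nodes))                       # ('192.168.1.6', 'proc-1')
print(pick_process(nodes, where="192.168.1.5"))  # ('192.168.1.5', 'proc-2')
```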

The remote coroutine *rcoro* obtained with *run* methods above can be
used for message passing or can be terminated; see Distributed
Coroutines. Although it can also be monitored with *yield
rcoro.monitor(coro)*, the discoro scheduler monitors all coroutines
created with *run* methods and sends *MonitorException* messages about
remote coroutines to *status_coro*.
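
The calling convention of the methods above (each used with *yield*)
can be sketched as a client generator function. So that the sketch
runs without asyncoro or any nodes, it is driven here by a
hypothetical "StubComputation" and a hypothetical "drive" loop that
simply sends each yielded value back; with discoro, "client_proc"
would instead be run as a coroutine with a real "Computation". The
*coro* keyword argument is assumed to follow the usual asyncoro
convention for generator functions.

```python
def compute_task(n, coro=None):
    """Generator function of the kind passed to the *run* methods."""
    yield n * n


def client_proc(compute, coro=None):
    """Client side: schedule the computation, start one run, then close."""
    result = yield compute.schedule()
    if result != 0:
        raise RuntimeError("scheduling failed: %r" % (result,))
    rcoro = yield compute.run(compute_task, 7)
    procs = yield compute.procs()
    yield compute.close()
    return rcoro, procs


class StubComputation:
    """Hypothetical stand-in that records calls and returns canned values."""

    def __init__(self):
        self.calls = []

    def schedule(self, location=None):
        self.calls.append("schedule")
        return 0

    def run(self, func, *args, **kwargs):
        self.calls.append("run")
        return "rcoro:%s" % func.__name__

    def procs(self):
        self.calls.append("procs")
        return ["proc-1", "proc-2"]

    def close(self):
        self.calls.append("close")


def drive(gen):
    """Hypothetical driver: send every yielded value straight back."""
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration as stop:
        return stop.value


stub = StubComputation()
outcome = drive(client_proc(stub))
print(stub.calls)  # ['schedule', 'run', 'procs', 'close']
print(outcome)     # ('rcoro:compute_task', ['proc-1', 'proc-2'])
```

The stub returns plain values, whereas the real methods return
operations that the coroutine scheduler completes before resuming the
client, so only the order of calls and the use of *yield* carry over.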


Examples
--------

See Distributed Communicating Processes in tutorial for an example
usage. The "discoro_client*.py" files in the 'examples' directory show
a few more use cases.
