
Distributed Communicating Processes (discoro)
*********************************************

The discoro module provides an API for sending computation fragments
(code and data) to remote server processes for executing distributed
communicating processes. Whereas "RCI" provides an API for creating
remote coroutines with pre-defined generator functions, discoro's API
provides a generic framework that clients can use to send different
computations to create coroutines at remote servers. There are three
components in discoro:

* Node / Servers program should be running on each of the nodes that
  run the servers to execute coroutines for clients,

* Scheduler that schedules client computations, manages nodes,
  remote coroutines etc. and ProcScheduler provides simple and
  convenient API to execute computations on remote server processes,

* Computation API for clients to create computations, schedule them
  with the scheduler and run remote coroutines.


Examples
========

There are many illustrative use cases in 'discoro_client*.py' files in
the 'examples' directory under where asyncoro module is installed. To
use these examples, run 'discoronode.py' program on one or more of the
nodes (most examples use one or two servers) along with an example
file. The examples are written to illustrate various features; they
are not necessarily the simplest way to do so, and their error
checking is not comprehensive. The comments in the code explain usage
and other notes.

Following is a brief description of the examples included relevant to
this section:

* "discomp1.py" illustrates how to use discoro and ProcScheduler for
  distributed computing. The computations are assumed to be CPU
  intensive, so at most one computation runs at a (local or remote)
  server process, similar to using the dispy project, except that
  with asyncoro, coroutines can use message passing to communicate /
  cooperate.

* "discomp2.py" is another example of distributed computing, similar
  to discomp1.py. It shows how to relay status messages from discoro
  scheduler to ProcScheduler and HTTP Server.

* "discomp3.py" and "discomp4.py" demonstrate using AsyncThreadPool
  to run synchronous tasks (i.e., computations without 'yield'
  statements) in remote processes.

* "discomp5.py" uses *execute* method of ProcScheduler to issue
  computation jobs and get their results.

* "discomp6.py" uses *execute_at* method of ProcScheduler to
  initialize server processes for in-memory processing, issue
  computation jobs and get their results.

* "discoro_client1.py" distributes a computation and its
  dependencies to execute remote coroutines. The client and remote
  coroutines use message passing to exchange data.

* "discoro_client2.py" is similar to discoro_client1.py, except that
  it uses status messages from scheduler to schedule coroutines.

* "discoro_client3.py" is a variation of discoro_client2.py where a
  certain number of jobs are submitted whenever a server becomes
  available.

* "discoro_client4.py" sends input data files ('input0.dat',
  'input1.dat' etc.) at the client to remote server to execute
  coroutines. These remote coroutines process given input file to
  generate corresponding output file (for given input file
  'inputX.dat', output file is 'outputX.dat') and then transfer that
  output file to client.

* "discoro_client5.py" uses status messages to initialize (setup) a
  server to read the data in a file into a global variable, which is
  later used to efficiently process data with in-memory processing (to
  use data in global variable instead of reading from file each time).

* "discoro_client6.py" shows how client process can stream data to
  two remote processes, one to compute statistics of moving window of
  data, and one to save the data in a file. Streaming uses same
  connection to send data (and thus efficient), instead of opening and
  closing connections.


Node / Servers
==============

*discoronode* program (discoronode.py) is used to start server
processes at a node. These server processes are used by discoro
scheduler to run computations submitted by clients. The program, by
default, starts one server for each processor available so that CPU
intensive computations can utilize all the processors efficiently.
Each server runs as a separate process so computations running in one
server process don't interfere with computations in another server
process on the same node. However, multiple coroutines can be
scheduled on one server so that if computations have I/O (such as
communicating with other computations / client, or reading/writing
data), asyncoro can run another coroutine that is ready to execute on
that server process. All coroutines running in one server process
share the address space and run on one CPU; however, as asyncoro
doesn't pre-empt a running coroutine until *yield* is used, there is
no need to lock critical sections, as is done with threads. If all
computations are the same and do not need to communicate with each
other, then the dispy project can be used.
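
The point about not needing locks can be illustrated with plain Python
generators. This is a minimal sketch, not asyncoro's scheduler: the
names "worker" and "run_round_robin" are hypothetical, and the toy
scheduler simply resumes each generator in turn. Because a generator
only gives up control at *yield*, the read-modify-write below cannot
be interleaved with another worker.

```python
from collections import deque


def worker(name, shared, steps):
    """Generator that updates shared state between yields."""
    for _ in range(steps):
        # Safe without a lock: nothing else runs until the next 'yield'.
        current = shared["count"]
        shared["count"] = current + 1
        shared["log"].append(name)
        yield  # hand control back to the scheduler


def run_round_robin(generators):
    """Toy scheduler: resume each generator in turn until all finish."""
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # finished; do not requeue
        ready.append(gen)


shared = {"count": 0, "log": []}
run_round_robin([worker("a", shared, 3), worker("b", shared, 3)])
```

With threads, the same read-modify-write would need a lock; here the
only switch points are the explicit *yield* statements.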

The program takes following options to customize how the servers are
started.

   * "-c" option specifies number of instances of discoro server
     processes to run. Each server process uses one processor. If "-c"
     option is used with a positive number, then that many processors
     can be used in parallel; if it is negative number, then that many
     processors are not used (from available processors) and if it is
     "0" (default value), then all available processors are used.

   * "-i" or "--ip_addr" is same as *node* option to AsynCoro

   * "--ext_ip_addr" is same as *ext_ip_addr* option to AsynCoro

   * "-u" or "--udp_port" is same as *udp_port* option to AsynCoro

   * "-n" or "--name" is same as *name* option to AsynCoro

   * "--dest_path" is same as *dest_path* option to AsynCoro

   * "--max_file_size" is same as *max_file_size* option to AsynCoro

   * "-s" or "--secret" is same as *secret* option to AsynCoro

   * "--certfile" is same as *certfile* option to AsynCoro

   * "--keyfile" is same as *keyfile* option to AsynCoro

   * "-d" or "--debug" option enables debug log messages.
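
The rules for the "-c" option can be summarized in a small function.
This is a sketch of the description above, not code taken from
discoronode; the function name "server_count" is hypothetical, and
raising an error when no processors would be left is an assumption,
since the text does not say what happens in that case.

```python
def server_count(c, available):
    """Number of server processes to start for a given "-c" value."""
    if c == 0:
        return available      # default: use every available processor
    if c > 0:
        return c              # use exactly this many processors
    count = available + c     # negative: leave abs(c) processors unused
    if count <= 0:
        # Assumption: the documentation does not cover this case.
        raise ValueError("no processors left to run servers")
    return count
```

For example, on a node with 8 processors, "-c -2" would start 6
servers under this reading.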


Scheduler
=========

discoro scheduler schedules computations, keeps track of available
nodes, server processes etc. Client computations, constructed with
Computation, are sent to the scheduler. The scheduler queues the
client computations and processes one at a time, until that computation
closes. Even if currently scheduled computation is not utilizing all
the servers, the scheduler doesn't schedule any other computations, so
that one computation's state doesn't interfere with another
computation's. That is, currently scheduled computation reserves all
the available servers.
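
The queueing behaviour described above can be modelled in a few lines.
This is only a toy model with hypothetical names ("ToyScheduler",
"submit", "servers_for"); it is not discoro's scheduler, and it leaves
out node discovery, messaging and fault handling.

```python
from collections import deque


class ToyScheduler:
    """Runs one computation at a time; others wait in submission order."""

    def __init__(self, servers):
        self.servers = list(servers)
        self.pending = deque()
        self.active = None

    def submit(self, name):
        """Queue a computation; activate it if nothing is running."""
        self.pending.append(name)
        self._activate_next()

    def close(self, name):
        """Close the active computation and activate the next one."""
        if name != self.active:
            raise ValueError("only the active computation can be closed")
        self.active = None
        self._activate_next()

    def servers_for(self, name):
        """The active computation reserves every server; others get none."""
        return list(self.servers) if name == self.active else []

    def _activate_next(self):
        if self.active is None and self.pending:
            self.active = self.pending.popleft()
```

In this model a second computation gets no servers, even idle ones,
until the first one closes.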

Scheduler can be started either as a separate program, or from within
the client program. If multiple client programs can schedule
computations simultaneously, scheduler should be started as a program
on a node; then the scheduler will schedule them in the order
submitted.


Private Scheduler
-----------------

If only one client uses the nodes, then it is easier to start the
scheduler with:

   discoro.Scheduler()

in the client program before computation is scheduled with
"compute.schedule()" (see below). There is no need to hold reference
to the object created, as the methods of scheduler are not to be used
directly by the client; instead, computation created with
"Computation" should be used to run remote coroutines. When the
scheduler starts, it detects discoro nodes and servers, which may take
some time (depending on network topology, number of nodes and servers,
it may be less than a second to a few seconds), so it may be easier to
delay scheduling computation if started from the program. Alternately,
*status_coro* parameter of computation (see below) can be used to have
scheduler inform when a server is initialized etc., and then schedule
jobs accordingly. See 'discoro_client*.py' files in examples directory
for use cases.


Shared Scheduler
----------------

If more than one client may use the scheduler simultaneously,
scheduler should be started by running the discoro program
(discoro.py). The program takes following options (same as
*discoronode*, except for starting servers):

   * "-i" or "--ip_addr" is same as *node* option to AsynCoro

   * "--ext_ip_addr" is same as *ext_ip_addr* option to AsynCoro

   * "-u" or "--udp_port" is same as *udp_port* option to AsynCoro

   * "-n" or "--name" is same as *name* option to AsynCoro

   * "--dest_path" is same as *dest_path* option to AsynCoro

   * "--max_file_size" is same as *max_file_size* option to AsynCoro

   * "-s" or "--secret" is same as *secret* option to AsynCoro

   * "--certfile" is same as *certfile* option to AsynCoro

   * "--keyfile" is same as *keyfile* option to AsynCoro

   * "--zombie_period=sec" specifies maximum number of seconds a
     remote server process can stay idle before it closes computation.
     The default value is 1800 seconds (30 minutes). Once all servers
     used by a computation close, the computation is discarded so
     other pending (queued) computations can be run. If
     "zombie_period" is set to 0, then idle check is not done, so
     computations are not automatically closed.

   * "-d" or "--debug" option enables debug log messages.
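
The idle check controlled by "--zombie_period" can be sketched as
follows. The function names are hypothetical and the exact comparison
(idle time greater than or equal to the period) is an assumption; the
sketch only mirrors the rules stated above, including that a period of
0 disables the check.

```python
def server_is_zombie(idle_seconds, zombie_period=1800):
    """True if a server has been idle for at least zombie_period seconds."""
    if not zombie_period:
        return False  # 0 disables the idle check
    return idle_seconds >= zombie_period


def computation_discarded(idle_seconds_per_server, zombie_period=1800):
    """True once every server used by the computation has closed it."""
    idle = list(idle_seconds_per_server)
    # An empty list means no servers were used; treat that as not discarded.
    return bool(idle) and all(server_is_zombie(s, zombie_period) for s in idle)
```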

When shared scheduler is running on a computer in local network,
"Computation.schedule()" will locate the scheduler automatically. If
the scheduler is in remote network, "scheduler.peer()" method of
AsynCoro should be used so asyncoro can locate the scheduler.


Computation
===========

*discoro* module's "Computation" provides API for client programs to
package computation fragments, send them to the scheduler, and submit
coroutines to be executed at remote server processes to execute
distributed communicating processes. A computation's jobs (remote
coroutines) at a remote server run with only the components packaged. A
remote coroutine can expect to have only asyncoro module available;
any other modules, global variables etc. need to be initialized
appropriately. If necessary, initialization can be done by scheduling
a job on remote servers, e.g., to read data in a file, before running
other jobs, which can expect the side effects of setup job. See
"discoro_client*.py" files in "examples" directory (under where
asyncoro module is installed) for different use cases. All coroutines
at a server process (if more than one job is scheduled concurrently)
run in one thread / processor, share the address space, and run
concurrently.

class Computation(components, status_coro=None, timeout=MsgTimeout, pulse_interval=MinPulseInterval, zombie_period=None)

* *components* must be a list, each element of which can be either a
  Python function, class, module, or path to a file. These computation
  fragments are sent to discoro servers for execution. Once the
  computation is scheduled (i.e., *schedule* method is finished),
  generator functions can be run on the nodes / servers. These jobs
  can use the components packaged. If the jobs scheduled are listed in
  *components*, then the code for those functions would've already
  been distributed, so only the names of those functions are sent
  along with the arguments; otherwise, each *run* method sends the
  code for the functions along with the arguments to the remote
  server.

  Before a remote coroutine can be created at a remote discoro server,
  the components would've been transferred: if a component is a file,
  it would've been saved at the current working directory (which is
  system dependent and can be customized with "--dest_path" option to
  discoronode; e.g., "/tmp/asyncoro/discoro/server-1"), and if it is
  code (e.g., a class definition), it would've been executed (so class
  instances can be sent as arguments or messages).

* *status_coro*, if given, must be a coroutine. If it is a
  coroutine, discoro scheduler sends status of remote servers, jobs
  executed etc., to this coroutine as messages. These messages can be
  used to schedule jobs.

* *timeout* is maximum number of seconds to successfully transfer
  messages to/from servers or client. If a message could not be sent
  or received within timeout, the operation is aborted. Default value
  is as defined by *MsgTimeout* in discoro module.

* *pulse_interval* is interval number of seconds at which pulse
  messages are exchanged among servers and client to check for faults.
  If no messages are received within (5 * *pulse_interval*), the
  remote end is assumed to be faulty and it is closed. Default value
  is *MinPulseInterval* as defined in discoro module.

* *zombie_period* is as explained in *Shared Scheduler* section.
  When computation is sent to shared scheduler, the zombie period is
  set to whatever value shared scheduler is started with (default is
  1800 seconds), so that if the computation is idle for that period,
  it is closed so other computations can use the nodes. If private
  scheduler is used instead, the default is *None*, so computation is
  not closed automatically even if it is idle, as there won't be other
  computations pending.

  The computation created as, for example, "compute =
  Computation(...)", has following methods:

     compute.schedule(location=None)

        Note: This method must be used with *yield* as "result =
          yield compute.schedule()".

        Schedule computation for execution.  If scheduler is shared
        and executing other computations, this method will block until
        those computations close. If successful, *result* will be 0.

        *location*, if not *None*, should be a Location instance
        referring to where the scheduler is running. If it is *None*,
        scheduler must be running at a node in local network; asyncoro
        will use name resolution to find the scheduler (a coroutine)
        with the registered name "discoro_scheduler".

     Note: All the *run* methods below take generator function and
       arguments used to create coroutines (jobs) at a remote server.
       If the generator function used in a *run* method is one of the
       *components* used to create computation, the code for the
       function is not transferred to the servers (thus a bit more
       efficient); otherwise, the code is also transferred to the
       servers to create the coroutine. The latter approach can be
       used to create remote coroutines with dynamically generated
       functions (by transferring all necessary dependencies with
       *components*).

     Note: The generator functions sent to remote servers to run
       coroutines are expected to *yield* at least every
       *pulse_interval* seconds. Otherwise, the server won't be able
       to send pulse messages to scheduler, causing scheduler to
       assume the server is not reachable. If the computations are CPU
       bound only (i.e., without *yield* statements), consider
       increasing *pulse_interval* up to *MaxPulseInterval* (which is
       100 seconds as of now). If that is too short, increase that
       with "discoro.MaxPulseInterval = n" before starting (private)
       scheduler. Increasing or disabling pulse interval will delay or
       disable fault detection.
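
The pulse rules described for *pulse_interval* can be written down as
two small checks. This is a sketch of the stated rules only, with
hypothetical function names; it does not reproduce how discoro tracks
pulse messages internally.

```python
FAULT_MULTIPLIER = 5  # from the text: faulty after 5 * pulse_interval


def peer_is_faulty(seconds_since_last_message, pulse_interval):
    """True if nothing was heard from the remote end for too long."""
    return seconds_since_last_message > FAULT_MULTIPLIER * pulse_interval


def yields_often_enough(longest_gap_between_yields, pulse_interval):
    """True if a job yields at least once every pulse_interval seconds."""
    return longest_gap_between_yields <= pulse_interval
```

For instance, a job that computes for 30 seconds between yields fails
the second check with a pulse interval of 10 seconds, but passes with
an interval of 100 seconds.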

     compute.run_at(where, gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run_at(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at *where*; i.e., create a coroutine at a server with
        the given generator function and arguments. If the request is
        successful, *rcoro* will be a (remote) coroutine.

        If *where* is a string, it is assumed to be IP address of a
        node, in which case the function is scheduled at that node on
        a server with least load (i.e., server with least number of
        pending coroutines). If *where* is a Location instance, it is
        assumed to be server location in which case the function is
        scheduled at that server.

        *gen* must be generator function, as it is used to run
        coroutine at remote location.

     compute.run_each(where, gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_each(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at each node or server. When finished, *rcoros* will
        be list of (remote) coroutines.

        *where* is same as in the case of *run_at*: If it is string,
        the function is scheduled at every node and if it is a
        Location instance, the function is scheduled at every server
        (on every node).

     compute.run(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at a server with least load at a node with least
        load. If the request is successful, *rcoro* will be a (remote)
        coroutine.
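
The "least load" selection used by *run* and *run_at* can be sketched
as below. The data layout and function names are hypothetical, and
defining a node's load as the total number of pending coroutines on
its servers is an assumption; the text only defines load for a server.

```python
def pick_server_at(nodes, addr):
    """Like run_at with a node address: least loaded server on that node."""
    servers = nodes[addr]  # maps server name -> pending coroutine count
    return min(servers, key=servers.get)


def pick_server(nodes):
    """Like run: least loaded server on the least loaded node."""
    # Assumption: node load is the sum of its servers' pending coroutines.
    addr = min(nodes, key=lambda a: sum(nodes[a].values()))
    return addr, pick_server_at(nodes, addr)


nodes = {
    "10.0.0.1": {"server-1": 2, "server-2": 0},
    "10.0.0.2": {"server-1": 0, "server-2": 1},
}
choice = pick_server(nodes)
```

Here the second node has less total load, so its idle server is
chosen even though the first node also has an idle server.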

     compute.run_nodes(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_nodes(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at a server with least load at every node; i.e.,
        given function is executed at one server at each node. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.run_servers(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_servers(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at every server at every node. When finished,
        *rcoros* will be a list of (remote) coroutines.

     compute.run_node_servers(addr, gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_node_servers(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at every server at given node at *addr*. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.nodes()

        Note: This method must be used with *yield* as "nodes =
          yield compute.nodes()".

        Get list of addresses of nodes initialized for this
        computation.

     compute.servers()

        Note: This method must be used with *yield* as "servers =
          yield compute.servers()".

        Get list of Location instances of servers initialized for this
        computation.

     compute.close()

        Note: This method must be used with *yield* as "yield
          compute.close()".

        Closes computation. After the computation is closed, remote
        coroutines cannot be scheduled. Closing computation causes
        each server process to remove any files saved or created at
        that server, remove global variables created by the jobs etc.
        If the scheduler is shared, and another computation is
        pending, then the scheduler makes it the active computation
        (i.e., its *schedule* finishes) so it can then schedule
        remote coroutines.

The remote coroutine *rcoro* obtained with *run* methods above can be
used for message passing or terminated; see Distributed Coroutines.
Although it can also be monitored with *yield rcoro.monitor(coro)*,
discoro scheduler monitors all coroutines created with *run* methods
and sends *MonitorException* message to *status_coro* about remote
coroutines.


DiscoroStatus
=============

When discoro scheduler changes state of nodes, servers or coroutines,
it sends the changes as messages to the computation's *status_coro*
coroutine, if it is initialized to a coroutine before the computation
is scheduled. This is so *status_coro* can schedule coroutines as and
when servers are available, for example. Each message is an instance
of *DiscoroStatus*, with attributes *status* and *info*:

* *status* is either *NodeDiscovered*, *NodeInitialized*,
  *NodeClosed*, *NodeIgnore*, *NodeDisconnected*, *ServerDiscovered*,
  *ServerInitialized*, *ServerClosed*, *ServerIgnore*,
  *ServerDisconnected*, *CoroCreated*, or *ComputationClosed*.

* *info* is a string, an instance of *Location*, or an instance of
  *CoroInfo*, depending on *status*.

If *status* is *CoroCreated* (indicating a coroutine has been created
at remote server), then *info* is an instance of *CoroInfo*; if
*status* is for a node (i.e., one of *NodeDiscovered*,
*NodeInitialized*, *NodeClosed*, *NodeIgnore*, *NodeDisconnected*),
then *info* is a string with IP address of the node; in all other
cases, *info* is an instance of Location.

*CoroInfo* has following attributes:

* *coro* is (remote) coroutine (instance of "Coro"),

* **args* is arguments used to create coroutine,

* ***kwargs* is keyword arguments used to create coroutine,

* *start_time* is time when coroutine was created.
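
The rules for interpreting *info* can be sketched with stand-in types.
Everything below is for illustration: the namedtuples are not the real
asyncoro classes, the status values are modelled as strings (an
assumption), and the *addr* and *port* fields of the stand-in Location
are also assumptions. In a client, this logic would live in the
coroutine given as *status_coro*.

```python
from collections import namedtuple

# Stand-ins for illustration only; the real classes come from asyncoro.
DiscoroStatus = namedtuple("DiscoroStatus", ["status", "info"])
CoroInfo = namedtuple("CoroInfo", ["coro", "args", "kwargs", "start_time"])
Location = namedtuple("Location", ["addr", "port"])  # fields are assumed

NODE_STATUSES = {"NodeDiscovered", "NodeInitialized", "NodeClosed",
                 "NodeIgnore", "NodeDisconnected"}


def classify(msg):
    """Return (kind, detail) following the rules for *info* above."""
    if msg.status == "CoroCreated":
        return "coro", msg.info.coro  # info is a CoroInfo
    if msg.status in NODE_STATUSES:
        return "node", msg.info  # info is an IP address string
    return "location", (msg.info.addr, msg.info.port)  # info is a Location
```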


HTTP Server
===========

**asyncoro.httpd** module provides HTTP interface to monitor and
manage discoro servers (nodes and servers) with a web browser; it
works with common web browsers, including in iOS and Android devices.
It doesn't require / use apache or other web servers.  HTTP server can
be created with:

class HTTPServer(host='', port=8181, poll_sec=10, DocumentRoot=None, keyfile=None, certfile=None)

   Creates an instance of HTTP server which will listen for
   connections at given *host* and *port*.

   * *host* should be a string, either IP address or name of the
     host. The default value of *''* will bind the server to all
     configured network interfaces.

   * *port* should be a number HTTP server binds to. Web browsers
     can monitor and manage cluster by connecting to
     **http://<host>:<port>** if SSL (https) is not used and
     **https://<host>:<port>** if SSL is used.

   * *poll_sec* is number of seconds (interval) the client waits
     between requesting updates from the server. Smaller values of
     *poll_sec* will cause more frequent updates so the information
     shown is more accurate, but cause more network traffic/load.
     Bigger values of *poll_sec* are more efficient but the status in
     browser may not reflect more recent information. This value can
     be changed in the browser as well.

   * *DocumentRoot* is directory where monitor.html, asyncoro.css,
     etc. files needed for the service are stored. If this is not set,
     the directory is assumed to be **data** directory under the
     directory where asyncoro.httpd module is stored.

   * *keyfile* and *certfile*, if given, will be used to configure
     SSL so https can be used between browser and HTTP server. If both
     key and certificate are in the same file, then it should be given
     as *certfile*.

   The HTTP server has following methods:

   shutdown(wait=True)

      Shuts down HTTP server. If *wait* is **True**, the server waits
      for current *poll_sec* period before shutting down so the web
      browser gets all the updates.

   Note: When cluster status is being monitored, the HTTP server
     sends only changes to cluster status between updates to browser
     (for efficiency); i.e., each time browser gets the status update
     at current *poll_sec* interval, the server sends the changes
     since last time browser requested data. The browser computes full
     current status with these updates.  Consequently, the status can
     be viewed in only one browser; if more than one browser is used,
     they will not get full information.
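
Why a second browser misses information can be seen in a toy model of
change-only updates. The classes below are hypothetical and unrelated
to the actual httpd implementation; they only mimic a server that
hands out the changes accumulated since the previous poll, and a
client that merges those changes into its view.

```python
class DeltaServer:
    """Keeps only the changes made since the last poll."""

    def __init__(self):
        self._changes = {}

    def update(self, key, value):
        self._changes[key] = value

    def poll(self):
        # Hand out pending changes and start a fresh batch.
        changes, self._changes = self._changes, {}
        return changes


class Browser:
    """Builds its view of the full status by merging each batch."""

    def __init__(self):
        self.view = {}

    def refresh(self, server):
        self.view.update(server.poll())


server, first, second = DeltaServer(), Browser(), Browser()
server.update("10.0.0.1", 3)
first.refresh(server)    # first browser consumes this change
server.update("10.0.0.2", 1)
second.refresh(server)   # second browser consumes the next one
first.refresh(server)    # nothing left for the first browser
```

Each browser ends up with only part of the status, since every batch
of changes is delivered once.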

   status_coro

      This is a coroutine that should get all status messages sent by
      scheduler. The client program should set *status_coro* attribute
      to this coroutine if the client doesn't need to process status
      messages; otherwise, the messages should be resent by client to
      this coroutine. That is, if client does not process status
      messages, the client program should be of the form:

         http_server = asyncoro.httpd.HTTPServer()
         compute = discoro.Computation(...)
         compute.status_coro = http_server.status_coro


Example
=======

See "discoro_httpd.py" for an example where client program relays the
status messages to HTTP server's *status_coro* before processing them
(to schedule coroutines).


Client (Browser) Interface
==========================

Once HTTP server is created, the discoro servers can be monitored and
managed in a web browser at **http://<host>:8181**, where *<host>* is
name or IP address of computer running the program. If SSL
certificates are used to setup HTTP server, https protocol should be
used in URL above. There are currently 3 sections (menu items):


Cluster Status
--------------

The *Cluster* menu shows summary of nodes and coroutines:

[image]

The coroutine summary shows the total number of coroutines submitted
so far, done (finished or cancelled) and currently running. The nodes
summary shows each node's IP address, number of servers running on
that node, number of coroutines submitted to all servers on that node,
number of coroutines done by all servers on that node, and number of
coroutines currently running on all servers on that node. Each node's
IP address
is shown with hyper link; when the link is activated, the page changes
to show status for that node, as explained in *Node Status*.

The nodes are sorted by default on the IP address in descending order.
The field to sort on can be changed; however, as other fields are not
necessarily unique, sorting on other fields is inefficient, so if
there are many nodes, especially with frequent updates, choose IP
address as the field to sort. Sorting can be changed even after
cluster is closed.


Node Status
-----------

Each node in *Cluster Status* section is a hyper link which when
followed (to **Node** menu) shows details about that node, including
servers available, coroutines processed:

[image]

The summary of node includes number of server processes running on the
node, number of coroutines running on each server. Each server is
shown with its location (an instance of Location) as hyper link. This
link can be used to get details of coroutines running on that server,
as explained in *Server Status*.


Server Status
-------------

As noted above, a discoronode program starts a discoro server process
for each CPU available (or as specified with "-c" option) on that
node. Each server process runs coroutines submitted by discoro
scheduler.  The **Server** menu shows number of coroutines submitted,
number of coroutines done, and details of each coroutine currently
running. The details show the name of coroutine (function), the
arguments used to run it and the time when it was started.

[image]

The arguments are converted to strings by HTTP server before sending
to the browser client. If any of these are instances of user provided
classes, it may be useful to provide __str__ method. Otherwise,
Python's default *__str__* method may show it as simply an instance of
a class, which may not be very useful.
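
A short illustration of the difference, using two hypothetical
classes: "Job" defines *__str__*, while "Opaque" relies on Python's
default.

```python
class Job:
    """Argument class with a readable string form."""

    def __init__(self, name, size):
        self.name = name
        self.size = size

    def __str__(self):
        return "Job(%s, size=%d)" % (self.name, self.size)


class Opaque:
    """Argument class that relies on the default string form."""


readable = str(Job("resize", 3))
default = str(Opaque())
```

The first string names the job and its size; the second only
identifies the class and a memory address.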

If necessary, coroutines can be selected and terminated (killed).


ProcScheduler
=============

When a computation's *run* method is used to create (remote)
coroutine, discoro's Scheduler uses load balancing to select a server
process to run the coroutine, with no limit on how many coroutines are
scheduled at a server. This is appropriate if the coroutines are not
CPU bound (i.e., use asynchronous I/O, such as receiving messages,
wait for messages, sleep etc., in which case asyncoro can run other
eligible coroutines), especially as creating coroutines is very
efficient. However, if coroutines are CPU intensive, creating multiple
such coroutines may not be efficient (e.g., load balancing scheduler
may create equal number of coroutines at a slow node's server and a
fast node's server, so it would take longer to finish all
computations).
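
The slow-node effect can be shown with a small simulation. This is a
simplified model with hypothetical function names, not discoro code:
each server takes a fixed time per job, and the two functions compare
giving every server an equal share of jobs with handing out one job at
a time to whichever server becomes free first.

```python
import heapq


def makespan_equal_split(job_count, durations):
    """Finish time when each server gets the same number of jobs up front."""
    per_server, extra = divmod(job_count, len(durations))
    return max((per_server + (1 if i < extra else 0)) * d
               for i, d in enumerate(durations))


def makespan_one_at_a_time(job_count, durations):
    """Finish time when a job goes to whichever server is free first."""
    free_at = [(0.0, i) for i in range(len(durations))]
    heapq.heapify(free_at)
    finish = 0.0
    for _ in range(job_count):
        start, i = heapq.heappop(free_at)
        done = start + durations[i]
        finish = max(finish, done)
        heapq.heappush(free_at, (done, i))
    return finish


durations = [1.0, 0.25]  # seconds per job on a slow and a fast server
equal = makespan_equal_split(8, durations)
greedy = makespan_one_at_a_time(8, durations)
```

In this model the equal split is held back by the slow server, while
one-at-a-time dispatch lets the fast server take most of the jobs.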

A client program can use status messages from discoro scheduler to
schedule coroutines as appropriate for different use cases.
*discoro_schedulers.py* implements special purpose schedulers for a
few such cases.

**ProcScheduler** schedules at most one coroutine at a server process.
This can be used to create coroutines that are (mostly) CPU bound. See
"discomp1.py", "discomp2.py" for example use cases.

class ProcScheduler(computation)

   Creates server process (or processor) scheduler for given
   computation (an instance of Computation). The computation's
   *status_coro* is set to *ProcScheduler*'s coroutine so status
   messages from discoro Scheduler are used to keep track of server
   processes and coroutine finish messages. If client program needs to
   process status messages for other purposes, then these messages can
   be relayed; see "discomp2.py" for an example.

   This scheduler has following methods:

   schedule(gen, *args, **kwargs)

      Note: This method must be used with *yield* as "rcoro = yield
        proc_scheduler.schedule(gen, ...)".

      Similar to *run* method of Computation, except the call may wait
      until a server process is available (i.e., not running any other
      scheduled coroutines).

      When using this scheduler, *run* method of Computation should
      not also be used to create coroutines, unless they are not CPU
      intensive (e.g., they wait for messages, events etc.).

   execute(gen, *args, **kwargs)

      Note: This method must be used with *yield* as "result = yield
        proc_scheduler.execute(gen, ...)".

      Similar to *schedule* method above, the calling coroutine is
      suspended until a server process is available (i.e., not running
      any other computations scheduled with *ProcScheduler*), the
      computation is executed and its results are obtained. The value
      yielded is the result (i.e., exit status) of computation.

      When using this scheduler, *run* method of Computation should
      not also be used to create coroutines, unless they are not CPU
      intensive (e.g., they wait for messages, events etc.).

   execute_at(where, gen, *args, **kwargs)

      Note: This method must be used with *yield* as "result = yield
        proc_scheduler.execute_at(location, gen, ...)".

      Similar to *run_at* method of Computation, except the calling
      coroutine is blocked until the computation finishes and exit
      value of computation is returned. Unlike *execute* above, the
      computation is executed right away, even if remote server
      process is executing another computation.

   finish(close=False)

      Note: This method must be used with *yield* as "yield
        proc_scheduler.finish()".

      Waits until all coroutines scheduled with this *ProcScheduler*
      finish. If *close* is **True**, the computation is also
      closed.
