
Distributed Communicating Processes (discoro)
*********************************************

discoro module provides API for sending computation fragments (code
and data) to remote server processes for executing distributed
communicating processes. Whereas RCI (Remote Coroutine Invocation)
provides API for creating remote coroutines with pre-defined generator
functions, discoro's API provides generic framework that can be used
by clients to send different computations to create coroutines at
remote servers. There are three components in discoro:

* Node / Servers (discoronode) program should be running on each of
  the nodes that run the servers to execute coroutines for clients,

* Scheduler (discoro) that schedules client computations, manages
  nodes, remote coroutines etc. and RemoteCoroScheduler provides
  simple and convenient API to execute computations on remote server
  processes,

* Computation API for clients to create computations, schedule them
  with the scheduler and run remote coroutines.


Examples
========

There are many illustrative use cases in 'discomp*.py' and
'discoro_client*.py' files in the 'examples' directory under where
asyncoro module is installed. To use these examples, run
'discoronode.py' program on one or more of the nodes (most examples
use one or two servers) along with an example file. The examples are
written to illustrate various features, so they are not necessarily
written in the simplest way and their error checking is not
comprehensive. The comments in the code explain usage / notes.

Compared to dispy project, asyncoro/discoro involves quite a few more
steps to distribute computations and get results; however,
asyncoro/discoro offer many features, such as communicating with
computation (even computations communicating among themselves), data
streaming, live analytics etc.

Following is a brief description of the examples included relevant to
this section:

* "discoro_client1.py" illustrates how to use discoro and
  RemoteCoroScheduler for distributed computing.

* "discoro_client2.py" is a variation of discoro_client1.py. In this
  example, computations are assumed to be CPU intensive (simulated using
  'time.sleep'), similar to using dispy project. It also shows how to
  use status messages from discoro scheduler (instead of simpler
  RemoteCoroScheduler) to run coroutines at remote server processes.

* "discoro_client3.py" uses RemoteCoroScheduler to distribute
  coroutine function and a class. The local coroutine (running at
  client) and remote coroutines exchange objects of that class using
  message passing.

* "discoro_client4.py" sends files at the client to remote process
  to execute computations that process those files and the remote
  process in turn sends the results in files back to the client.

* "discoro_client5.py" runs an external program
  ("discoro_client5_proc.py") at remote servers. The program reads
  from standard input and writes to standard output. Asynchronous
  pipes and message passing are used to send input from client to this
  program executing on remote servers, and get the output back to
  client.

* "discoro_client6.py" uses streaming of data to remote coroutines
  for efficient communication. The example also shows how to implement
  live/real-time analytics and send them to client.

* "discoro_client6_channel.py" is same as discoro_client6.py, except
  it uses channel to broadcast data to remote coroutines.

* "discoro_client7.py" is an alternate implementation of
  discoro_client1.py; instead of using RemoteCoroScheduler (which is
  easier), messages from discoro scheduler are used to schedule remote
  coroutines.

* "discoro_client8.py" demonstrates that long-running computations
  without *yield* often can be executed. In this case, *time.sleep* is
  used to simulate computation. Note that *time.sleep* blocks entire
  asyncoro framework, so no other coroutines can execute until next
  *yield*. With version 4.1 (and above), I/O processing, message
  passing, sending heartbeat messages to scheduler etc. are handled by
  a separate (called "reactive") asyncoro scheduler that is not
  affected by user's coroutines. So messages sent by client are
  received and queued by reactive scheduler.

* "discoro_client9.py" uses RemoteCoroScheduler to set up remote
  server processes by reading data in files into global variables
  (memory) for processing that data in computations efficiently (i.e.,
  in-memory processing).

* "discoro_ssh_ec2.py" is a variation of "discoro_client8.py" that
  uses Amazon EC2 for cloud computing; see Cloud Computing for more
  details.


Node / Servers
==============

*discoronode* program (discoronode.py) is used to start server
processes at a node. These server processes are used by discoro
scheduler to run computations submitted by clients. The program, by
default, starts one server for each processor available so that CPU
intensive computations can utilize all the processors efficiently.
Each server runs as a separate process so computations running in one
server process don't interfere with computations in another server
process on the same node. However, multiple coroutines can be
scheduled on one server so that if computations have I/O (such as
communicating with other computations / client, or reading/writing
data), asyncoro can run another coroutine that is ready to execute on
that server process. All coroutines running in one server process
share the address space and run on one CPU; however, as asyncoro
doesn't pre-empt a running coroutine until *yield* is used, there is
no need to lock critical sections, as is done with threads. If all
computations are same and do not need to communicate with each other,
then dispy project can be used.
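
The point about not needing locks can be illustrated with plain Python
generators. The following is a toy round-robin loop, not asyncoro's
scheduler; the names *worker* and *run_round_robin* are made up for
this sketch. Each generator updates shared state between *yield*
statements, and since nothing else runs until the next *yield*, the
read-modify-write needs no lock.

```python
from collections import deque

counter = {"value": 0}  # state shared by all "coroutines" in one process


def worker(n_steps):
    """Increment the shared counter n_steps times, yielding between updates."""
    for _ in range(n_steps):
        # Read-modify-write without a lock: no other generator can run
        # until this one reaches the next yield.
        current = counter["value"]
        counter["value"] = current + 1
        yield  # hand control back to the loop below


def run_round_robin(generators):
    """Resume each generator in turn until all of them are exhausted."""
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # finished, so it is not queued again
        ready.append(gen)


run_round_robin([worker(1000) for _ in range(4)])
print(counter["value"])
```

With threads, the same update would need a lock because a thread can
be pre-empted between the read and the write.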

The program takes following options to customize how the servers are
started.

   * "-c" option specifies number of instances of discoro server
     processes to run. Each server process uses one processor. If "-c"
     option is used with a positive number, then that many processors
     can be used in parallel; if it is negative number, then that many
     processors are not used (from available processors) and if it is
     "0" (default value), then all available processors are used.

   * "-i" or "--ip_addr" is same as *node* option to AsynCoro

   * "--ext_ip_addr" is same as *ext_ip_addr* option to AsynCoro

   * "-u" or "--udp_port" is same as *udp_port* option to AsynCoro

   * "-n" or "--name" is same as *name* option to AsynCoro

   * "--dest_path" is same as *dest_path* option to AsynCoro

   * "--max_file_size" is same as *max_file_size* option to AsynCoro

   * "-s" or "--secret" is same as *secret* option to AsynCoro

   * "--certfile" is same as *certfile* option to AsynCoro

   * "--keyfile" is same as *keyfile* option to AsynCoro

   * "--min_pulse_interval n" specifies minimum pulse interval that
     can be given as *pulse_interval* by client computations. The
     default value is *MinPulseInterval* defined (as 10 seconds) in
     discoro module. As nodes send availability status (CPU, memory
     and disk availability), the clients may want this information
     more frequently than at *MinPulseInterval*, in which case,
     smaller value can be specified with this option.

   * "--max_pulse_interval n" specifies maximum pulse interval that
     can be given as *pulse_interval* by client computations. See
     *min_pulse_interval* above.

   * "--zombie_period n" is maximum number of seconds that
     discoronode remains idle (i.e., doesn't run any computations for
     current client) before the node closes current computation so
     another computation may use node.

   * "-d" or "--debug" option enables debug log messages.

   * "--tcp_ports n1-n2" or "--tcp_ports n" option can be used to
     specify list of TCP ports to be used by servers. Without this
     option a server uses any available (dynamic) port, which can be a
     problem with remote servers or cloud computing that require
     firewall to be configured to forward ports. With *tcp_ports*,
     specific range of ports can be used and those ports can be
     configured for port forwarding. The range can be given either as
     *n1-n2*, in which case ports from *n1* to *n2* (both inclusive)
     will be used, or as single number *n*. *tcp_ports* can be used as
     many times as necessary to add different ranges, or different
     ports. If number of ports listed is less than number of servers
     (based on *-c* option, or number of available CPUs), discoronode
     will use ports beyond the highest listed port; thus, if number of
     servers to start is 8, and *--tcp_ports 2345-2347 --tcp_ports
     3799* is given, then servers will use ports 2345, 2346, 2347,
     3799, 3800, 3801, 3802, 3803, or if, say, just *--tcp_ports
     51347* is used, then ports from 51347 to 51354 are used.

   * "--serve n" option indicates number of computations/clients to
     serve before exiting. The default value of -1 implies no limit
     and any positive number causes discoronode to quit after running
     that many computations. This option can be used with Docker
     Container to run each computation in a new container, so one
     computation starts with the same environment the docker image was
     built with.

   * "--daemon", if given, indicates that discoronode shouldn't read
     standard input. Without this option, when started from the
     command line, discoronode offers a choice of commands that can be
     input to get status or quit. Currently supported commands are:

     * "status" shows status of each of the processes, such as
       number of coroutines being executed for a computation

     * "close" closes currently executing computation (if any). This
       is equivalent to computation calling "close" method. If any
       coroutines are being executed for that computation, they will
       be killed. A new computation can then use the server.

     * "quit" or "exit" command causes discoronode to stop accepting
       any more coroutines and when all coroutines are done, closes
       the computation and quits.

     * "terminate" kills currently executing coroutines, closes
       computation and quits.

   * "--service_start HH:MM", "--service_stop HH:MM" and
     "--service_end HH:MM" set time of day when discoro service can be
     used by clients. The time is given as hours of day in 24-hour
     format, a colon and minutes of hour. If start time is not given,
     current time is assumed as start time. Either stop time or end
     time must also be given. After stop time new jobs will not be
     accepted, but currently running jobs will continue to execute. At
     end time any running jobs will be killed. For example, if
     *service_start* is set to 17:00, *service_stop* set to 07:00 and
     *service_end* is set to 08:00, the node will execute jobs from
     5PM, stop accepting new jobs at 7AM (next day), and kill any
     running jobs at 8AM. Then it will not accept any jobs until 5PM.

   * "--phoenix" indicates that discoronode should kill any previous
     server processes left running. When discoronode server processes
     are started, they store process ID in files on file system. When
     the server processes quit, they remove the files. If the
     processes are terminated, for example, by killing them explicitly
     or due to an unforeseen exception, then the files are left behind;
     the next time server processes are started, they notice the files
     from previous run and refuse to start. *phoenix* option can be
     used to request the server process to kill previous servers. This
     should be done only if previous process is not running, or if it
     is acceptable to kill the process if it is running.

   * "--discover_peers" requests discoronode to detect other peers
     on the local network. If this option is not used, discoronode
     doesn't detect peers, which is efficient especially when many
     nodes are available (in local network). Coroutines created by
     clients can communicate with coroutines on other discoronode
     peers only if they established communication (i.e., have
     discovered each other, either with *discover_peers=True* or added
     with *peer* option below). See *discover_peers* option for
     AsynCoro; by default discoronode starts asyncoro with
     *discover_peers=False*, and with *discover_peers=True* if
     "--discover_peers" option is given.

   * "--peer location" requests discoronode to contact given peer to
     establish communication with it. *location* should be given in
     the form *node:port* where *node* is either host name or IP
     address and *port* is TCP port where peer is running. This option
     can be used multiple times to detect multiple peers.

     A typical use case where client's computations don't communicate
     with computations on other discoronodes (i.e., each coroutine
     executing on a discoronode communicates with coroutines at client
     or other local coroutines executing on that discoronode only) can
     be implemented with running each discoronode without
     *discover_peers* and *peer* options, and then start scheduler /
     client. (The scheduler and client use *discover_peers* so they
     detect all available discoronode servers, even the ones started
     without *discover_peers* option.)

     If scheduler is already running, *peer* option can be used with
     location where scheduler is running so that scheduler and new
     discoronode server can detect each other; alternately,
     *discover_peers* can be used, but in that case other discoronode
     servers will also detect new server.

   * "--save_config <file>" saves configuration (various options to
     start discoronode) in given file and exits. See "--config" below.

   * "--config <file>" loads configuration (various options to start
     discoronode) from given file (saved with "--save_config" option).

     "--save_config" can be used to save most of the options (certain
     options, such as "ip_addr" are unique to a node) and "--config"
     can be used to run discoronode with those options on many nodes.
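
How the "-c" and "--tcp_ports" options described above combine can be
sketched in a few lines. This is only a model of the documented
behaviour, not discoronode's code; the function names are made up, and
raising an error when no processors or ports are left is an
assumption.

```python
def resolve_server_count(c, available_cpus):
    """Model the "-c" option: >0 uses c CPUs, <0 leaves |c| unused, 0 uses all."""
    if c > 0:
        return c
    count = available_cpus + c  # c is 0 or negative here
    if count < 1:
        raise ValueError("no processors left for servers")  # assumed behaviour
    return count


def expand_port_specs(specs):
    """Expand values such as "2345-2347" or "3799" into a flat list of ports."""
    ports = []
    for spec in specs:
        first, _, last = spec.partition("-")
        last = last or first  # a single number is a one-port range
        ports.extend(range(int(first), int(last) + 1))  # both ends inclusive
    return ports


def assign_ports(specs, n_servers):
    """Give each server one port, continuing past the highest listed port."""
    listed = expand_port_specs(specs)
    if not listed:
        raise ValueError("at least one port must be listed")  # assumed behaviour
    ports = listed[:n_servers]
    next_port = max(listed) + 1
    while len(ports) < n_servers:
        ports.append(next_port)
        next_port += 1
    return ports


# 8 CPUs, default "-c 0", and just "--tcp_ports 51347"
n_servers = resolve_server_count(0, available_cpus=8)
print(assign_ports(["51347"], n_servers))
```

The last two lines reproduce the single-port case from the
"--tcp_ports" description: eight servers get ports 51347 to 51354.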


Scheduler
=========

discoro scheduler schedules computations, keeps track of available
nodes, server processes etc. Client computations, constructed with
Computation, are sent to the scheduler. The scheduler queues the
client computations and runs them one at a time, each until it
closes. Even if currently scheduled computation is not utilizing all
the servers, the scheduler doesn't schedule any other computations, so
that one computation's state doesn't interfere with another
computation's. That is, currently scheduled computation reserves all
the available servers.

Scheduler can be started either as a separate program, or from within
the client program. If multiple client programs can schedule
computations simultaneously, scheduler should be started as a program
on a node; then the scheduler will schedule them in the order
submitted.


Private Scheduler
-----------------

If only one client uses the nodes, then it is easier to start the
scheduler in discoro module with:

   Scheduler()

in the client program before computation is scheduled with
"compute.schedule()" (see below). There is no need to hold reference
to the object created, as the methods of scheduler are not to be used
directly by the client; instead, computation created with
"Computation" should be used to run remote coroutines. If necessary,
options used in AsynCoro can be passed, such as *node* to specify host
name or IP address to use for distributed computing. When the
scheduler starts, it detects discoro nodes and servers on the
configured network, which may take some time (depending on network
topology, number of nodes and servers, it may be less than a second to
a few seconds), so it may be easier to delay scheduling computation
jobs if started from the program. Alternately, *status_coro* parameter
of computation or RemoteCoroScheduler (see below) can be used to have
scheduler inform when a server is initialized etc., and then schedule
jobs accordingly. See 'discoro_client*.py' files in examples directory
for use cases.


Batch Scheduler
---------------

If more than one client may use the scheduler simultaneously,
scheduler should be started by running the discoro program
(discoro.py). The program takes following options (same as
*discoronode*, except for starting servers):

   * "-i" or "--ip_addr" is same as *node* option to AsynCoro

   * "--ext_ip_addr" is same as *ext_ip_addr* option to AsynCoro

   * "-u" or "--udp_port" is same as *udp_port* option to AsynCoro

   * "-t" or "--tcp_port" is same as *tcp_port* option to AsynCoro

   * "-n" or "--name" is same as *name* option to AsynCoro

   * "--node <host name or IP>" can be used as many times as
     necessary to list name or IP address of nodes in *remote*
     networks. Nodes in local network are automatically found, so no
     need to list them with this option. Moreover, listing only one
     node per one remote network should be enough - asyncoro finds all
     nodes in a network by broadcasting.

   * "--dest_path" is same as *dest_path* option to AsynCoro

   * "--max_file_size" is same as *max_file_size* option to AsynCoro

   * "-s" or "--secret" is same as *secret* option to AsynCoro

   * "--certfile" is same as *certfile* option to AsynCoro

   * "--keyfile" is same as *keyfile* option to AsynCoro

   * "--zombie_period=sec" specifies maximum number of seconds a
     remote server process can stay idle before it closes computation.
     The default value is 10*MaxPulseInterval, which is 1000 seconds.
     Once all servers used by a computation close, the computation is
     discarded so other pending (queued) computations can be run. If
     "zombie_period" is set to 0, then idle check is not done, so
     computations are not automatically closed.

   * "-d" or "--debug" option enables debug log messages.

   * "--daemon", if given, indicates that discoro scheduler
     shouldn't read standard input. Starting the scheduler as
     background process (i.e., with "&" in Unix, for example) implies
     daemon. If not a daemon, the scheduler can be terminated with
     "quit" or "exit" commands.

When batch scheduler is running on a computer in local network,
"Computation.schedule()" will locate the scheduler automatically. If
the scheduler is in remote network, "scheduler.peer()" method of
AsynCoro should be used so asyncoro can locate the scheduler.

Note that discoro scheduler runs jobs for at most one computation at
any time. Other computations are queued and wait for their turn in the
order submitted; when currently running computation finishes, next
computation in queue is made active so its jobs can be run.
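
The queueing behaviour can be pictured with a small model. This is
only an illustration of the semantics described above, not the
scheduler's implementation; *BatchQueueModel* and its methods are
made-up names, and the model only allows the active computation to be
closed.

```python
from collections import deque


class BatchQueueModel:
    """Toy model: one computation is active, others wait in FIFO order."""

    def __init__(self):
        self.active = None
        self.pending = deque()

    def schedule(self, name):
        """Submit a computation; return True if it became active right away."""
        if self.active is None:
            self.active = name
            return True
        self.pending.append(name)  # waits for its turn
        return False

    def close(self, name):
        """Close the active computation and activate the next one, if any."""
        if name != self.active:
            raise ValueError("this model only closes the active computation")
        self.active = self.pending.popleft() if self.pending else None
        return self.active


queue = BatchQueueModel()
for name in ("A", "B", "C"):
    queue.schedule(name)
print(queue.active)  # A is active; B and C are queued
queue.close("A")
print(queue.active)  # B is made active once A closes
```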


Computation
===========

*discoro* module's "Computation" provides API for client programs to
package computation fragments, send them to scheduler, and submit
coroutines to be executed at remote server processes to execute
distributed communicating processes. A computation's jobs (remote
coroutines) at a remote server run with only the components packaged. A
remote coroutine can expect to have only asyncoro module available;
any other modules, global variables etc. need to be initialized
appropriately. If necessary, initialization can be done by scheduling
a job on remote servers, e.g., to read data in a file, before running
other jobs, which can expect the side effects of setup job. See
"discoro_client*.py" files in "examples" directory (under where
asyncoro module is installed) for different use cases. All coroutines
at a server process (if more than one job is scheduled concurrently)
run in one thread / processor, share the address space, and run
concurrently.

class Computation(components, status_coro=None, timeout=MsgTimeout, pulse_interval=(2*MinPulseInterval), ping_interval=None, zombie_period=0, node_filters=[])

* *components* must be a list, each element of which can be either a
  Python function, class, module, or path to a file. These computation
  fragments are sent to discoro servers for execution. Once the
  computation is scheduled (i.e., *schedule* method is finished),
  generator functions can be run on the nodes / servers. These jobs
  can use the components packaged. If the jobs scheduled are listed in
  *components*, then the code for those functions would've already
  been distributed, so only the names of those functions are sent
  along with the arguments; otherwise, each *run* method sends the
  code for the functions along with the arguments to the remote
  server.

  Before a remote coroutine can be created at a remote discoro server,
  the components would've been transferred: if a component is a file,
  it would've been saved in the server's working directory (which is
  system dependent and can be customized with "--dest_path" option to
  discoronode; e.g., "/tmp/asyncoro/discoro/server-1"); if it is code
  (e.g., a class definition), it would've been executed (so instances
  of that class can be sent as arguments or messages) etc.

* *status_coro*, if given, must be a coroutine. If it is a
  coroutine, discoro scheduler sends status of remote servers, jobs
  executed etc., to this coroutine as messages. These messages can be
  used to schedule jobs.

* *timeout* is maximum number of seconds to successfully transfer
  messages to/from servers or client. If a message could not be sent
  or received within timeout, the operation is aborted. Default value
  is as defined by *MsgTimeout* in discoro module.

* *pulse_interval* is interval, in number of seconds, at which pulse
  messages are exchanged among servers and client to check for faults.
  If no messages are received within (5 * *pulse_interval*), the
  remote end is assumed to be faulty and it is closed.  This value
  must be at least *MinPulseInterval* and at most *MaxPulseInterval*.
  Both of these values are defined in discoro module. The default
  value of *pulse_interval* is *2*MinPulseInterval*.

  If nodes have psutil module installed, they send node availability
  status (CPU available in percent, memory in bytes and disk space in
  bytes) as an instance of DiscoroNodeAvailInfo at *pulse_interval*
  frequency. This information is useful for monitoring application
  performance, filtering nodes for required resources etc. This
  information is shown in web browser if HTTP Server is used.

* *ping_interval* is time in seconds at which scheduler broadcasts
  messages to discover new nodes that may not have been up, or
  discovered. When scheduler starts, it broadcasts "ping" message to
  which nodes respond. However, if a node is started after the
  scheduler starts, then scheduler won't be able to use it. (Note that
  by default nodes don't announce themselves unless *--discover_peers*
  option is given.) It is also possible that initial broadcast UDP
  message may have been dropped by network congestion (which happens
  more frequently with WiFi networks). When *ping_interval* is given
  as a number (which must be at least *MinPulseInterval*), the
  scheduler resends UDP broadcast messages at that interval.

* *zombie_period* is as explained in *Batch Scheduler* section. When
  computation is sent to batch scheduler, the zombie period is set to
  whatever value batch scheduler is started with (default is
  10*MaxPulseInterval, which is 1000 seconds), so that if the
  computation is idle for that period, it is closed so other
  computations can use the nodes.

* *node_filters* should be a list of DiscoroNodeFilter instances;
  the scheduler will accept only nodes that match properties given.

  The computation created as, for example, "compute =
  Computation(...)", has following methods:

     compute.schedule(location=None)

        Note: This method must be used with *yield* as "result =
          yield compute.schedule()".

        Schedule computation for execution.  If scheduler is batch and
        executing other computations, this method will block until
        those computations close. If successful, *result* will be 0.

        *location*, if not *None*, should be a Location instance
        referring to where the scheduler is running. If it is *None*,
        scheduler must be running at a node in local network; asyncoro
        will use name resolution to find the scheduler (a coroutine)
        with the registered name "discoro_scheduler".

     Note: All the *run* methods below take generator function and
       arguments used to create coroutines (jobs) at a remote server.
       If the generator function used in a *run* method is one of the
       *components* used to create computation, the code for the
       function is not transferred to the servers (thus a bit more
       efficient); otherwise, the code is also transferred to the
       servers to create the coroutine. The latter approach can be
       used to create remote coroutines with dynamically generated
       functions (by transferring all necessary dependencies with
       *components*). See RemoteCoroScheduler, which offers more
       features and is easier to use, to schedule remote coroutines,
       instead of using *run* methods directly.

     Note: The generator functions sent to remote servers to run
       coroutines are expected to *yield* at least every
       *pulse_interval* seconds. Otherwise, the server won't be able
       to send pulse messages to scheduler, causing scheduler to
       assume the server is not reachable. If the computations are CPU
       bound only (i.e., without *yield* statements), consider
       increasing *pulse_interval* up to *MaxPulseInterval* (which is
       100 seconds as of now). If that is too short, increase that
       with "discoro.MaxPulseInterval = n" before starting (private)
       scheduler. Increasing or disabling pulse interval will delay or
       disable fault detection.

     compute.run_at(where, gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run_at(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at *where*; i.e., create a coroutine at a server with
        the given generator function and arguments. If the request is
        successful, *rcoro* will be a (remote) coroutine.

        If *where* is a string, it is assumed to be IP address of a
        node, in which case the function is scheduled at that node on
        a server with least load (i.e., server with least number of
        pending coroutines). If *where* is a Location instance, it is
        assumed to be server location in which case the function is
        scheduled at that server.

        *gen* must be generator function, as it is used to run
        coroutine at remote location.

     compute.run_each(where, gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_each(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at each node or server. When finished, *rcoros* will
        be list of (remote) coroutines.

        *where* is same as in the case of *run_at*: If it is string,
        the function is scheduled at every node and if it is a
        Location instance, the function is scheduled at every server
        (on every node).

     compute.run(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoro =
          yield compute.run(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at a server with least load at a node with least
        load.  If the request is successful, *rcoro* will be a
        (remote) coroutine.

     compute.run_nodes(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_nodes(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at a server with least load at every node; i.e.,
        given function is executed at one server at each node. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.run_servers(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_servers(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at every server at every node. When finished,
        *rcoros* will be a list of (remote) coroutines.

     compute.run_node_servers(gen, *args, **kwargs)

        Note: This method must be used with *yield* as "rcoros =
          yield compute.run_node_servers(...)".

        Run given generator function *gen* with arguments *args* and
        *kwargs* at every server at given node at *addr*. When
        finished, *rcoros* will be a list of (remote) coroutines.

     compute.nodes()

        Note: This method must be used with *yield* as "nodes =
          yield compute.nodes()".

        Get list of addresses of nodes initialized for this
        computation.

     compute.servers()

        Note: This method must be used with *yield* as "servers =
          yield compute.servers()".

        Get list of Location instances of servers initialized for this
        computation.

     compute.close()

        Note: This method must be used with *yield* as "yield
          compute.close()".

        Closes computation. After the computation is closed, remote
        coroutines cannot be scheduled. Closing computation causes
        each server process to remove any files saved or created at
        that server, remove global variables created by the jobs etc.
        If the batch scheduler is used, and another computation is
        pending, then the scheduler makes it the active computation
        (i.e., its *schedule* finishes) so it can then schedule
        remote coroutines.

The remote coroutine *rcoro* obtained with *run* methods above can be
used for message passing or terminated; see Distributed Coroutines.
Although it can also be monitored with *yield rcoro.monitor(coro)*,
discoro scheduler monitors all coroutines created with *run* methods
and sends *MonitorException* message to *status_coro* about remote
coroutines.
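
The order of calls a client makes (schedule, run, close) can be tried
without any nodes by driving the client generator against a stand-in
object. The sketch below is a dry run under stated assumptions:
*FakeComputation* and *dry_run* are made up for illustration and only
mimic the calling convention of "result = yield compute.schedule()";
the *coro* keyword argument and *coro.sleep* follow asyncoro's usual
convention as far as this sketch assumes, and the import paths shown
in the closing comment are assumptions as well.

```python
def job(n, coro=None):
    """Hypothetical job; asyncoro is assumed to pass the coroutine as 'coro'."""
    yield coro.sleep(n)  # stands in for real work (assumed asyncoro call)


def client_proc(computation, job_args, coro=None):
    """Schedule the computation, start one remote coroutine per argument, close."""
    result = yield computation.schedule()
    if result != 0:
        raise RuntimeError("could not schedule computation")
    rcoros = []
    for arg in job_args:
        rcoro = yield computation.run(job, arg)
        rcoros.append(rcoro)  # handling of failed requests is omitted
    # A real client would exchange messages with rcoros here before closing.
    yield computation.close()


class FakeComputation:
    """Stand-in that records calls instead of contacting a scheduler."""

    def __init__(self):
        self.calls = []

    def schedule(self, location=None):
        self.calls.append("schedule")
        return 0  # 0 means success, as described for compute.schedule

    def run(self, gen, *args, **kwargs):
        self.calls.append(("run", gen.__name__, args))
        return "rcoro-%d" % len(self.calls)  # placeholder for a remote coroutine

    def close(self):
        self.calls.append("close")
        return 0


def dry_run(gen):
    """Drive a generator by sending each yielded value straight back in."""
    value = None
    try:
        while True:
            value = gen.send(value)
    except StopIteration:
        pass


fake = FakeComputation()
dry_run(client_proc(fake, [1, 2, 3]))
print(fake.calls)

# With asyncoro installed, the same generator would be started roughly as
# follows (module paths are assumptions, not verified here):
#   import asyncoro.disasyncoro as asyncoro
#   from asyncoro.discoro import Scheduler, Computation
#   Scheduler()
#   asyncoro.Coro(client_proc, Computation([job]), [1, 2, 3])
```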


DiscoroStatus
=============

When discoro scheduler changes state of nodes, servers or coroutines,
it sends the changes as messages to computation's *status_coro*
coroutine, if it is initialized to a coroutine before the computation
is scheduled. This is so *status_coro* can schedule coroutines as and
when servers are available, for example. Each message is an instance
of *DiscoroStatus*, with attributes *status* and *info*:

* *status* is either *NodeDiscovered*, *NodeInitialized*,
  *NodeClosed*, *NodeIgnore*, *NodeDisconnected*, *ServerDiscovered*,
  *ServerInitialized*, *ServerClosed*, *ServerIgnore*,
  *ServerDisconnected*, *CoroCreated*, or *ComputationClosed*.

* *info* is either an instance of *Location* or an instance of
  *CoroInfo*.

If *status* is *CoroCreated* (indicating a coroutine has been created
at remote server), then *info* is an instance of *CoroInfo*; if
*status* is for a node (i.e., one of *NodeDiscovered*,
*NodeInitialized*, *NodeClosed*, *NodeIgnore*, *NodeDisconnected*),
then *info* is a string with IP address of the node; in all other
cases, *info* is an instance of Location.

*CoroInfo* has following attributes:

* *coro* is (remote) coroutine (instance of "Coro"),

* **args* is arguments used to create coroutine,

* ***kwargs* is keyword arguments used to create coroutine,

* *start_time* is time when coroutine was created.
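
The rule for which type *info* has can be written as a small dispatch
function. The classes and status values below are stand-ins defined
only for this sketch (the real ones are provided by asyncoro, and the
real status values are not necessarily strings); the *addr* and *port*
fields of the stand-in *Location* are assumptions.

```python
from collections import namedtuple

# Stand-ins for the classes named in the text; not the real definitions.
DiscoroStatus = namedtuple("DiscoroStatus", ["status", "info"])
Location = namedtuple("Location", ["addr", "port"])
CoroInfo = namedtuple("CoroInfo", ["coro", "args", "kwargs", "start_time"])

NODE_STATUSES = {
    "NodeDiscovered", "NodeInitialized", "NodeClosed",
    "NodeIgnore", "NodeDisconnected",
}


def describe(msg):
    """Return a short description, choosing the type of info from the status."""
    if msg.status == "CoroCreated":
        # info is a CoroInfo
        return "coroutine {} created with args {!r}".format(msg.info.coro, msg.info.args)
    if msg.status in NODE_STATUSES:
        # info is a string with the IP address of the node
        return "node {}: {}".format(msg.info, msg.status)
    # in all other cases info is a Location
    return "location {}:{}: {}".format(msg.info.addr, msg.info.port, msg.status)


messages = [
    DiscoroStatus("NodeInitialized", "192.168.1.5"),
    DiscoroStatus("ServerInitialized", Location("192.168.1.5", 51347)),
    DiscoroStatus("CoroCreated", CoroInfo("rcoro-1", (3,), {}, 0.0)),
]
for msg in messages:
    print(describe(msg))
```

A *status_coro* would receive such messages one at a time and could,
for example, start a job whenever a *ServerInitialized* message
arrives.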


DiscoroNodeAvailInfo
====================

If a node has the psutil module installed, its latest availability
status is given as an instance of *DiscoroNodeAvailInfo*, which has
three read-only attributes:

* *cpu* is available CPU as percent. If it is close to 100, the node
  is not busy at all, and if it is close to 0, the node is rather busy
  (running compute-intensive tasks on all CPUs).

* *memory* is available memory in bytes. This is not total memory,
  but usable memory, as interpreted by the psutil module.

* *disk* is available disk space in bytes for the partition that
  discoronode uses as given by *dest_path* option (where client's
  files are saved by discoronode and jobs are run).

This information is sent to Computation's *status_coro* at
*pulse_interval* frequency.
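
One possible use of this information is to prefer the least busy node
when deciding where to run a coroutine. The sketch below assumes the
client keeps the latest availability per node in a dictionary; the
*NodeAvail* named tuple and the *least_busy* helper are hypothetical
and only mirror the three attributes described above.

```python
from collections import namedtuple

# Hypothetical stand-in with the three documented attributes.
NodeAvail = namedtuple('NodeAvail', 'cpu memory disk')


def least_busy(avail_by_node, min_memory=0):
    """Return IP address of the node with most available CPU among nodes
    with at least min_memory bytes available, or None if there is none."""
    candidates = [(info.cpu, ip_addr) for ip_addr, info in avail_by_node.items()
                  if info.memory >= min_memory]
    if not candidates:
        return None
    return max(candidates)[1]


GB = 1024 ** 3
latest = {
    '10.0.0.5': NodeAvail(cpu=12.5, memory=2 * GB, disk=50 * GB),
    '10.0.0.6': NodeAvail(cpu=87.0, memory=6 * GB, disk=20 * GB),
    '10.0.0.7': NodeAvail(cpu=95.0, memory=1 * GB, disk=80 * GB),
}
print(least_busy(latest, min_memory=4 * GB))
```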


DiscoroNodeFilter
=================

If a cluster has nodes with different resources and computation
requires specific resources, then the computation can list resource
requirements with *node_filters* list to Computation. Each element in
the list must be an instance of **DiscoroNodeFilter** with following
attributes:

* *ip_addr* must be a Python regular expression. A node's IP address
  must match this attribute for constraints on resources. Default is
  ".*", which matches any IP address. Rest of the attributes are
  constraints on resources that are used only if that node has
  *psutil* module so the available resources can be obtained by
  discoronode; if *psutil* module is not available on a node, it will
  be accepted (i.e., resource constraints are not checked).

* *cpus* must be an integer. If it is a positive number, then a node
  must have at least that many servers enabled to be used for the
  computation. If it is 0 (default), a node with any number of servers
  is accepted (i.e., no constraints on number of servers).

* *memory* must be an integer. If it is a positive number, then a
  node must have at least that many bytes of memory to be used for the
  computation. If it is 0 (default), a node with any number of bytes
  is accepted (i.e., no constraints on amount of memory).

* *disk* must be an integer. If it is a positive number, then a node
  must have at least that many bytes of disk space on the partition
  used by discoro to be used for the computation. If it is 0
  (default), there are no constraints on amount of available disk
  space on discoronode's partition.

* *platform* must be a Python regular expression. For a computation
  to use a node, given expression must occur in its platform string,
  obtained by "platform.platform()" on the node, ignoring case.
  Default value is "''", which matches any platform. For example,
  "linux.*x86_64" accepts only nodes that run 64-bit Linux.

The constraint check is done on the list of *node_filters* in the
order given. If a node's IP address matches a filter, but resources
don't match the constraint, it is rejected. Thus, to reject a node
with specific IP address, for example, give a filter with that IP
address and an impossible constraint on resources (e.g., either
*cpus=999*, or *platform='^unpossible'*) as first element in
*node_filters*.
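
The ordered check can be sketched in plain Python as follows. This is
not discoro's implementation: *NodeFilter*, *Node* and *node_allowed*
are hypothetical names, and two points are assumptions rather than
documented behavior: the first filter whose *ip_addr* matches decides
the outcome, and a node matched by no filter is accepted. The case of
a node without *psutil* (where constraints are not checked) is left
out.

```python
import re
from collections import namedtuple

# Hypothetical stand-ins; defaults mirror the documented defaults.
NodeFilter = namedtuple('NodeFilter', 'ip_addr cpus memory disk platform',
                        defaults=('.*', 0, 0, 0, ''))
Node = namedtuple('Node', 'ip_addr cpus memory disk platform')


def node_allowed(node, node_filters):
    """Check node against filters in the order given."""
    for flt in node_filters:
        if not re.match(flt.ip_addr, node.ip_addr):
            continue  # this filter does not apply to the node
        # Assumption: the first filter matching the IP address decides.
        return (node.cpus >= flt.cpus and
                node.memory >= flt.memory and
                node.disk >= flt.disk and
                re.search(flt.platform, node.platform, re.IGNORECASE) is not None)
    return True  # assumption: no filter matched, so there are no constraints


GB = 1024 ** 3
filters = [
    # reject one specific node with an impossible constraint
    NodeFilter(ip_addr=r'192\.168\.1\.5$', cpus=999),
    # all other nodes need 8 GB of memory and 64-bit Linux
    NodeFilter(memory=8 * GB, platform='linux.*x86_64'),
]
rejected = Node('192.168.1.5', 8, 16 * GB, 100 * GB, 'Linux-5.4.0-x86_64')
accepted = Node('192.168.1.6', 4, 16 * GB, 100 * GB, 'Linux-5.4.0-x86_64')
too_small = Node('192.168.1.7', 4, 4 * GB, 100 * GB, 'Linux-5.4.0-x86_64')
print(node_allowed(rejected, filters), node_allowed(accepted, filters))
```

Note that the IP pattern ends with "$"; without it, a pattern such as
"192\.168\.1\.5" would also match "192.168.1.50" when checked with
"re.match".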


HTTP Server
===========

**asyncoro.httpd** module provides HTTP interface to monitor and
manage discoro servers (nodes and servers) with a web browser; it
works with common web browsers, including in iOS and Android devices.
It doesn't require / use apache or other web servers. HTTP server can
be created with:

class HTTPServer(computation, host='', port=8181, poll_sec=10, DocumentRoot=None, keyfile=None, certfile=None)

   Creates an instance of HTTP server which will listen for
   connections at given *host* and *port*.

   * *computation* is an instance of Computation whose status will
     be sent to HTTP client (browser).

   * *host* should be a string, either IP address or name of the
     host. The default value of *''* will bind the server to all
     configured network interfaces.

   * *port* should be a number HTTP server binds to. Web browsers
     can monitor and manage cluster by connecting to
     **http://<host>:<port>** if SSL (https) is not used and
     **https://<host>:<port>** if SSL is used.

   * *poll_sec* is number of seconds (interval) the client waits
     between requests to the server for updates. Smaller values of
     *poll_sec* will cause more frequent updates so the information
     shown is more accurate, but cause more network traffic/load.
     Bigger values of *poll_sec* are more efficient but the status in
     browser may not reflect more recent information. This value can
     be changed in the browser as well.

   * *DocumentRoot* is directory where monitor.html, asyncoro.css,
     etc. files needed for the service are stored. If this is not set,
     the directory is assumed to be **data** directory under the
     directory where asyncoro.httpd module is stored.

   * *keyfile* and *certfile*, if given, will be used to configure
     SSL so https can be used between browser and HTTP server. If both
     key and certificate are in the same file, then it should be given
     as *certfile*.

   The HTTP server has following methods:

   shutdown(wait=True)

      Shuts down HTTP server. If *wait* is **True**, the server waits
      for current *poll_sec* period before shutting down so the web
      browser gets all the updates.

   Note: When cluster status is being monitored, the HTTP server
     sends only changes to cluster status between updates to browser
     (for efficiency); i.e., each time browser gets the status update
     at current *poll_sec* interval, the server sends the changes
     since last time browser requested data. The browser computes full
     current status with these updates. Consequently, the status can
     be viewed in only one browser; if more than one browser is used,
     they will not get full information.

   status_coro

      This is a coroutine that should get all status messages sent by
      scheduler. The client program should set *status_coro* attribute
      to this coroutine if the client needs to process status messages
      itself, or if messages need to be chained to other recipients,
      such as RemoteCoroScheduler.
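
The note above on incremental updates explains why only one browser
should be used. A toy model of the behavior (not the actual
asyncoro.httpd code; *DeltaServer* and *Viewer* are hypothetical
names) shows how two viewers polling the same server each end up with
partial status:

```python
class DeltaServer(object):
    """Toy server that keeps only the changes since the last poll."""

    def __init__(self):
        self._pending = {}

    def update(self, node, running):
        # record a status change, e.g., number of running coroutines
        self._pending[node] = running

    def poll(self):
        # hand over pending changes and forget them
        changes, self._pending = self._pending, {}
        return changes


class Viewer(object):
    """Toy browser that builds full status by merging changes."""

    def __init__(self):
        self.status = {}

    def refresh(self, server):
        self.status.update(server.poll())


server = DeltaServer()
first, second = Viewer(), Viewer()
server.update('10.0.0.1', 2)
first.refresh(server)    # first viewer consumes this change
server.update('10.0.0.2', 5)
second.refresh(server)   # second viewer never sees 10.0.0.1
print(first.status, second.status)
```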


Example
=======

See "discoro_httpd.py" for an example where client program relays the
status messages to HTTP server's *status_coro* before processing them
(to schedule coroutines).


Client (Browser) Interface
==========================

Once HTTP server is created, the discoro servers can be monitored and
managed in a web browser at **http://<host>:8181**, where *<host>* is
name or IP address of computer running the program. If SSL
certificates are used to setup HTTP server, https protocol should be
used in URL above. There are currently 3 sections (menu items):


Cluster Status
--------------

The *Cluster* menu shows summary of nodes and coroutines:

[image]

The coroutine summary shows total number of coroutines submitted so
far, done (finished or cancelled) and currently running coroutines.
The nodes summary shows IP address, number of servers running on that
node, number of coroutines submitted to all servers on that node,
number of coroutines done by all servers on that node, and number of
currently running coroutines by all servers on that node. Each node's
IP address is shown as a hyperlink; when the link is activated, the
page changes to show status for that node, as explained in *Node
Status*.

The nodes are sorted by default on the IP address in descending order.
The field to sort on can be changed; however, as other fields are not
necessarily unique, sorting on other fields is inefficient, so if
there are many nodes, especially with frequent updates, choose IP
address as the field to sort. Sorting can be changed even after
cluster is closed.


Node Status
-----------

Each node in *Cluster Status* section is a hyper link which when
followed (to **Node** menu) shows details about that node, including
servers available, coroutines processed:

[image]

The summary of node includes number of server processes running on the
node and number of coroutines running on each server. Each server is
shown with its location (an instance of Location) as a hyperlink. This
link can be used to get details of coroutines running on that server,
as explained in *Server Status*.


Server Status
-------------

As noted above, a discoronode program starts a discoro server process
for each CPU available (or as specified with "-c" option) on that
node. Each server process runs coroutines submitted by discoro
scheduler.  The **Server** menu shows number of coroutines submitted,
number of coroutines done, and details of each coroutine currently
running. The details show the name of coroutine (function), the
arguments used to run it and the time when it was started.

[image]

The arguments are converted to strings by HTTP server before sending
to the browser client. If any of these are instances of user provided
classes, it may be useful to provide *__str__* method for those
classes. Otherwise, Python's default *__str__* method may show an
argument as simply an instance of a class, which may not be very
useful.
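
For example, a class whose instances are passed as arguments to
coroutines could define *__str__* as below. The *Region* class is
hypothetical and serves only to contrast the default representation
with a custom one.

```python
class Plain(object):
    """No __str__: shown with Python's default representation."""

    def __init__(self, name):
        self.name = name


class Region(object):
    """Defines __str__ so the argument is readable in the browser."""

    def __init__(self, name, rows):
        self.name = name
        self.rows = rows

    def __str__(self):
        return 'Region(%s, %d rows)' % (self.name, self.rows)


print(str(Plain('north')))         # something like <...Plain object at 0x...>
print(str(Region('north', 1200)))  # Region(north, 1200 rows)
```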

If necessary, coroutines can be selected and terminated (killed).


RemoteCoroScheduler
===================

When a computation's *run* method is used to create (remote)
coroutine, discoro's Scheduler uses load balancing to select a server
process to run the coroutine, with no limit on how many coroutines are
scheduled at a server. This is appropriate if the coroutines are not
CPU bound (i.e., use asynchronous I/O, such as receiving messages,
wait for messages, sleep etc., in which case asyncoro can run other
eligible coroutines), especially as creating coroutines is very
efficient. However, if coroutines are CPU intensive, creating multiple
such coroutines may not be efficient (e.g., load balancing scheduler
may create equal number of coroutines at a slow node's server and a
fast node's server, so it would take longer to finish all
computations).
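
The effect can be illustrated with a small simulation that does not
use discoro at all. It assumes each server runs its CPU bound jobs one
after another and that every job takes a fixed time on a given server;
the function names and the timings are made up for illustration.

```python
import heapq


def makespan_equal_split(n_jobs, job_times):
    """Finish time when jobs are divided equally among servers.

    job_times[i] is the time server i needs to run one job."""
    n = len(job_times)
    counts = [n_jobs // n + (1 if i < n_jobs % n else 0) for i in range(n)]
    return max(count * t for count, t in zip(counts, job_times))


def makespan_one_per_server(n_jobs, job_times):
    """Finish time when each server gets the next job only when it is free."""
    free_at = [(0, i) for i in range(len(job_times))]
    heapq.heapify(free_at)
    finish = 0
    for _ in range(n_jobs):
        t, i = heapq.heappop(free_at)      # server that becomes free first
        done = t + job_times[i]
        finish = max(finish, done)
        heapq.heappush(free_at, (done, i))
    return finish


# one fast server (1 time unit per job) and one slow server (3 units per job)
print(makespan_equal_split(8, [1, 3]))     # 12
print(makespan_one_per_server(8, [1, 3]))  # 6
```

With these made-up timings, dividing 8 jobs equally takes 12 time
units because the slow server runs 4 jobs, whereas giving a server a
new job only when it is free takes 6 time units.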

A client program can use status messages from discoro scheduler to
schedule coroutines as appropriate for different use cases.
*discoro_schedulers.py* implements special purpose schedulers for a
few such cases.

class RemoteCoroScheduler(computation, proc_status=None, proc_available=None, proc_close=None)

   Creates server process (or processor) scheduler for given
   computation (an instance of Computation). The computation's
   *status_coro* is set to *RemoteCoroScheduler*'s coroutine so status
   messages from discoro Scheduler are used to keep track of server
   processes and coroutine finish messages. If client program needs to
   process status messages for other purposes, then these messages can
   be relayed; see "discoro_client2.py" for an example.

   *proc_status* if not **None** should be a generator function that
   is called (as coroutine) with the status and info, as received by
   status_coro. If status is ServerInitialized and this function
   returns non-zero value, the server is ignored; i.e., jobs scheduled
   with *schedule* or *execute* will not use that server.

   *proc_available* if not **None** should be a generator function
   that is called (as coroutine) with the location of a server process
   when it becomes available (after all *depends* of computation have
   been transferred). The coroutine runs at the client; it can create
   remote coroutine(s) at the server process, perhaps to setup, such
   as initializing global variables, transfer additional files etc.
   The coroutine should exit with 0 to indicate successful setup; any
   other value is interpreted as failure and the server is not used by
   the scheduler.

   *proc_close* if not **None** should be a generator function that is
   called (as coroutine) with the status and location of server
   process when server is about to be closed, or already closed. The
   coroutine runs at the client; it can create remote coroutine(s) at
   server process to cleanup, such as delete global variables,
   transfer files back to client etc. The coroutine is called with two
   parameters: *status*, which is either *Scheduler.ServerInitialized*
   when server is about to be closed (i.e., server is still available,
   and remote coroutines can be executed), or *Scheduler.ServerClosed*
   when server is already closed (e.g., due to *zombie_period* time
   elapsed without communication, or server was manually closed with
   command-line etc.), and *location* of server process.

   This scheduler has following methods:

   schedule(gen, *args, **kwargs)

      Note: This method must be used with *yield* as "rcoro = yield
        proc_scheduler.schedule(gen, ...)".

      Similar to *run* method of Computation, except that *schedule*
      blocks if another coroutine is already scheduled at a remote
      server; i.e., *schedule* runs at most one coroutine at a server
      process, with the assumption that given generator is CPU
      intensive and running more than one coroutine at one server is
      not efficient.

   submit_at(where, gen, *args, **kwargs)

      Note: This method must be used with *yield* as "rcoro = yield
        proc_scheduler.submit_at(location, gen, ...)".

      Similar to *run_at* method of Computation. If *where* is None,
      the calling coroutine is blocked until any server is discovered
      and initialized (so computation's *run_at* will not fail).
      Unlike *schedule*, this method doesn't wait for server to be
      free (i.e., not running any other coroutines), and unlike
      *execute_at*, the caller is not blocked until the coroutine
      finishes.

   submit(gen, *args, **kwargs)

      Note: This method must be used with *yield* as "rcoro = yield
        proc_scheduler.submit(gen, ...)".

      Submit coroutine at any server; see *submit_at* above.

   execute(gen, *args, **kwargs)

      Note: This method must be used with *yield* as "result = yield
        proc_scheduler.execute(gen, ...)".

      Similar to *schedule* method above, the calling coroutine is
      suspended until a server process is available (i.e., not running
      any other computations scheduled with *RemoteCoroScheduler*),
      the computation is executed and its results are obtained. The
      value yielded is the result (i.e., exit status) of computation.

      When using this scheduler, *run* method of Computation should
      not also be used to create coroutines, unless they are not CPU
      intensive (e.g., they wait for messages, events etc.).

   execute_at(where, gen, *args, **kwargs)

      Note: This method must be used with *yield* as "result = yield
        proc_scheduler.execute_at(location, gen, ...)".

      Similar to *run_at* method of Computation, except the calling
      coroutine is blocked until the computation finishes and exit
      value of computation is returned. Unlike *execute* above, the
      computation is executed right away, even if remote server
      process is executing another computation.

   map_results(gen, iter)

      Note: This method must be used with *yield* as "results =
        yield proc_scheduler.map_results(gen, list)".

      Execute generator *gen* with arguments from given iterable. The
      return value is list of results that correspond to executing
      *gen* with arguments in iterable in the same order.

   finish(close=False)

      Note: This method must be used with *yield* as "yield
        proc_scheduler.finish()".

      Waits until all coroutines scheduled with this
      *RemoteCoroScheduler* finish. If *close* is **True**, the
      computation is also closed.


Docker Container
================

discoronode isolates computation environment so that jobs from one
computation don't interfere with jobs from another computation, even
if a node is shared and jobs from different computations are running
simultaneously. Usually, any files transferred and saved by jobs are
also removed when computation is closed (the exception is when
*dest_path* is given or if *cleanup* is **False**, when files may be
left behind). However, the jobs have access to server's file system so
they can be security risk. It is possible to avoid (some) issues by
creating special user with access only to specific path (e.g., with a
*chroot* environment).

If complete isolation of computation is needed, Docker containers can
be used. Each container runs a copy of small Linux distribution with
its own file system; the container has no access to host file system
(unless it is configured to). asyncoro now includes "Dockerfile" under
"data" directory where asyncoro module is installed, which can be
obtained with the program:

   import os, asyncoro
   print(os.path.join(os.path.dirname(asyncoro.__file__), 'data', 'Dockerfile'))

Note that *Docker* runs under Linux host only; with other operating
systems, a guest VM can be used to run Linux under which *Docker* can
be run. See Docker Machine and Docker Docs for more details.

To build an image with latest Ubuntu Linux and asyncoro, install
docker if not already installed, create a temporary directory, say,
"/tmp/asyncoro-docker", change to that directory and copy "Dockerfile"
from above to that directory. (The "Dockerfile" can be customized to
suit any additional tools or setup needed.) Then execute "docker build
-t asyncoro ." (note the *dot* at the end). Full list of instructions
for building image for Python 2.7 (for Python 3 use appropriate path
to where "Dockerfile" is installed) are:

   mkdir /tmp/asyncoro-docker
   cd /tmp/asyncoro-docker
   cp /usr/local/lib/python2.7/dist-packages/asyncoro/data/Dockerfile .
   docker build -t asyncoro .

Once the image is built, a new container can be run with:

   docker run --net=host -it asyncoro

to start discoronode.py (which is the default command for the image
built above) with default options. "--net=host" runs container in host
network mode, i.e., container uses host network configuration. See
*--save_config* and *--config* options to discoronode to use same
options across many runs. If these or any other options are needed,
*Dockerfile* can be customized before building the image in the
instructions above.

If each computation should be started in a new container (so that
computations start in the same environment using the image built
above), then *serve* option can be used as:

   while :; do
       docker run --net=host -it asyncoro discoronode.py --serve 1
   done

This causes discoronode to quit when the client closes currently
running computation, which terminates container and because of "while"
loop, a new container is started from the image.

dispy project also has similar instructions for building docker
images. Since dispy depends on asyncoro, asyncoro modules, including
discoronode, are also installed when installing dispy. So it is
possible to build dispy and use discoronode (e.g., with "docker run
--net=host -it dispy discoronode.py") from dispy image instead of
dispynode (when discoronode is more appropriate than dispynode).


Cloud Computing
===============

*ext_ip_addr* of Node / Servers can be used to work with cloud
computing service, such as Amazon EC2. Other cloud computing services
can also be used similarly.

It may be necessary to setup the configuration to allow TCP ports used
by discoronode. Here we assume ports 51347 and above are used by
discoronode.  For example, with EC2 "Security Group" should be created
and assigned to the instance so inbound TCP ports 51347 (and/or other
ports used) are allowed.

With EC2 service, a node has a private IP address (called 'Private DNS
Address') that uses private network of the form 10.x.x.x and public
address (called 'Public DNS Address') that is of the form
ec2-x-x-x-x.y.amazonaws.com. After launching instance(s), log in to
server(s), install asyncoro (e.g., with "pip install asyncoro") and
run discoronode on each node with:

   discoronode.py --ext_ip_addr ec2-x-x-x-x.y.amazonaws.com --tcp_ports 51347

(this address can't be used with *-i*/*--ip_addr* option, as the
network interface is configured with private IP address only). This
node can then be used by discoro client from outside EC2 network by
specifying ec2-x-x-x-x.y.amazonaws.com as a *peer* (see below). With
*ext_ip_addr*, discoronode acts similar to NAT: it announces
*ext_ip_addr* to other services instead of the configured *ip_addr* so
that external services send requests to *ext_ip_addr*.

If the EC2 node can connect back to client with the IP address and
port used by client, the node can be paired with:

   ...
   yield asyncoro.AsynCoro().peer(asyncoro.Location('ec2-x-x-x-x.y.amazonaws.com', 51347))
   if (yield computation.schedule()):
       raise Exception('Schedule failed')
   ...

By default, asyncoro uses random TCP port. Within a local network or
if client can be reached at any port, this works fine. If the client
is behind a router, the router's firewall can be configured to forward
a specific port, say, 4567 (or, 51347 at client as well; here, to
avoid confusion a different port is used), to client's IP address, and
asyncoro can be configured in the client to use tcp port 4567 with:

   asyncoro.AsynCoro(tcp_port=4567)

before any coroutines or channels are created (creating a coroutine or
channel automatically starts asyncoro with default parameters, which
uses random TCP port).

If client is behind a router and its firewall can't be setup to
forward port 4567, then *ssh* can be used to forward the port. To use
this, first login to EC2 node with:

   ssh -i ec2-key.pem -R 4567:127.0.0.1:4567 userid@ec2-x-x-x-x.y.amazonaws.com

Then start discoronode as mentioned above, and start asyncoro at
client with:

   asyncoro.AsynCoro(node='127.0.0.1', tcp_port=4567)

See "discoro_ssh_ec2.py" for an example where *ssh* port forwarding is
used for cloud computing with Amazon EC2.

In case of problems, enable debugging on the nodes (with "-d" option)
and client (with "asyncoro.logger.setLevel(logging.DEBUG)" statement,
as done in example above). If that still doesn't work, check that the
node is reachable with "telnet ec2-x-x-x-x.y.amazonaws.com 51347" from
client (after starting discoronode); the output should contain
*Connected* message.
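
If *telnet* is not installed at the client, a similar reachability
check can be done with Python's standard *socket* module. The helper
name *port_reachable* below is hypothetical; it only verifies that a
TCP connection can be established, not that discoronode is the program
listening on the port.

```python
import socket


def port_reachable(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        # refused, timed out, host name not resolved etc.
        return False
    sock.close()
    return True
```

For the node above, the check would be
"port_reachable('ec2-x-x-x-x.y.amazonaws.com', 51347)", run at the
client after starting discoronode.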
