
Distributed Programming (disasyncoro)
*************************************

As described in *Asynchronous Concurrent Programming (asyncoro)*,
when a coroutine is created an AsynCoro (singleton) instance is
created if one is not already created. AsynCoro starts a scheduler
that manages coroutines created in that program. AsynCoro in
"asyncoro" does not start network services for distributed
programming. However, when "disasyncoro" is used instead of
"asyncoro", network services are started so that coroutines can send
messages to / receive messages from coroutines and use other network
services in remote asyncoro. See also *Distributed Communicating
Processes (discoro)* which supports distributed and parallel
computing, so the clients can send computations (Python functions,
files, modules etc.) to remote servers for executing coroutines there.

Network services, such as message passing, work transparently so that
once references are obtained, they can be used just as with concurrent
programming.


Examples
========

See Distributed Programming in tutorial for examples. There are many
illustrative use cases in 'examples' directory under where asyncoro
module is installed.

Following is a brief description of the examples included relevant to
this section:

* "examples/tut_client.py" and "examples/tut_server.py" use
  one-to-one message passing to exchange messages between two remote
  coroutines. See also
  "examples/remote_coro_client.py" and
  "examples/remote_coro_server.py" where the client and server use
  'secret' feature for exclusivity (to prevent other users from using
  them in a shared network).

* "examples/remote_channel_client.py" and
  "examples/remote_channel_server.py" use broadcasting channels to
  exchange messages among a sender and local/remote recipients.

* "examples/chat_chan_client.py" and "examples/chat_chan_server.py"
  use broadcasting over a channel to send messages to all participants
  to implement a simple chat (message) service.  To use this and other
  'chat' examples below, run the server, and multiple clients (either
  on same machine or other machines in local network). Messages typed
  in one client show up at other clients.

* "examples/chat_sock_client.py" and "examples/chat_sock_server.py"
  use asynchronous network programming, coroutines and message passing
  to implement a chat (message) server that is used by clients to
  broadcast messages.

* "examples/rci_monitor_client.py" and
  "examples/rci_monitor_server.py" illustrate another approach to
  execute remote coroutines: The server registers a function and
  client requests to execute coroutine with that function. Compare
  this to discoro_client.py where the client sends the computation
  itself to the remote server, so the client can execute arbitrary
  functions, whereas with RCI (Remote Coroutine Invocation) only
  registered functions can be executed by clients. The client uses
  monitoring to get the exit status of remote coroutines (which fail
  if the argument is an odd number).


Location
========

Some entities in asyncoro, such as coroutines and channels, are capable
of providing services for remote peers. These entities should register
themselves with names so that remote peers can use those names to get
references. These references can then be used to invoke methods
described in this section.

Such entities have a location, which is the network IP address and TCP
port of the asyncoro where they originate, so that when a method is
invoked on a reference at a remote peer, that peer knows where to
direct the call. When an entity is located (to get a reference) at a
remote peer using only the name it is registered with, all known
asyncoros are queried for that name. If the name is unique across all
the asyncoros, the reference is for that specific entity. However, if
the same name is used in more than one asyncoro, the reference
obtained may not be for the desired entity. In such a case, the
location of the asyncoro where the entity is registered can be given.
Locations can be obtained by looking up asyncoros by name with
"scheduler.locate()". If the host and TCP port are already known, a
location can be created with:

class disasyncoro.Location(host, tcp_port)

   *host* is either host name or IP address and *tcp_port* is TCP port
   where asyncoro is running. This information is printed by the
   scheduler when it starts.


AsynCoro
========

As explained in *Asynchronous Concurrent Programming (asyncoro)*,
AsynCoro scheduler is started automatically (when a coroutine is
started, for example) with default initialization. If the scheduler
should be initialized with different parameters, it should be
initialized before creating any coroutines, channels etc. with:

class disasyncoro.AsynCoro(node=None, udp_port=None, tcp_port=0, ext_ip_addr=None, name=None, discover_peers=True, peers=[], secret='', certfile=None, keyfile=None, notifier=None, dest_path_prefix=None, max_file_size=None)

   *node* must be either host name or IP address and *udp_port* must
   be port number where asyncoro starts network services. If
   *udp_port* is set to 0, then port 51350 is used.

   By default, any available port is used for *tcp_port* where
   asyncoros exchange messages. However, this port can be set to a
   specific number, which can be useful when dealing with
   firewall/port forwarding.

   *name*, if given, must be a string that is unique for each instance
   of asyncoro. If this option is not given, it is set to string
   representing location where network services are running. Using
   *name*, the location where a specific asyncoro instance is running
   can be found with asyncoro's "locate(name)" method.  This
   location can then be used to find and use services, such as sending
   messages to coroutines and channels running at that location.

   If *discover_peers* is **True** (default), asyncoro scheduler
   broadcasts message to detect peers running on local network. If
   this option is **False**, that message is not broadcast, so this
   asyncoro will not detect peers and peers will not detect this
   asyncoro. Message passing with remote coroutines, channels etc.
   can be established only with known peers.

   *secret* is a string that must be set to same for all asyncoros
   that communicate.  It is used to compute a hash that is used and
   checked in all communication, so that only asyncoros that use same
   secret can be peers. Note that the messages are not encrypted; if
   security of messages is needed, SSL options *certfile* and/or
   *keyfile* can be used.

   *certfile* and *keyfile* are as per SSL module. See SSL (Security /
   Encryption) for more information.

   *ext_ip_addr* must be IP address of gateway/firewall when asyncoro
   is used behind that gateway/firewall and other peers are outside
   that gateway/firewall. With this option, peers can communicate with
   asyncoro running on a local network and other peers are on remote
   network.

   *dest_path* is path where files from remote peers are saved, when
   "send_file()" method is called (see below). If *dest_path* is not
   set, asyncoro will initialize it to "tempfile.gettempdir()". If
   *send_file* sets *dir* to a relative path, the file will be saved
   at *dest_path + dir*.

   *max_file_size* is maximum number of bytes a transferred file can
   be. If it is set to None or 0, there is no limit; if it is set to a
   number, then any file bigger than that limit is rejected when
   *send_file* is called.

Note that Python programs can't use multiple cores on an SMP node,
other than by using multiprocessing. Instead of using multiprocessing
in the case of SMP, multiple programs, each using asyncoro, can be
started on a machine with multiple cores (as many times as needed),
all on same udp_port, but different tcp_port per instance; see
'discoro' module in which multiple instances are started on a node
with multiple processors. If tcp_port is set explicitly, each instance
must use unique tcp_port (but same udp_port). If tcp_port is not set
explicitly, asyncoro chooses a different tcp_port for each instance.
Within each such program, coroutines can use *Asynchronous Concurrent
Programming (asyncoro)* and API below to exchange messages among
coroutines in all programs - although all programs are running on same
machine in the case of SMP, coroutines running in one program are
distributed/remote for coroutines running in another program.
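
The port layout described above can be sketched as follows. This is only
an illustration: "smp_instance_options" is a hypothetical helper (not
part of asyncoro), and the base TCP port and instance names are
arbitrary choices. It builds one set of keyword arguments per program,
all sharing the same *udp_port* and each with its own *tcp_port*.

```python
def smp_instance_options(count, udp_port=51350, first_tcp_port=51351):
    """Return one dict of AsynCoro keyword arguments per instance.

    Hypothetical helper: every instance shares udp_port, while each
    gets a distinct tcp_port and a unique name.
    """
    options = []
    for index in range(count):
        options.append({
            "udp_port": udp_port,
            "tcp_port": first_tcp_port + index,
            "name": "instance-%d" % index,
        })
    return options


if __name__ == "__main__":
    for opts in smp_instance_options(4):
        print(opts)
```

Each program would then pass its own entry as "AsynCoro(**opts)" before
creating any coroutines or channels.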

When "disasyncoro" is used instead of "asyncoro", following additional
methods are available in AsynCoro:

scheduler.locate(name, timeout=None)

   Note: This method must be used with *yield*. While *locate*
     methods of coroutines, channels etc., are static methods (to be
     invoked with the class) this method should be invoked on the
     scheduler instance (which can be obtained, for example, with
     "scheduler = AsynCoro()").

   Finds and returns location where peer AsynCoro with *name* is
   running. If no AsynCoro with *name* is currently running, then it
   will wait until a peer with *name* starts. This location can be
   used in other methods. The *timeout* is the maximum number (or
   fractions) of seconds for finding location. If timeout seconds
   elapse (if no peer with *name* is running or reachable), "None" is
   returned.
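
A minimal sketch of using *locate* with a timeout is shown below. The
generator function "find_peer" and the *found* list used to hand back
the result are conventions of this sketch, not part of asyncoro;
*scheduler* is assumed to be the AsynCoro instance.

```python
def find_peer(scheduler, name, found, timeout=5, coro=None):
    """Generator for a coroutine that looks up a peer by name.

    'found' is a caller-supplied list that receives the location
    (a convention of this sketch for returning the result).
    """
    location = yield scheduler.locate(name, timeout=timeout)
    if location is None:
        # Timed out: no peer with that name is running or reachable.
        print("peer %r not found within %s seconds" % (name, timeout))
    else:
        found.append(location)
```

Assuming the module is imported as "asyncoro", the coroutine could be
created with "asyncoro.Coro(find_peer, scheduler, 'server', found)".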

scheduler.peer(loc, udp_port=51350, stream_send=False, broadcast=False)

   Note: This method must be used with *yield*.

   Contacts asyncoro at given *loc* to establish communication. If all
   asyncoros are running on a local network, they can find each other
   automatically through UDP broadcasting. However, asyncoros running
   at remote network(s) need to be explicitly added with *peer*
   method. It is not needed to run this method on both sides - if one
   asyncoro successfully runs this method, they will find each other.
   By default all asyncoro instances running on a single node (can)
   use same UDP port, so *peer* method can be called once per node,
   even when more than one asyncoro instance is running on that node.

   *loc* can be an instance of "Location", host name or IP address. If
   it is host name or IP address, it is treated as a "Location" with
   *port* set to 0.

   If *stream_send* is "False", asyncoro's scheduler may send one
   message per connection; i.e., the process that transfers messages
   opens a connection to the peer, sends a message and the connection
   is closed, unless there are more messages queued for the same peer
   (in which case, same connection is used to send the message). As
   creating sockets and making connections is expensive, this may be
   inefficient. If *stream_send* is "True", then the connection to
   given peer is not closed after sending all pending messages - the
   connection is kept open and used to send messages when they are
   queued. This can significantly improve performance in some cases.
   However, keeping connections alive takes system resources, so this
   option should be used when delays in message transfer are to be
   minimized at the cost of keeping sockets open to peers. If
   *stream_send* is "True" and *port* of *loc* is 0, then all
   asyncoros (if more than one asyncoro is running on that node)
   running on the node will be streamed; if *port* is not 0, then
   messages to only asyncoro on that TCP port will be streamed.

   If *broadcast* is "True", then the information about source
   (asyncoro with which *peer* method is invoked) is broadcast on the
   network of *loc*. If source and target are on same network, as
   mentioned above, they discover each other. If they are on different
   networks, *loc* can be used to communicate with each peer by
   calling *peer* method for each peer. However, if there are many
   peers on remote network, *broadcast* can be set to "True" with one
   peer on the remote network and all available peers on that network
   will be discovered.

   *stream_send* can be used to enable / disable streaming as and when
   necessary by calling this method (with "stream_send=True" to enable
   streaming or with "stream_send=False" to disable streaming), even
   if peer is already discovered.
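
As an illustration of the *peer* method, the sketch below contacts one
host per remote network with *broadcast* enabled, so that other peers on
those networks can be discovered. "add_remote_peers" is a hypothetical
name, and *hosts* is assumed to hold host names or IP addresses.

```python
def add_remote_peers(scheduler, hosts, stream=False, coro=None):
    """Generator for a coroutine that contacts peers on other networks.

    'hosts' should name one reachable peer per remote network;
    broadcast=True asks that peer's network to discover this asyncoro.
    """
    for host in hosts:
        yield scheduler.peer(host, stream_send=stream, broadcast=True)
```

Calling it again later with a different *stream* value would enable or
disable streaming for the same peers.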

scheduler.peers()

   Is a regular function (not a generator method to create coroutine)
   that returns currently known peers as list. Each peer is an
   instance of Location.

scheduler.peer_status(coro)

   Registers *coro* to receive notifications of peer status. Whenever
   asyncoro scheduler discovers a new peer or a peer disconnects, it
   sends a *PeerStatus* instance message to *coro*. *PeerStatus* has 3
   attributes: *location*, which is an instance of Location, *name*,
   which is the name of the peer, and *status*, which is either
   **PeerStatus.Online** if the peer is discovered or
   **PeerStatus.Offline** if the peer disconnected.

   If this method is called with **None** (instead of a coroutine),
   peer status messages are not sent any more (to any previously
   registered coroutine).
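
A coroutine that keeps track of peers from these notifications might
look like the sketch below. The function name, the *online* dictionary
and the use of "None" as a stop signal are assumptions of this sketch;
*online_status* and *offline_status* are expected to be
"PeerStatus.Online" and "PeerStatus.Offline".

```python
def track_peers(online_status, offline_status, online, coro=None):
    """Generator for a coroutine registered with scheduler.peer_status().

    'online' is a caller-supplied dict mapping peer name to location.
    """
    while True:
        status = yield coro.receive()
        if status is None:
            break  # stop signal used only in this sketch
        if status.status == online_status:
            online[status.name] = status.location
        elif status.status == offline_status:
            online.pop(status.name, None)
```

The coroutine created from this function would be passed to
"scheduler.peer_status()" to start receiving notifications.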

scheduler.send_file(location, file, dir=None, overwrite=False, timeout=None)

   Note: This method must be used with *yield*.

   Sends (transfers) file *file* to peer at *location*. If *dir* is
   not "None", it must be a relative path, in which case, the file is
   saved under peer's *dest_path* + *dir*. This method returns 0 if
   the file was successfully saved at peer, 1 if the file exists at
   peer with same size, timestamp and permissions (so there is no need
   to transfer), *os.stat* structure if file exists at destination
   with different size/timestamp/permissions but *overwrite* is
   "False", and -1 in case of any error. If the return value is 0, the
   sender may delete the file with "del_file()" later.
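
Since *send_file* can yield several kinds of values, a caller may want
to normalize them. The helper below is a hypothetical sketch (not part
of asyncoro) that maps the documented return values to short
descriptions; any value other than 0, 1 or -1 is assumed to be the
*os.stat* structure of the existing file.

```python
import os


def describe_send_file_result(result):
    """Map the value yielded by send_file() to a short description."""
    if result == 0:
        return "transferred"
    if result == 1:
        return "already present at peer; transfer skipped"
    if result == -1:
        return "error"
    # Anything else is taken to be the stat structure of the file
    # that exists at the peer with different attributes.
    return "exists at peer with different attributes; not overwritten"


if __name__ == "__main__":
    for value in (0, 1, -1, os.stat(os.curdir)):
        print(describe_send_file_result(value))
```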

scheduler.del_file(location, file, dir=None, timeout=None)

   Note: This method must be used with *yield*.

   Deletes file *file* from peer at *location*. *file* and *dir* must
   be same as used to send the file (with "send_file()"). This method
   returns 0 if the file was successfully deleted and -1 in case of
   any error.
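
The two file methods are typically used together. The sketch below sends
a file and deletes it from the peer afterwards, but only if this call
actually transferred it. The function name, the *log* list and the
30-second timeouts are choices made for this illustration.

```python
def push_then_remove(scheduler, location, path, log, coro=None):
    """Generator for a coroutine that sends a file and later deletes it.

    'log' is a caller-supplied list that collects (step, result) pairs.
    """
    sent = yield scheduler.send_file(location, path, overwrite=True,
                                     timeout=30)
    log.append(("send_file", sent))
    if sent != 0:
        return  # not transferred by this call; nothing to clean up
    # ... work that needs the file at the peer would happen here ...
    removed = yield scheduler.del_file(location, path, timeout=30)
    log.append(("del_file", removed))
```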


Distributed Coroutines
======================

When "disasyncoro" module is used instead of "asyncoro" module,
additional methods described below are added to Coroutine class to
register them with names so they can be located by remote coroutines,
used for message passing to/from remote coroutines, monitoring remote
coroutines etc.

disasyncoro.register(name=None)

   A coroutine that can be used for remote services must first
   register itself with "register(name=None)". If *name* is "None",
   then name of the generator function of coroutine is used for
   registering. The name must be unique at that asyncoro; registering
   a different coroutine with the same name will fail. If same name is
   used for registering in different asyncoro's, then "locate()"
   method can be given the location of specific asyncoro to get a
   reference to coroutine with that name at that location.

Coro.locate(name, location=None, timeout=None)

   Note: This method must be used with *yield*.

   This static method finds and returns reference to coroutine
   registered with *name*. If *location* is given (e.g., obtained by
   asyncoro's *locate*), only the peer running at that location is
   queried for coroutine with that name; otherwise, all known peers
   are queried. If coroutine is successfully located, the return value
   is an instance of Coroutine.  A remote coroutine reference, "rcoro",
   can be used for message passing with its "rcoro.send()" and
   "rcoro.deliver()" methods; the "monitor()" method can be used to
   receive exit status notifications when it finishes, and it can be
   terminated with "rcoro.terminate()".

   If a coroutine running at location *loc1* registers itself with
   "coro.register("server_coro")", then coroutines running at other
   peer(s) can obtain references to it with "rcoro = yield
   Coro.locate("server_coro", loc1)". If "server_coro" is unique among
   all asyncoro's, then the same reference can be obtained with "rcoro
   = yield Coro.locate("server_coro")". Following methods can then be
   used with the remote coroutine reference "rcoro":

   rcoro.send(msg)

      Sends message *msg* to the coroutine "rcoro" (running at
      "loc1"). That remote coroutine will receive the messages with
      "msg = yield coro.receive()". Messages are sent asynchronously -
      they are queued to be sent and a daemon coroutine sends them in
      the order submitted. Thus, the caller of "rcoro.send()" cannot
      determine if/when a message has been sent - in case of network
      failures, the message may be dropped. "rcoro.deliver()" may be
      used instead to find out whether the message was delivered.
      Alternately, the recipient can send acknowledgement back.

   rcoro.deliver(msg, timeout=None)

      Note: This method must be used with *yield* as "n = yield
        rcoro.deliver(msg)".

      Sends message *msg* to the recipient, waits for acknowledgement
      from remote asyncoro that the message has been put in
      recipient's message queue and returns that status. If the status
      is 1, the message has been successfully delivered (when
      recipient calls "receive()", it gets the queued messages in the
      order they are received). Otherwise (e.g., *timeout* occurs
      before delivery, remote coroutine reference "rcoro" is no longer
      valid), the status returned is 0.

   coro.monitor(rcoro)

      Note: This method must be used with *yield* as "status = yield
        coro.monitor(rcoro)".

      Sets *coro* (the coroutine that invoked this method) as a
      monitor of remote coroutine *rcoro*. If successful, the value
      yielded is 0. Then when *rcoro* terminates (either normally or
      abnormally), the exit status is sent to the monitor *coro*.

   coro.notify(rcoro)

      Sets *rcoro* as a monitor of coroutine *coro* (the coroutine
      that invoked this method). If successful, the value yielded is
      0. Then when *coro* terminates (either normally or abnormally),
      the exit status is sent to the monitor *rcoro*.

   rcoro.terminate()

      Terminates the coroutine.
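
The registration, lookup and delivery steps described in this section
can be sketched as a pair of generator functions. Several details are
assumptions of this sketch rather than part of the API: *locate* is
passed in as a parameter and is expected to be "Coro.locate", a failed
lookup is assumed to yield "None", and "None" is used as a message that
asks the server to stop.

```python
def server_proc(handled, coro=None):
    """Registers itself and collects messages until it receives None."""
    coro.register("server_coro")
    while True:
        msg = yield coro.receive()
        if msg is None:
            break  # stop request used only in this sketch
        handled.append(msg)


def client_proc(locate, messages, undelivered, coro=None):
    """Locates the server and delivers messages, noting failures."""
    rcoro = yield locate("server_coro", timeout=10)
    if rcoro is None:  # assumption: None when the lookup fails
        return
    for msg in messages:
        # deliver() yields 1 once the message is in the recipient's queue.
        status = yield rcoro.deliver(msg, timeout=5)
        if status != 1:
            undelivered.append(msg)
    rcoro.send(None)  # fire-and-forget; no delivery status is available
```

The server side would run "server_proc" as a coroutine in one program,
and the client side would run "client_proc" in another, each using
"disasyncoro".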


Distributed Channels
====================

Similar to coroutines, channels can be registered and such channels
can be used in remote coroutines for message passing. A channel, say,
*chan*, can be registered with

chan.register(name=None)

   so that coroutines at peers can obtain a reference to it.  If
   *name* is "None", the name the channel was created with is used for
   registering. The *name* must be unique, at least at the asyncoro
   where it is registered. If there are duplicate names at different
   asyncoros, finding a channel with "Channel.locate()" should be used
   with specific location (peer).

chan.unregister()

   A registered channel can be unregistered with
   "chan.unregister(name=None)"; the *name* must be same as the one
   used to register.

Channel.locate(name, location=None, timeout=None)

   Note: This method must be used with *yield* as "chan = yield
     Channel.locate("achannel")".

   Finds and returns reference to channel registered with *name*. If
   *location* is given (e.g., obtained by asyncoro's *locate*), only
   the peer running at that location is queried; otherwise, all peers
   are queried. If channel is successfully located, the return value
   is an instance of Channel.  A remote channel reference can be used
   for message passing with "send()" and "deliver()" methods to it.

   If a channel, *chan*, is created and registered at location *loc1*
   with "chan.register("server_channel")", then coroutines running at
   other peer(s) can obtain references to it with "rchan = yield
   Channel.locate("server_channel", loc1)".  If "server_channel" is
   unique channel among all the peers, then same can be obtained
   without giving *loc1*: "rchan = yield
   Channel.locate("server_channel")". Following methods can then be
   used with the remote channel reference *rchan*:

   rchan.send(msg)

      Sends message *msg* to the channel ("server_channel" at "loc1"),
      so that current subscribers to that channel will receive that
      message. "send()" sends messages asynchronously - messages are
      queued to be sent and a daemon coroutine sends them in the order
      submitted. Thus, the caller of "send()" cannot determine if/when
      a message has been sent - in case of network failures, the
      message may fail to reach the recipient. "deliver()" may be used
      instead to find out whether the message was delivered.

   rchan.deliver(msg, timeout=None, n=0)

      Note: This method must be used with *yield* as "n = yield
        rchan.deliver(msg)".

      Similar to "send()", except that this method must be used with
      *yield* and it returns status of delivering the message to the
      channel. The method waits, for up to *timeout* seconds, until
      there are at least *n* subscribers to the channel. Then it
      delivers the message to each of the subscribers. The value
      returned is the number of total recipients of the message. This
      can be less than *n* in case of network failures, timeouts, or
      some subscribers leaving while message is being delivered, or
      more than *n* if there are more than *n* subscribers. If *n* is
      0, then the message will be delivered to all current
      subscribers. In any case, *timeout* is maximum amount of time in
      seconds (or fraction of second) for delivering the message.

   rchan.subscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "n = yield
        rchan.subscribe(coro)".

      Subscribes *subscriber* to *rchan* so that messages sent to
      *rchan* will be sent to *subscriber*. If *subscriber* is a
      coroutine, then messages can be received with "msg = yield
      coro.receive()". Note that messages sent directly to the
      coroutine from other coroutine will also be received with the
      same method, so if it is necessary to distinguish if a message
      is received directly to or through a channel, then messages can
      be encapsulated with appropriate information.

   rchan.unsubscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "yield
        rchan.unsubscribe(coro)".

      Removes *subscriber* from *rchan*'s subscribers so it will no
      longer receive messages sent to *rchan*.
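
A subscriber and a publisher for a remote channel might be sketched as
below. As assumptions of this sketch, *locate* is passed in and is
expected to be "Channel.locate", a failed lookup is assumed to yield
"None", and a "None" message tells the subscriber to stop.

```python
def subscriber_proc(locate, received, coro=None):
    """Subscribes to a remote channel and collects messages until None."""
    rchan = yield locate("server_channel", timeout=10)
    if rchan is None:  # assumption: None when the lookup fails
        return
    yield rchan.subscribe(coro)
    while True:
        msg = yield coro.receive()
        if msg is None:
            break  # stop signal used only in this sketch
        received.append(msg)
    yield rchan.unsubscribe(coro)


def publisher_proc(rchan, messages, counts, min_subscribers=1, coro=None):
    """Delivers each message and records how many recipients got it."""
    for msg in messages:
        n = yield rchan.deliver(msg, timeout=5, n=min_subscribers)
        counts.append(n)
```

The recorded counts can be compared with *min_subscribers* to detect
messages that reached fewer recipients than expected.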


RCI (Remote Coroutine Invocation)
=================================

As explained above, coroutines running at a location can be used for
sending messages and monitoring. This, however, cannot be used to
start coroutines at remote peers. RCI, similar to Remote Procedure
Call (RPC), allows peers to execute coroutines with registered
generator methods.  With RCI, an RCI instance should first be created
with a generator method and registered. Coroutines running in remote
peers can then obtain a reference to it with "locate()" method
(similar to distributed coroutines and channels). The reference can
then be used to start a coroutine with arguments.

class disasyncoro.RCI(method, name=None)

   Creates an RCI instance with given method. *method* should be a
   generator function, similar to coroutine methods; in fact, the
   given method is used to create coroutine when remote coroutine
   calls this method. If *name* is not given, generator method's name
   is used for registering it. As in the case of coroutines and
   channels, the name must be unique, at least at the asyncoro where
   it is registered.

   rci.register()

      Registers *rci* with the scheduler so it can be located (i.e., a
      reference to it can be obtained) by remote coroutines.

      Assuming an RCI is created with "rci1 = RCI(method1)", it can be
      registered with "rci1.register()" so remote coroutines can
      locate it (see below) with the name "method1".

   locate(name, location=None, timeout=None)

      Note: This method must be used with *yield* as "rci = yield
        RCI.locate("rci_1")".

      Gets a reference to the RCI registered with *name*. Similar to
      distributed coroutines, *location* can be used to query specific
      asyncoro and *timeout* is number (or fraction) of seconds to
      locate.

      The reference returned from "locate()" can be used to create
      (remote) coroutines (where RCI has been registered). If, as
      above, "method1" is registered at a remote peer at location
      *loc1*, a reference to it can be obtained with "drci1 = yield
      RCI.locate("method1", loc1)". If the name "method1" is unique
      among RCIs at all peer asyncoros, the same can be obtained with
      "drci1 = yield RCI.locate("method1")".

      If "locate()" is successful, the reference obtained can be used
      to start remote coroutines (where RCI has been registered). With
      the above example, *drci1* can be used as many times as
      necessary to create coroutines at that remote peer with "rcoro =
      yield drci1(*args, **kwargs)". The return value *rcoro* is
      reference to remote coroutine. This can be used for message
      passing, monitoring etc.
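
The client side of RCI can be sketched as follows. "method1" stands for
the generator function that the server wraps with "RCI(method1)" and
registers; its body here is only a placeholder. As assumptions of this
sketch, *locate* is passed in and is expected to be "RCI.locate", and a
failed lookup is assumed to yield "None".

```python
def method1(n, coro=None):
    """Placeholder for the generator function registered at the server."""
    msg = yield coro.receive()
    print("coroutine %s received %r" % (n, msg))


def rci_client(locate, numbers, started, coro=None):
    """Starts one remote coroutine per number and greets each of them."""
    drci1 = yield locate("method1", timeout=10)
    if drci1 is None:  # assumption: None when the lookup fails
        return
    for n in numbers:
        # Calling the reference starts a coroutine at the remote peer.
        rcoro = yield drci1(n)
        started.append(rcoro)
        rcoro.send("hello %d" % n)
```

Only functions registered at the server can be started this way; the
client chooses the arguments but not the code that runs.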


SSL (Security / Encryption)
===========================

If peers are on public / remote networks, SSL (Secure Sockets Layer)
can be used to encrypt all communication, including data, so that the
connection is private - only the sender and recipient see the data
exchanged and other observers see encrypted data. disasyncoro provides
a simple mechanism where digital certificates / symmetric keys are used
to encrypt data on one side and decrypt data on the other side. Below
is a set of commands to generate the certificates using openssl tool
(this is just an example - there are other approaches / methods):

   openssl genrsa -out disasyncoro.key 2048
   openssl req -new -key disasyncoro.key -out disasyncoro.csr
   openssl req -x509 -days 365 -key disasyncoro.key -in disasyncoro.csr -out disasyncoro.cert

These commands generate the key file *disasyncoro.key* and the
certificate file *disasyncoro.cert*, which should be used for the
*keyfile* and *certfile* parameters where they are referred to. It may
be convenient to copy *disasyncoro.key* and *disasyncoro.cert* into a
single file, say, *disasyncoro.keycert*, and use that in *certfile*
parameter and ignore *keyfile* parameter.
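
Combining the two files can be done with a few lines of Python, as in
the sketch below. "make_keycert" is a hypothetical helper; it writes the
private key first and the certificate second, and restricts the file
permissions because the result contains the private key.

```python
import os


def make_keycert(key_path, cert_path, keycert_path):
    """Write the private key followed by the certificate into one file."""
    with open(keycert_path, "wb") as out:
        for path in (key_path, cert_path):
            with open(path, "rb") as src:
                data = src.read()
            out.write(data)
            if not data.endswith(b"\n"):
                out.write(b"\n")  # keep PEM blocks on separate lines
    os.chmod(keycert_path, 0o600)  # the file contains the private key


if __name__ == "__main__":
    if os.path.exists("disasyncoro.key") and os.path.exists("disasyncoro.cert"):
        make_keycert("disasyncoro.key", "disasyncoro.cert",
                     "disasyncoro.keycert")
```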

Once the files are generated (or obtained by other means), they should
be copied to each of the peers (over a secure channel, for example,
*ssh*) and the peers created with "asyncoro.AsynCoro(...,
certfile='disasyncoro.cert', keyfile='disasyncoro.key')" (or, if both
key and certificate are stored in *disasyncoro.keycert* as mentioned
above, with "asyncoro.AsynCoro(..., certfile='disasyncoro.keycert')").
