
Distributed Programming
***********************

As described in *Asynchronous Concurrent Programming*, when a
coroutine is created, an AsynCoro (singleton) instance is created if
one does not already exist. AsynCoro starts a scheduler that manages
the coroutines created in that program. AsynCoro in "asyncoro" does
not start network services for distributed programming. However, when
"disasyncoro" is used instead of "asyncoro", network services are
started so that coroutines can send messages to, and receive messages
from, coroutines in remote asyncoro instances and use their other
network services. See also *Distributed Communicating Processes*,
which supports distributed and parallel computing: clients can send
computations (Python functions, files, modules etc.) to remote servers
to be executed as coroutines there.

Network services, such as message passing, work transparently: once
references to remote entities are obtained, they can be used just as
with concurrent programming.


Location
========

Some entities in asyncoro, such as coroutines and channels, can
provide services to remote peers. These entities should register
themselves with names so that remote peers can use those names to get
references. These references can then be used to invoke the methods
described in this section.

Such entities have a location (network IP address and TCP port)
associated with them that indicates where they originate, so that if
a method is invoked on a reference at a remote peer, that peer knows
where to direct that method. When such an entity is located (i.e., a
reference to it is obtained) at a remote peer using only the name it
is registered with, all known asyncoro instances are queried for that
name. If the name is unique across all asyncoro instances, the
reference is for that specific entity. However, if the same name is
used in more than one asyncoro, the reference obtained may not be for
the desired entity. In such a case, the location of the asyncoro where
the entity is registered can be given. Locations can be obtained by
looking up asyncoro names with "scheduler.locate()". If the host and
TCP port are already known, a location can be created with:

class disasyncoro.Location(host, tcp_port)

   *host* is either a host name or an IP address, and *tcp_port* is
   the TCP port where asyncoro is running. This information is printed
   by the scheduler when it starts.
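The lookup rules above can be modelled in a few lines. The sketch
below is not asyncoro code. It is a standard-library model in which
"PeerLocation" is a hypothetical stand-in for "Location",
"REGISTERED" is a made-up snapshot of the names registered at each
asyncoro, and dictionary order stands in for whichever peer happens to
answer first. It shows why a duplicated name needs an explicit
location.

```python
from typing import NamedTuple, Optional, Tuple


class PeerLocation(NamedTuple):
    """Hypothetical stand-in for disasyncoro.Location: IP address and TCP port."""
    addr: str
    port: int


# Hypothetical snapshot of the names registered at each known asyncoro instance.
REGISTERED = {
    PeerLocation('192.168.1.10', 51351): {'server_coro', 'logger'},
    PeerLocation('192.168.1.11', 51352): {'server_coro'},
}


def locate(name: str,
           location: Optional[PeerLocation] = None) -> Optional[Tuple[str, PeerLocation]]:
    """Return (name, location) of a registered entity, or None if not found."""
    if location is not None:
        # Only the asyncoro at the given location is queried.
        if name in REGISTERED.get(location, set()):
            return (name, location)
        return None
    # All known instances are queried; the first match is returned (dict order
    # stands in for the first peer to reply), so a duplicated name may resolve
    # to an entity other than the one intended.
    for loc, names in REGISTERED.items():
        if name in names:
            return (name, loc)
    return None


print(locate('server_coro'))                                       # first peer to "answer"
print(locate('server_coro', PeerLocation('192.168.1.11', 51352)))  # the intended one
```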


Distributed AsynCoro
====================

As explained in *Asynchronous Concurrent Programming*, the AsynCoro
scheduler is started automatically (when a coroutine is created, for
example) with default initialization. If the scheduler should be
initialized with different parameters, it must be initialized with the
following before any coroutines, channels etc. are created:

class disasyncoro.AsynCoro(node=None, udp_port=None, tcp_port=0, ext_ip_addr=None, name=None, secret='', certfile=None, keyfile=None, notifier=None, dest_path_prefix=None, max_file_size=None)

   *node* must be either a host name or an IP address, and *udp_port*
   must be the port number where asyncoro starts network services. If
   *udp_port* is set to 0, port 51350 is used.

   By default, any available port is used for *tcp_port*, the port
   where asyncoro instances exchange messages. However, this port can
   be set to a specific number, which can be useful when dealing with
   firewalls or port forwarding.

   *name*, if given, must be a string that is unique for each instance
   of asyncoro. If this option is not given, it is set to a string
   representing the location where network services are running. Using
   *name*, the location where a specific asyncoro instance is running
   can be found with the scheduler's "locate(name)" method. This
   location can then be used to find and use services, such as sending
   messages to coroutines and channels running at that location.

   *secret* is a string that must be the same for all asyncoro
   instances that communicate. It is used to compute a hash that is
   used and checked in all communication, so that only asyncoro
   instances using the same secret can be peers. Note that messages are
   not encrypted; if the messages must be secured, the SSL options
   *certfile* and/or *keyfile* can be used.
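The text does not say which hash asyncoro computes. As an illustration
only, the sketch below uses HMAC-SHA256 from the standard library to
show how a shared secret lets a peer reject messages from instances
configured with a different secret, while the payload itself still
travels unencrypted. "tag_message" and "accept_message" are
hypothetical names, not asyncoro functions.

```python
import hashlib
import hmac


def tag_message(secret: str, payload: bytes) -> bytes:
    """Compute an authentication tag from the shared secret and the payload."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).digest()


def accept_message(secret: str, payload: bytes, tag: bytes) -> bool:
    """Accept the message only if its tag was computed with the same secret."""
    # compare_digest avoids leaking information through comparison timing.
    return hmac.compare_digest(tag_message(secret, payload), tag)


payload = b'hello peer'                         # sent in the clear: a tag is not encryption
tag = tag_message('s3cret', payload)
print(accept_message('s3cret', payload, tag))   # peer with the same secret
print(accept_message('other', payload, tag))    # peer with a different secret
```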

   *certfile* and *keyfile* are as per the "ssl" module.

   *ext_ip_addr* must be the IP address of the gateway/firewall when
   asyncoro is used behind that gateway/firewall and other peers are
   outside it. With this option, asyncoro running on a local network
   can communicate with peers on remote networks.

   *dest_path_prefix* (called *dest_path* in this section) is the path
   where files from remote peers are saved when the "send_file()"
   method is called (see below). If it is not set, asyncoro
   initializes it to "tempfile.gettempdir()". If the sender's
   "send_file()" call sets *dir* to a relative path, the file is saved
   under *dest_path* + *dir*.

   *max_file_size* is the maximum size, in bytes, of a transferred
   file. If it is set to None or 0, there is no limit; if it is set to
   a number, any file bigger than that limit is rejected when
   "send_file()" is called.
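A minimal sketch of the path and size rules for received files. It
assumes that the saved file keeps only the base name of the sender's
path, which the text does not state. "saved_path" and "size_allowed"
are hypothetical helpers written for illustration. They are not part
of asyncoro. The parameter name *dir* mirrors "send_file()".

```python
import os
import tempfile
from typing import Optional


def saved_path(filename: str, dir: Optional[str] = None,
               dest_path: Optional[str] = None) -> str:
    """Where a received file would be stored under the rules above."""
    if dest_path is None:
        dest_path = tempfile.gettempdir()       # default when dest_path is not set
    target = dest_path
    if dir is not None:
        if os.path.isabs(dir):
            raise ValueError('dir must be a relative path')
        target = os.path.join(dest_path, dir)   # dest_path + dir
    # Assumption: only the base name of the sender's path is kept.
    return os.path.join(target, os.path.basename(filename))


def size_allowed(size: int, max_file_size: Optional[int] = None) -> bool:
    """None or 0 means no limit; otherwise files bigger than the limit are rejected."""
    return not max_file_size or size <= max_file_size


print(saved_path('/home/user/data.csv', 'jobs', '/srv/incoming'))
print(size_allowed(2048, max_file_size=1024))
```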

Note that, because of CPython's global interpreter lock, a Python
program can't use multiple cores on an SMP node other than through
multiprocessing. Instead of using multiprocessing on an SMP node,
multiple programs, each using asyncoro, can be started on a machine
with multiple cores (as many as needed), all on the same *udp_port*
but each with a different *tcp_port*; see the 'discoro' module, which
starts multiple instances on a node with multiple processors. If
*tcp_port* is set explicitly, each instance must use a unique
*tcp_port* (but the same *udp_port*). If *tcp_port* is not set
explicitly, asyncoro chooses a different *tcp_port* for each instance.
Within each such program, coroutines can use *Asynchronous Concurrent
Programming* and the API below to exchange messages with coroutines in
all the programs: although all programs run on the same machine,
coroutines running in one program are distributed/remote for
coroutines running in another program.
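The TCP side of running several instances on one node can be
illustrated with plain sockets. This sketch uses only the standard
library and is not asyncoro code. "start_listener" is a hypothetical
helper. Binding to port 0 lets the operating system pick a free port,
so two listeners get different ports. An explicitly requested port
that is already in use fails with "OSError". The sketch does not show
how asyncoro shares one *udp_port* among instances.

```python
import socket


def start_listener(port: int = 0) -> socket.socket:
    """Open a listening TCP socket on localhost; port 0 lets the OS pick a free port."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(('127.0.0.1', port))
        s.listen()
    except OSError:
        s.close()
        raise
    return s


first = start_listener()      # like an instance started without an explicit tcp_port
second = start_listener()     # a second instance on the same node
port_a = first.getsockname()[1]
port_b = second.getsockname()[1]
print(port_a != port_b)       # each instance got its own TCP port

try:
    start_listener(port_a)    # explicit tcp_port already taken by another instance
    clash = False
except OSError:
    clash = True
print(clash)

first.close()
second.close()
```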

When "disasyncoro" is used instead of "asyncoro", the following
additional methods are available in AsynCoro:

scheduler.locate(name, timeout=None)

   Note: This method must be used with *yield*. While the *locate*
     methods of coroutines, channels etc. are static methods (to be
     invoked on the class), this method should be invoked on the
     scheduler instance (which can be obtained, for example, with
     "scheduler = AsynCoro()").

   Finds and returns the location where the peer AsynCoro with *name*
   is running. If no AsynCoro with *name* is currently running, it
   waits until a peer with *name* starts. This location can be used in
   other methods. *timeout* is the maximum number of seconds (possibly
   fractional) to wait for the location. If *timeout* seconds elapse
   (because no peer with *name* is running or reachable), "None" is
   returned.
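The wait-with-timeout behaviour of "locate()" can be modelled with a
"threading.Condition". This is a sketch under stated assumptions, not
asyncoro's implementation: asyncoro uses coroutines rather than
threads, and "NameDirectory" and its "announce()" method are
hypothetical names that stand for a peer becoming known.

```python
import threading


class NameDirectory:
    """Hypothetical model: peers announce their names; locate() waits for one."""

    def __init__(self):
        self._cond = threading.Condition()
        self._peers = {}                     # peer name -> location

    def announce(self, name, location):
        with self._cond:
            self._peers[name] = location
            self._cond.notify_all()          # wake any locate() waiting for a name

    def locate(self, name, timeout=None):
        with self._cond:
            # timeout=None waits indefinitely, as described above.
            self._cond.wait_for(lambda: name in self._peers, timeout)
            return self._peers.get(name)     # None if the timeout expired first


directory = NameDirectory()
threading.Timer(0.1, directory.announce, args=('node2', ('10.0.0.2', 51352))).start()
found = directory.locate('node2', timeout=5)       # returns after about 0.1 s
missing = directory.locate('node3', timeout=0.2)   # None after about 0.2 s
print(found, missing)
```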

scheduler.peer(node, udp_port=51350, tcp_port=0, stream_send=False)

   Note: This method must be used with *yield*.

   Contacts asyncoro at the given *node* and *udp_port* to establish
   communication. If all asyncoro instances are running on a local
   network, they can find each other automatically through UDP
   broadcasting. However, asyncoro instances running on remote
   network(s) need to be added explicitly with the *peer* method. It is
   not necessary to run this method on both sides: if one asyncoro
   successfully runs it, they will find each other. By default, all
   asyncoro instances running on a single node (can) use the same UDP
   port, so the *peer* method can be called once per node, even when
   more than one asyncoro instance is running on that node.

   If *tcp_port* is not 0, asyncoro uses the given TCP port to
   discover the peer, so if the peer is on a remote network, UDP port
   forwarding is not necessary for them to communicate. This can be
   used, for example, with SSH port forwarding, so that asyncoro can
   work with remote peers without changing any firewall settings. As
   the default *tcp_port* chosen by asyncoro when it starts is random,
   this option can be used either by starting the peer with a specific
   *tcp_port* or by noting the TCP port the peer prints when it
   starts.

   If *stream_send* is "False", asyncoro's scheduler may send messages
   one per connection; i.e., the process that transfers messages opens
   a connection to the peer, sends a message and closes the
   connection, unless the next queued message is also for the same
   peer (in which case the same connection is used to send it). As
   creating sockets and making connections is expensive, this may be
   inefficient. If *stream_send* is "True", the connection to the
   given peer is not closed after sending pending messages; the same
   connection is reused to send later messages. This can significantly
   improve performance in some cases. However, keeping connections
   alive has a cost, too, so this option should be used when delays in
   message transfer must be minimized at the cost of keeping sockets
   open to peers. If *stream_send* is "True" and *tcp_port* is 0,
   messages to all asyncoro instances running on the peer node (if
   more than one is running there) are streamed; if *tcp_port* is not
   0, only messages to the asyncoro on that TCP port are streamed.
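To make the trade-off concrete, the hypothetical function below
counts how many connections a sender would open for a queue of
messages under each of the two *stream_send* behaviours described
above. It is a simplified model with one queue and no connection
failures. It is not asyncoro's transfer logic.

```python
def connections_opened(destinations, stream_send=False):
    """Count connections opened to send messages queued in this order."""
    opened = 0
    kept_open = set()      # peers with a persistent connection (stream_send=True)
    previous = None
    for dest in destinations:
        if stream_send:
            if dest not in kept_open:
                kept_open.add(dest)
                opened += 1
        elif dest != previous:
            # The connection is closed unless the next message is for the same peer.
            opened += 1
        previous = dest
    return opened


queue = ['A', 'A', 'B', 'A', 'B']
print(connections_opened(queue))                    # one per run of same-peer messages
print(connections_opened(queue, stream_send=True))  # one per peer
```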

scheduler.peer_status(callback)

   Registers *callback* as a function (not a generator) to call when a
   peer is discovered or when a peer terminates. The function is
   called with 3 arguments: the name of the peer, the location of the
   peer (an instance of "Location") and the status; status is 1 if the
   peer is discovered or 0 if it has terminated. If there are already
   known peers at the time of registration, the callback is called for
   each of those peers (with status=1).
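A small model of the callback rules, assuming a local table of peers.
"PeerTable", "discovered()" and "terminated()" are hypothetical names
used only for illustration. The callback itself is a plain function,
as required. In real code it would be passed to
"scheduler.peer_status()".

```python
class PeerTable:
    """Hypothetical model of how peer_status callbacks are invoked."""

    def __init__(self):
        self.peers = {}          # peer name -> location
        self.callbacks = []

    def peer_status(self, callback):
        self.callbacks.append(callback)
        for name, location in self.peers.items():
            callback(name, location, 1)     # report already-known peers

    def discovered(self, name, location):
        self.peers[name] = location
        for callback in self.callbacks:
            callback(name, location, 1)

    def terminated(self, name):
        location = self.peers.pop(name)
        for callback in self.callbacks:
            callback(name, location, 0)


events = []


def on_peer(name, location, status):
    # A plain function, not a generator, as peer_status() requires.
    events.append((name, location, status))


table = PeerTable()
table.discovered('node1', ('10.0.0.1', 51351))
table.peer_status(on_peer)                  # node1 is reported immediately
table.discovered('node2', ('10.0.0.2', 51352))
table.terminated('node1')
print(events)
```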

scheduler.send_file(location, file, dir=None, overwrite=False, timeout=None)

   Note: This method must be used with *yield*.

   Sends (transfers) file *file* to the peer at *location*. If *dir* is
   not "None", it must be a relative path, in which case the file is
   saved under the peer's *dest_path* + *dir*. This method returns 0 if
   the file was successfully saved at the peer; 1 if the file already
   exists at the peer with the same size, timestamp and permissions (so
   there is no need to transfer it); an *os.stat* structure if the file
   exists at the destination with a different size/timestamp/
   permissions but *overwrite* is "False"; and -1 in case of any error.
   If the return value is 0, the sender may later delete the file with
   "del_file()".
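Inside a coroutine, the result would be obtained with
"status = yield scheduler.send_file(loc, path)". The hypothetical
helper below only classifies such a value according to the rules
above. It assumes that the stat structure arrives as an
"os.stat_result" instance.

```python
import os


def describe_send_file_result(result) -> str:
    """Translate a send_file() return value into a short description."""
    if isinstance(result, os.stat_result):
        # The file exists at the peer with a different size/timestamp/permissions
        # and overwrite was False, so it was not replaced.
        return 'not overwritten'
    if result == 0:
        return 'saved'              # may be removed later with del_file()
    if result == 1:
        return 'already present'    # same size, timestamp and permissions
    if result == -1:
        return 'error'
    raise ValueError('unexpected send_file() result: %r' % (result,))


for status in (0, 1, -1, os.stat('.')):
    print(describe_send_file_result(status))
```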

scheduler.del_file(location, file, dir=None, timeout=None)

   Note: This method must be used with *yield*.

   Deletes file *file* from the peer at *location*. *file* and *dir*
   must be the same as those used to send the file (with
   "send_file()"). This method returns 0 if the file was successfully
   deleted and -1 in case of any error.


Distributed Coroutines
======================

When the "disasyncoro" module is used instead of the "asyncoro"
module, the additional methods described below are added to the "Coro"
class to register coroutines with names so they can be located by
remote coroutines, used for message passing to/from remote coroutines,
monitored etc.

coro.register(name=None)

   A coroutine that provides remote services must first register
   itself with "register(name=None)". If *name* is "None", the name of
   the coroutine's generator function is used for registering. The
   name must be unique at that asyncoro; registering a different
   coroutine with the same name fails. If the same name is used for
   registering at different asyncoro instances, "locate()" can be
   given the location of a specific asyncoro to get a reference to the
   coroutine with that name at that location.

Coro.locate(name, location=None, timeout=None)

   Note: This method must be used with *yield*.

   This static method finds and returns a reference to the coroutine
   registered with *name*. If *location* is given (e.g., obtained with
   the scheduler's *locate*), only the peer running at that location is
   queried for a coroutine with that name; otherwise, all known peers
   are queried. If the coroutine is successfully located, the return
   value is an instance of "Coro". A remote coroutine reference,
   "rcoro", can be used for message passing with its "rcoro.send()"
   and "rcoro.deliver()" methods; "monitor()" can be used to receive
   exit status notifications when it finishes; and it can be
   terminated with "rcoro.terminate()".

   If a coroutine running at location *loc1* registers itself with
   "coro.register("server_coro")", coroutines running at other peers
   can obtain references to it with "rcoro = yield
   Coro.locate("server_coro", loc1)". If "server_coro" is unique among
   all asyncoro instances, the same reference can be obtained with
   "rcoro = yield Coro.locate("server_coro")". The following methods
   can then be used with the remote coroutine reference "rcoro":

   rcoro.send(msg)

      Sends message *msg* to the coroutine "rcoro" (running at
      "loc1"). That remote coroutine receives the message with
      "msg = yield coro.receive()". Messages are sent asynchronously:
      they are queued to be sent, and a daemon coroutine sends them in
      the order submitted. Thus, the caller of "rcoro.send()" cannot
      determine if/when a message has been sent; in case of network
      failures, the message may be dropped. "rcoro.deliver()" can be
      used instead to find out whether the message was delivered.
      Alternatively, the recipient can send an acknowledgement back.

   rcoro.deliver(msg, timeout=None)

      Note: This method must be used with *yield*.

      Sends message *msg* to the recipient, waits for acknowledgement
      from the remote asyncoro that the message has been put in the
      recipient's message queue, and returns that status. If the
      status is 1, the message has been successfully delivered (when
      the recipient calls "receive()", it gets queued messages in the
      order they were received). Otherwise (e.g., *timeout* occurs
      before delivery, or the remote coroutine reference "rcoro" is no
      longer valid), the status returned is 0.

   coro.monitor(rcoro)

      Note: This method must be used with *yield*.

      Sets *coro* (the coroutine that invokes this method) as a
      monitor of the remote coroutine *rcoro*. When *rcoro* terminates
      (either normally or abnormally), its exit status is sent to the
      monitor *coro*.

   rcoro.terminate()

      Terminates the coroutine.
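The repeated note that these methods must be used with *yield* follows
from how generator-based coroutines work: yielding hands a request to
the scheduler, which later resumes the generator with the result. The
toy scheduler below illustrates the idea behind
"msg = yield coro.receive()" and "send()" using only the standard
library. "ToyScheduler", "ToyCoro" and "RECEIVE" are hypothetical,
and asyncoro's scheduler is considerably more involved.

```python
from collections import deque

RECEIVE = object()   # request a coroutine yields to wait for a message


class ToyScheduler:
    """Hypothetical round-robin driver for generator-based coroutines."""

    def __init__(self):
        self.ready = deque()   # (coroutine, value to resume it with)

    def run(self):
        while self.ready:
            coro, value = self.ready.popleft()
            try:
                request = coro.gen.send(value)
            except StopIteration:
                continue                         # coroutine finished
            if request is RECEIVE:
                if coro.mailbox:
                    self.ready.append((coro, coro.mailbox.popleft()))
                else:
                    coro.waiting = True          # suspended until send() is called
            else:
                self.ready.append((coro, None))  # plain yield: just reschedule


class ToyCoro:
    """Hypothetical coroutine: a generator plus a message queue."""

    def __init__(self, scheduler, genfunc, *args):
        self.scheduler = scheduler
        self.mailbox = deque()
        self.waiting = False
        self.gen = genfunc(self, *args)
        scheduler.ready.append((self, None))     # first resume starts the generator

    def receive(self):
        return RECEIVE          # only takes effect when the caller yields it

    def send(self, msg):
        self.mailbox.append(msg)
        if self.waiting:                         # wake the suspended receiver
            self.waiting = False
            self.scheduler.ready.append((self, self.mailbox.popleft()))


received = []


def server(coro):
    while True:
        msg = yield coro.receive()   # same shape as the asyncoro idiom
        if msg is None:
            break
        received.append(msg)


def client(coro, server_coro):
    for m in ('a', 'b'):
        server_coro.send(m)
        yield                        # give other coroutines a turn
    server_coro.send(None)           # ask the server to stop


sched = ToyScheduler()
srv = ToyCoro(sched, server)
ToyCoro(sched, client, srv)
sched.run()
print(received)
```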


Distributed Channels
====================

Similar to coroutines, channels can be registered, and such channels
can be used by remote coroutines for message passing. A channel, say
*chan*, can be registered with

chan.register(name=None)

   so that coroutines at peers can obtain a reference to it. If *name*
   is "None", the name the channel was created with is used for
   registering. The *name* must be unique, at least at the asyncoro
   where it is registered. If there are duplicate names at different
   asyncoro instances, "Channel.locate()" should be called with a
   specific location (peer).

chan.unregister()

   A registered channel can be unregistered with
   "chan.unregister(name=None)"; *name* must be the same as the one
   used to register it.

Channel.locate(name, location=None, timeout=None)

   Note: This method must be used with *yield*.

   Finds and returns a reference to the channel registered with
   *name*. If *location* is given (e.g., obtained with the scheduler's
   *locate*), only the peer running at that location is queried;
   otherwise, all peers are queried. If the channel is successfully
   located, the return value is an instance of "Channel". A remote
   channel reference can be used for message passing with its "send()"
   and "deliver()" methods.

   If a channel, *chan*, is created and registered at location *loc1*
   with "chan.register("server_channel")", coroutines running at
   other peers can obtain references to it with "rchan = yield
   Channel.locate("server_channel", loc1)". If "server_channel" is a
   unique channel name among all peers, the same reference can be
   obtained without giving *loc1*: "rchan = yield
   Channel.locate("server_channel")". The following methods can then
   be used with the remote channel reference *rchan*:

   rchan.send(msg)

      Sends message *msg* to the channel ("server_channel" at "loc1"),
      so that current subscribers to that channel receive that
      message. "send()" sends messages asynchronously: messages are
      queued to be sent, and a daemon coroutine sends them in the order
      submitted. Thus, the caller of "send()" cannot determine if/when
      a message has been sent; in case of network failures, the
      message may fail to reach the recipients. "deliver()" can be used
      instead to find out whether the message was delivered.

   rchan.deliver(msg, timeout=None, n=0)

      Note: This method must be used with *yield*.

      Similar to "send()", except that this method must be used with
      *yield* and it returns the status of delivering the message to
      the channel. The method waits (for up to *timeout* seconds) until
      the channel has at least *n* subscribers, then delivers the
      message to each subscriber. The value returned is the total
      number of recipients of the message. This can be less than *n*
      in case of network failures, timeouts, or subscribers leaving
      while the message is being delivered, or more than *n* if there
      are more than *n* subscribers. If *n* is 0, the message is
      delivered to all current subscribers. In any case, *timeout* is
      the maximum time in seconds (possibly fractional) for delivering
      the message.

   rchan.subscribe(subscriber, timeout=None)

      Note: This method must be used with *yield*.

      Subscribes *subscriber* to *rchan* so that messages sent to
      *rchan* are sent to *subscriber*. If *subscriber* is a
      coroutine, it can receive messages with "msg = yield
      coro.receive()". Note that messages sent directly to the
      coroutine by other coroutines are received the same way, so if it
      is necessary to distinguish whether a message was received
      directly or through a channel, messages can be encapsulated with
      appropriate information.

   rchan.unsubscribe(subscriber, timeout=None)

      Note: This method must be used with *yield*.

      Removes *subscriber* from *rchan*'s subscribers so it will no
      longer receive messages sent to *rchan*.
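The counting rules of "deliver()" can be checked against a local,
thread-based model. "ToyChannel" and "Mailbox" are hypothetical
stand-ins, not asyncoro classes. In this model *timeout* bounds only
the wait for *n* subscribers. The text above describes it as bounding
the whole delivery.

```python
import threading


class Mailbox:
    """Hypothetical subscriber that just records the messages it receives."""

    def __init__(self):
        self.messages = []


class ToyChannel:
    """Hypothetical local model of a channel's subscribe/deliver rules."""

    def __init__(self, name):
        self.name = name
        self._subscribers = []
        self._cond = threading.Condition()

    def subscribe(self, subscriber):
        with self._cond:
            if subscriber not in self._subscribers:
                self._subscribers.append(subscriber)
                self._cond.notify_all()      # wake deliver() calls waiting for n

    def unsubscribe(self, subscriber):
        with self._cond:
            if subscriber in self._subscribers:
                self._subscribers.remove(subscriber)

    def deliver(self, msg, timeout=None, n=0):
        """Wait up to timeout for n subscribers, deliver, return the recipient count."""
        with self._cond:
            self._cond.wait_for(lambda: len(self._subscribers) >= n, timeout)
            recipients = list(self._subscribers)
        for sub in recipients:
            sub.messages.append(msg)
        return len(recipients)


chan = ToyChannel('server_channel')
a, b = Mailbox(), Mailbox()
chan.subscribe(a)
count_all = chan.deliver('x')                        # n=0: all current subscribers
threading.Timer(0.1, chan.subscribe, args=(b,)).start()
count_two = chan.deliver('y', timeout=5, n=2)        # waits until b subscribes
chan.unsubscribe(a)
count_short = chan.deliver('z', timeout=0.1, n=3)    # times out with one subscriber
print(count_all, count_two, count_short)
```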


Remote Coroutine (Callable) Invocation (RCI)
============================================

As explained above, coroutines running at a location can be used for
message passing and monitoring. This, however, cannot be used to start
coroutines at remote peers. RCI, similar to Remote Procedure Call
(RPC), allows peers to execute coroutines with registered generator
functions. With RCI, a generator function is first used to create an
RCI, which is then registered. Coroutines running at remote peers can
then obtain a reference to it with the "locate()" method (similar to
distributed coroutines and channels). The reference can then be used
to start a coroutine with arguments.

class disasyncoro.RCI(method, name=None)

   Creates an RCI instance with the given method. *method* should be a
   generator function, similar to coroutine methods; in fact, the given
   method is used to create a coroutine when a remote coroutine calls
   this RCI. If *name* is not given, the generator function's name is
   used for registering it. As with coroutines and channels, the name
   must be unique, at least at the asyncoro where it is registered.

   rci.register()

      Registers *rci* with the scheduler so it can be located (i.e., a
      reference to it can be obtained) by remote coroutines.

      Assuming an RCI is created with "rci1 = RCI(method1)", it can be
      registered with "rci1.register()" so remote coroutines can
      locate it (see below) with the name "method1".

   RCI.locate(name, location=None, timeout=None)

      Note: This method must be used with *yield*.

      Gets a reference to the RCI registered with *name*. Similar to
      distributed coroutines, *location* can be used to query a
      specific asyncoro, and *timeout* is the number of seconds
      (possibly fractional) to wait while locating.

      The reference returned by "locate()" can be used to create
      (remote) coroutines at the peer where the RCI has been
      registered. If, as above, "method1" is registered at a remote
      peer at location *loc1*, a reference to it can be obtained with
      "drci1 = yield RCI.locate("method1", loc1)". If the name
      "method1" is unique among RCIs at all peer asyncoro instances,
      the same reference can be obtained with "drci1 = yield
      RCI.locate("method1")".

      If "locate()" is successful, the reference obtained can be used
      to start remote coroutines at the peer where the RCI has been
      registered. With the above example, *drci1* can be used as many
      times as necessary to create coroutines at that remote peer with
      "rcoro = yield drci1(*args, **kwargs)". The return value *rcoro*
      is a reference to the remote coroutine, which can be used for
      message passing, monitoring etc.
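A minimal model of the RCI flow at a single asyncoro, assuming that
generator objects can stand in for coroutines. It shows three rules:
registration uses the generator function's name by default, duplicate
names are rejected, and every call of the located reference creates a
fresh coroutine body. "RCIHost" and "invoke" are hypothetical names,
and no networking is involved.

```python
class RCIHost:
    """Hypothetical model of the RCIs registered at one asyncoro instance."""

    def __init__(self):
        self._methods = {}
        self.started = []          # generators standing in for new coroutines

    def register(self, method, name=None):
        name = name or method.__name__   # default: the generator function's name
        if name in self._methods:
            return False                 # names must be unique at this instance
        self._methods[name] = method
        return True

    def locate(self, name):
        method = self._methods.get(name)
        if method is None:
            return None

        def invoke(*args, **kwargs):
            coro = method(*args, **kwargs)   # a fresh coroutine body per call
            self.started.append(coro)
            return coro
        return invoke


def method1(a, b=1):
    yield a + b


host = RCIHost()
host.register(method1)
drci1 = host.locate('method1')
r1 = drci1(2, b=3)
r2 = drci1(10)
print(next(r1), next(r2))
```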


Examples
--------

See *Distributed Programming* in the tutorial for examples.
