
Asynchronous Concurrent Programming
***********************************

asyncoro provides an API for asynchronous and concurrent programming
with coroutines using Python's generator functions. Coroutines are
like lightweight threads: creating and running coroutines is very
efficient. Moreover, unlike in thread programming, a coroutine
continues to run until it voluntarily gives up control (when *yield*
is used), so locking is not needed to protect critical sections.
However, asyncoro provides all the locking primitives (Lock, RLock,
Event, Condition and Semaphore), similar to those in the threading
module.

Programs developed with asyncoro have the same logic and structure as
programs with threads, except for a few syntactic changes. Although
the API below has many methods, most of them are for additional
features of asyncoro (such as message passing, hot swapping,
monitoring etc.) and are not needed for simple programs that are
similar to thread based programs. The differences compared to
threaded programming are:

   * Instead of creating threads, coroutines should be created with
     "Coro". The process function (first argument to "Coro") should be
     a generator function (i.e., function with *yield* statements).

   * "coro.sleep()" should be used in place of "time.sleep()",

   * asyncoro's locking primitives ("asyncoro.Event",
     "asyncoro.Condition", etc.) should be used in place of Python's
     threading counterparts,

   * Sockets, pipes etc. should be converted to asynchronous
     versions with "AsyncSocket", "AsyncPipe" etc.,

   * I/O operations (such as AsyncSocket's "send()", "recv()" and
     "accept()") and blocking operations (such as "sleep()" and
     Event's "wait()") are implemented as generator methods; these
     should be used with *yield* so that the operation is initiated,
     the invoking coroutine is suspended and, when the operation is
     completed, the coroutine is resumed with the result of the
     operation.

Coroutines in asyncoro are essentially generator functions that
suspend execution when *yield* is used and are resumed by asyncoro's
scheduler (AsynCoro) after the asynchronous operation is complete.
Usually *yield* is used with an asynchronous call, such as socket's
"connect()", "send()" or pipe's "read()", "communicate()", waiting for
a message etc. With such statements, the asynchronous call is
initiated and control goes to the scheduler, which schedules another
coroutine, if one is ready to execute. When the asynchronous operation
is complete, the coroutine that called the operation becomes ready to
execute. Thus, the coroutines in asyncoro are not strictly cooperative
tasks that pass control to each other, but each *yield* statement
transfers control to asyncoro's scheduler, which manages them.
However, asyncoro supports message passing, suspend/resume calls etc.,
so that coroutines can cooperate in a way that is easier to program
and understand.
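
The hand-off described above can be illustrated with a toy model in
plain Python. This is a minimal sketch, not asyncoro's implementation:
"toy_scheduler" and "worker" are hypothetical names, and an
"asynchronous operation" is simulated by resuming the coroutine, on its
next turn, with the upper-cased value it yielded. The interleaved trace
shows that every *yield* returns control to the scheduler, which then
runs whichever coroutine is ready next.

```python
from collections import deque


def toy_scheduler(*gens):
    """Hypothetical round-robin scheduler (NOT asyncoro's implementation)."""
    ready = deque((g, None) for g in gens)  # (generator, value to resume it with)
    trace = []
    while ready:
        gen, value = ready.popleft()
        try:
            request = gen.send(value)       # run the coroutine until its next yield
        except StopIteration:
            continue                        # coroutine finished
        trace.append(request)
        # Pretend the operation completed: resume the coroutine with its
        # "result" only after the other ready coroutines have had a turn.
        ready.append((gen, request.upper()))
    return trace


def worker(name, log):
    for step in range(2):
        reply = yield f"{name}{step}"       # "asynchronous call": control goes to scheduler
        log.append(reply)                   # resumed with the operation's result


log = []
print(toy_scheduler(worker("a", log), worker("b", log)))  # ['a0', 'b0', 'a1', 'b1']
print(log)                                                 # ['A0', 'B0', 'A1', 'B1']
```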

Unlike with threads, there is no forced preemption with coroutines -
at any time at most one coroutine is executing and it continues to
execute until *yield* is called. Thus, there is no need for locking
critical sections with asyncoro.

The asyncoro framework consists of the "AsynCoro" scheduler; "Coro" to
create coroutines from generator functions; "Channel" to broadcast
messages; "AsyncSocket" to convert regular synchronous sockets to
asynchronous (non-blocking) sockets; "AsyncPipe" for pipes;
"AsyncFile" for files; "Computation" and "discoro_server()" for
distributed / parallel computing; "Lock" and "RLock" for locking
(although locking is not required with asyncoro); and "Condition",
"Event" and "Semaphore" primitives that are very similar to thread
primitives (except for a few syntactic changes noted above).


AsynCoro scheduler
==================

"AsynCoro" is a (singleton) scheduler that runs coroutines, similar to
the way an operating system's scheduler runs multiple processes. It is
initialized automatically (for example, when a coroutine is created),
so for most purposes the scheduler is transparent. The scheduler in
"asyncoro" manages coroutines, message passing, I/O events, timeouts,
wait/resume events etc. in a single concurrent program; it does not
support message passing over a network. "disasyncoro" extends
"AsynCoro" in "asyncoro" with features supporting distributed
programming, remote execution of coroutines etc. If the scheduler
instance is needed, it can be obtained with either "AsynCoro()" or
"AsynCoro.instance()".

Unlike in other asynchronous frameworks, in asyncoro there is no
explicit event loop - the I/O events are processed by the scheduler
and methods in "AsyncSocket", "AsyncPipe" etc. For example, the
"recv()" method (which must be used with *yield*) sets up an internal
method to execute when the socket has data to read and suspends the
calling coroutine. The scheduler can execute any other coroutines
that are ready while the I/O operation is pending. When the data has
been read, the suspended coroutine is resumed with the data read, so
"AsyncSocket"'s "recv()" works just as "socket.recv()", except for
using *yield*. Thus, programming with asyncoro is very similar to
programming with threads, except for using *yield* with certain
methods.

class asyncoro.AsynCoro

   Creates and returns the singleton scheduler. If a scheduler
   instance has already been created (such as when a coroutine was
   created), a new instance won't be created. "disasyncoro" extends
   "AsynCoro" for distributed programming, and its constructor has
   various options for customization.

   The scheduler has the following methods:

   instance()

      This static method returns the instance of the "AsynCoro"
      scheduler; use it as "scheduler = AsynCoro.instance()". If the
      instance has not been started (yet), it creates one and returns
      it.

   cur_coro()

      This static method returns the coroutine (instance of "Coro")
      currently being executed; use it as "coro = AsynCoro.cur_coro()".
      As mentioned below, if a coroutine's generator function has a
      "coro=None" parameter, the "Coro" constructor initializes it to
      the coroutine instance (which also documents that the function
      is meant to be used to create coroutines).

   join()

         Note: This method must be called from the (main) thread only -
           calling it from a coroutine will deadlock the entire
           coroutine framework.

      Waits for all scheduled non-daemon coroutines to finish. After
      join returns, more coroutines can be created (which are then
      added to scheduler).

   terminate()

         Note: This method must be called from the (main) thread only -
           calling it from a coroutine will deadlock the entire
           coroutine framework.

      Terminates all scheduled coroutines and then the scheduler
      itself. If necessary, a new scheduler instance may be created
      with "AsynCoro()" or "AsynCoro.instance()".

The scheduler runs in a thread separate from the user program. The
scheduler terminates when all non-daemon coroutines are terminated,
similar to Python's threading module.


Coroutines
==========

asyncoro's "Coro" class creates coroutines (lightweight processes).
Coroutines are similar to threads in regular Python programs, except
for a few differences as noted above.

class asyncoro.Coro(target[, arg1, arg2, ...])

   Creates a coroutine, where *target* is a generator function (a
   function with *yield* statements), and *arg1*, *arg2* etc. are
   arguments or keyword arguments to *target*. If the *target*
   generator function has a "coro=None" keyword argument, the Coro
   constructor replaces "None" with the instance of Coro created, so
   the coroutine can use it to invoke methods in the Coro class (see
   below). Alternatively, the instance can be obtained with the static
   method "coro = AsynCoro.cur_coro()".

   Consider the generator function below (where *sock* is an
   asynchronous socket and all statements are asynchronous, so all are
   used with *yield*):

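      def get_reply(sock, msg, coro=None):
          # sock is an AsyncSocket; every call below is asynchronous,
          # so each one is used with yield
          yield sock.sendall(msg)
          yield coro.sleep(1)            # suspend this coroutine for 1 second
          reply = yield sock.recv(1024)  # resumed with the data read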

   A coroutine for processing the above function can be created with,
   for example, "Coro(get_reply, conn, "ping")". The **Coro**
   constructor creates a coroutine with the generator function
   *get_reply* with parameters "sock=conn", "msg="ping"" and *coro*
   set to the just created coroutine instance. (If the *coro=None*
   argument is not used, the coroutine instance can be obtained with
   "coro = AsynCoro.cur_coro()".) The coroutine is then added to the
   AsynCoro scheduler so it executes concurrently with other
   coroutines - there is no need to start it explicitly, as done with
   threads. Note that in Python 2 generator functions cannot use a
   "return" statement with a value; with asyncoro a return statement
   such as "return v" can be replaced with "raise StopIteration(v)".
   (Python 3.3 and later allow "return v" in generators, and from
   Python 3.7 (PEP 479) "raise StopIteration(v)" inside a generator is
   turned into "RuntimeError", so "return v" is the form to use
   there.) If a generator/coroutine does not use "StopIteration", then
   the last value yielded in the generator becomes the return value.
   Thus, in the example above *get_reply* does not use
   "StopIteration", so the buffer received (in the last *yield*) is
   equivalent to the return value of *get_reply*.

   Blocking operations, such as "socket.recv()" and "socket.connect()",
   are implemented as generator functions in "AsyncSocket", the
   asynchronous implementation of sockets. These functions simply
   initiate the operation; *yield* should be used with them (as in the
   example above) so the scheduler can run other eligible coroutines
   while the operation is pending. Calling these methods without
   *yield* simply returns a generator object instead of the result of
   the method call, so care must be taken to use *yield* when calling
   generator functions. Using *yield* where it is not needed is not an
   error; e.g., the "resume()" method of coroutines can be used without
   *yield*, but when used with *yield*, the caller gives control to the
   scheduler, which may execute the resumed coroutine right away. In
   the rest of the documentation, methods that need to be called with
   *yield* are noted so.

   In the rest of the documentation we follow the convention of using
   the "coro=None" keyword argument in generator functions and use the
   *coro* variable to refer to the coroutine, i.e., the instance of
   **Coro** executing the generator function. This variable can be
   used to invoke methods of **Coro**, or used in other coroutines,
   for example, to send messages to it, wake it up from sleep etc. A
   coroutine has the following methods:

   suspend(timeout=None, alarm_value=None)
   sleep(timeout=None, alarm_value=None)

      Note: This method must always be used with *yield* as "yield
        coro.sleep()".

      Suspends coroutine *coro* until *timeout*. If *timeout* is a
      positive number (float or int), the scheduler suspends execution
      of the coroutine for that many seconds (or fractions of a
      second). If *timeout* is "None", the coroutine is not woken up by
      the scheduler - some other coroutine needs to resume it. The
      value yielded by this method is the value it is resumed with, or
      *alarm_value* if resumed by the scheduler due to timeout. If
      *timeout=0*, this method returns *alarm_value* without
      suspending the coroutine.

      For example, if a coroutine executes "v = yield
      coro.sleep(2.9)", it is suspended for 2.9 seconds. If, before
      the timeout, another coroutine wakes up this coroutine (with the
      "resume()" method) with a value, *v* is set to that value.
      Otherwise, after 2.9 seconds, this coroutine is resumed with
      "None" (the default *alarm_value*), so *v* is set to "None" and
      the coroutine continues execution. While the coroutine is
      suspended, the scheduler executes other scheduled coroutines.
      Within a coroutine, "coro.sleep" (or "coro.suspend") must be
      used (with *yield*) instead of "time.sleep"; calling
      "time.sleep" blocks the scheduler and thus stalls the entire
      coroutine framework.

   resume(update=None)
   wakeup(update=None)

      Wakes up the (suspended) coroutine "coro". As explained above,
      the suspended coroutine gets *update* (any Python object) as the
      value of the yield statement that caused it to suspend. If
      sleep/resume synchronization is needed (so that resume waits
      until a specific suspend is ready to receive), the Event
      primitive can be used: the resuming coroutine waits on an event
      and the suspending coroutine sets the event before going to
      sleep.

   send(msg)

      Sends message *msg* (any Python object) to the coroutine on
      which this method is invoked. If the coroutine is currently
      waiting for messages (with "receive()"), then it is resumed with
      *msg*. If it is not currently waiting for messages, the message
      is queued so that next "receive()" returns the message without
      suspending.

      A message can be any Python object when sender and recipient are
      in the same program/asyncoro (i.e., messages are not sent over
      the network). However, when sender and recipient are on
      different asyncoro instances (over the network), messages must
      be serializable at the sender and deserializable at the
      receiver. If a message includes any objects that have
      unserializable attributes, then their classes have to provide a
      "__getstate__()" method to serialize the objects, and the remote
      program should have "__setstate__()" for those classes; see
      Pickle protocol.

      If the recipient is in a remote asyncoro, "send()" simply queues
      messages for transfer over network. A daemon coroutine in
      asyncoro transfers the messages in the order they are queued.
      This process, by default, may transfer each message with a new
      connection. As creating sockets and making connections is
      expensive, it may be rather inefficient, especially if messages
      are sent frequently. See the "peer()" method in *Distributed
      Programming* for specifying that messages to peers should be
      sent as a stream, using the same connection.

   deliver(msg, timeout=None)

      Note: This method must always be used with *yield* as "recvd =
        yield rcoro.deliver(msg)".

      Similar to "send()", except that this method must be used with
      *yield* and it returns the status of delivering the message to
      the recipient. If it is 1, the message has been successfully placed
      in recipient coroutine's message queue (when recipient calls
      "receive()", it gets the queued messages in the order they are
      received). If *timeout* is given and message couldn't be
      delivered before timeout, the return value is 0. If *timeout* is
      "None", delivery will not timeout. For local coroutines (i.e.,
      coroutines executing in the same program) *timeout* has no
      effect - if the recipient is valid, message will be delivered
      successfully. However, if the recipient is a remote coroutine
      (see *Distributed Programming*), network delays / failures may
      cause delivery to be delayed or delivery may fail (i.e., there
      is a possibility of delivery waiting forever); to avoid such
      issues, appropriate *timeout* may be used.

   receive(timeout=None, alarm_value=None)

      Note: This method must always be used with *yield* as "msg =
        yield coro.receive()".

      Returns earliest queued message if there are pending messages,
      or suspends "coro" until either a message is sent to it or
      *timeout* seconds elapse. If called with *timeout=0*, this
      method will not suspend the coroutine - it will return either
      earliest queued message if available or *alarm_value*.

   set_daemon(flag=True)

      Marks the coroutine as a daemon (a process that never
      terminates) if *flag* is "True". Similar to the threading module,
      the AsynCoro scheduler waits for all non-daemon coroutines to
      terminate before exiting. The daemon status can be toggled by
      calling "set_daemon()" with *flag* set to "True" or "False".

   hot_swappable(flag)

      Marks whether the coroutine's generator function can be replaced.
      This method can be used to set (with *flag=True*) or clear (with
      *flag=False*) the flag. With hot swapping, a coroutine's code
      can be updated (to new functionality) while the application is
      running.

   hot_swap(target[, arg1, arg2, ...])

      Requests AsynCoro to replace coroutine's generator function with
      *target([arg1, arg2, ...])*. AsynCoro then throws
      "HotSwapException" in the coroutine when:

         * coro indicated it can handle hot swap (i.e., last called
           "hot_swappable" with *flag=True*),

         * it is currently executing at top-level in the call stack
           (i.e., has not called other generator functions), and

         * has no pending asynchronous operations (socket I/O, tasks
           scheduled with AsyncThreadPool, etc.).

      The new generator is set as args[0] of "HotSwapException", so
      the coro can inspect the new generator, if necessary, and can do
      any preparation for hot swapping, e.g., saving state (perhaps by
      sending state as a message to itself which can be retrieved in
      the new generator with "receive()"), or even ignore hot swap
      request. If/when it is ready for swap, it must re-raise the same
      "HotSwapException" (with the new generator as args[0]). This
      causes AsynCoro to close current generator function, replace it
      with the new generator function and schedule new generator for
      execution (from the beginning). Any messages (i.e., resume
      updates) queued in the previous generator are not reset, so new
      generator can process queued messages (e.g., use "receive()" in
      a loop with *timeout=0* until "receive()" returns
      *alarm_value*). Note that "hot_swap()" changes generator
      function of a particular coroutine for which it is called. If
      there are many coroutines using that generator function,
      "hot_swap()" may be called for each such coroutine.

   monitor(observe)

      Note: This method must always be used with *yield* as "v =
        yield coro.monitor(observe)".

      Sets "coro" as the monitor of coroutine *observe*. Then, when
      the coroutine *observe* finishes (either because the coroutine's
      generator function finished execution or because it was
      terminated by AsynCoro due to an uncaught exception), AsynCoro
      sends the status as a message with "MonitorException" to "coro".
      "MonitorException" args[0] is set to the affected coroutine
      *observe* and args[1] is set to the exception tuple: if
      *observe* finished execution, the tuple is a pair, with the first
      element set to (type) "StopIteration" and the second element an
      instance of "StopIteration" with the last value yielded by
      *observe*; if *observe* was terminated due to an uncaught
      exception, the tuple will have either 2 or 3 elements, with the
      first element set to the type of the exception, the second
      element set to the uncaught exception, and the third element set
      to the traceback, if available. The monitor coroutine can inspect
      "MonitorException" and possibly restart the affected coroutine
      (see below). A coroutine can be monitored by more than one
      monitor, and a monitor can monitor more than one coroutine.

   throw(*args)

      Throws the exception given by *args* into the coroutine (at the
      point where it is currently executing).

   terminate()

      Terminates the coroutine. This is useful, for example, to
      terminate server processes that otherwise never terminate.

   value()

      Note: This method must be called from a thread, not a
        coroutine.

      Returns the last value yielded by the coroutine, possibly
      waiting until the coroutine terminates. This method should not
      be called from a coroutine - doing so will deadlock the entire
      coroutine framework. This method is meant for the main thread in
      the user program to wait for the (main) coroutine(s) it creates.

   finish()

      Note: This method must always be used in a coroutine with
        *yield* as "v = yield other.finish()".

      Returns the last value yielded by the coroutine "other",
      possibly waiting until it terminates.

   Faults in (local or remote) coroutines can be detected with
   "monitor()", and fault-tolerant coroutines can be developed with
   "hot_swap()".


Examples
--------

See Asynchronous Concurrent Programming in tutorial for examples.
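
The "sleep()" / "resume()" rules described for "Coro" (a suspended
coroutine receives the *update* passed to "resume()", or *alarm_value*
once *timeout* expires) can be modelled with a small virtual-time
simulation. This is a hypothetical sketch, not asyncoro's code or API:
"ToyScheduler", "sleeper" and "waker" are illustrative names,
coroutines yield the tuples ("sleep", timeout, alarm_value) and
("resume", target, update) in place of "yield coro.sleep(...)" and
"target.resume(...)", and time is simulated rather than measured.

```python
import heapq
import itertools


class ToyScheduler:
    """Virtual-time toy model of sleep()/resume() (hypothetical, not asyncoro)."""

    def __init__(self):
        self.now = 0.0
        self.seq = itertools.count()  # tie-breaker for events and timeout tokens
        self.events = []              # heap of (time, seq, coroutine, value, token)
        self.sleeping = {}            # suspended coroutine -> token of its timeout

    def _push(self, delay, coro, value, token=None):
        heapq.heappush(self.events,
                       (self.now + delay, next(self.seq), coro, value, token))

    def spawn(self, coro):
        self._push(0.0, coro, None)

    def run(self):
        while self.events:
            when, _, coro, value, token = heapq.heappop(self.events)
            if token is not None:
                if self.sleeping.get(coro) != token:
                    continue              # stale timeout: coroutine already resumed
                del self.sleeping[coro]   # timeout expired: value is alarm_value
            self.now = when
            self._step(coro, value)

    def _step(self, coro, value):
        try:
            op, *args = coro.send(value)  # run the coroutine until its next yield
        except StopIteration:
            return
        if op == "sleep":
            timeout, alarm_value = args
            token = next(self.seq)
            self.sleeping[coro] = token
            if timeout is not None:       # None: only a resume can wake it
                self._push(timeout, coro, alarm_value, token)
        elif op == "resume":
            target, update = args
            if target in self.sleeping:   # toy: resuming a non-sleeper does nothing
                del self.sleeping[target]
                self._push(0.0, target, update)
            self._push(0.0, coro, None)   # the resuming coroutine keeps running


def sleeper(results):
    v = yield ("sleep", 2.9, None)         # like: v = yield coro.sleep(2.9)
    results.append(("first", v))
    v = yield ("sleep", 1.0, "timed out")  # nobody resumes this one
    results.append(("second", v))


def waker(target):
    yield ("sleep", 1.0, None)             # let the sleeper suspend first
    yield ("resume", target, "hello")      # like: target.resume("hello")


results = []
sched = ToyScheduler()
s = sleeper(results)
sched.spawn(s)
sched.spawn(waker(s))
sched.run()
print(results)  # [('first', 'hello'), ('second', 'timed out')]
```

Timeouts are cancelled lazily in this sketch: when a coroutine is
resumed early, its pending timeout event stays in the heap and is
skipped because its token no longer matches.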


Locking Primitives
==================

class asyncoro.Lock

class asyncoro.RLock

class asyncoro.Semaphore

class asyncoro.Event

class asyncoro.Condition

Note: With asyncoro locking is not needed, as there is no forced
  preemption - at any time at most one coroutine is executing and the
  control is transferred to the scheduler only when a *yield* statement
  is encountered. (In fact, the implementation of asynchronous locking
  primitives in asyncoro updates lists and counters without locking.)
  So with asyncoro "Lock" and "RLock" are optional.
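
The reasoning in the note can be checked with a toy round-robin
scheduler that, like asyncoro, switches coroutines only at *yield*. In
this sketch (all names are hypothetical), a read-modify-write with no
*yield* inside it needs no lock, while one that yields in the middle
loses updates; a critical section that spans a *yield* is the kind of
case where asyncoro's "Lock" (acquired with "yield lock.acquire()")
could still be useful.

```python
from collections import deque


def run_round_robin(*coros):
    """Toy scheduler: switches coroutines only when they yield."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            next(coro)                 # run until the next yield
        except StopIteration:
            continue                   # finished; drop it
        ready.append(coro)


counter = {"n": 0}


def no_yield_update(times):
    for _ in range(times):
        value = counter["n"]           # read ...
        counter["n"] = value + 1       # ... and write, with no yield in between
        yield                          # control goes to the scheduler only here


def yield_inside_update(times):
    for _ in range(times):
        value = counter["n"]
        yield                          # another coroutine runs mid-update
        counter["n"] = value + 1       # writes a stale value


run_round_robin(no_yield_update(1000), no_yield_update(1000))
print(counter["n"])  # 2000

counter["n"] = 0
run_round_robin(yield_inside_update(1000), yield_inside_update(1000))
print(counter["n"])  # 1000: half of the updates were lost
```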

asyncoro provides asynchronous implementations of "Lock", "RLock",
"Semaphore", "Event" and "Condition" primitives. They are similar to
the versions in the threading module. Any operation that would block
in the threading module must be called with *yield*. For example,
acquiring a lock is a blocking operation, so it should be invoked as
"yield lock.acquire()". Similarly, Event's wait method or Condition's
wait method must be used as "yield event.wait()" or "yield
condition.wait()". For example, a Condition variable *cv* in a
consumer should be used as follows (compare with the example in the
threading module documentation):

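   # body of a consumer coroutine (generator function);
   # an_item_is_available() and get_an_available_item() are
   # placeholders, as in the threading module's example
   while True:
       yield cv.acquire()          # acquiring blocks, so use yield
       while not an_item_is_available():
           yield cv.wait()         # waiting blocks, so use yield
       get_an_available_item()
       cv.release()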

See documentation strings in "asyncoro" module for more details on
which methods should be used with *yield* and which methods need not
be.


Channels
========

Channel is a broadcast mechanism with which coroutines can exchange
messages. Messages sent to Channel are sent to its subscribers
(recipients). While a message can be sent one-to-one with coroutine's
"send()" or "deliver()" methods on the receiving coroutine, channels
can be used to broadcast a message so all its subscribers get that
message.

class class asyncoro.Channel(name, transform=None)

   Creates a channel with *name*, which must be unique. If *transform*
   is given, it must be a function that is called before a message is
   sent to subscribers. The function is called with the name of the
   channel and the message. It should return the transformed message
   or "None". If "None" is returned, the message is dropped -
   subscribers will not receive the message. Otherwise, the
   transformed message is sent to subscribers.

   A channel has the following methods:

   subscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "yield
        channel.subscribe(coro)"

      Subscribes *subscriber* (a coroutine or even another channel) to
      the channel. Any messages sent to the channel are then sent to
      each subscriber; i.e., messages are broadcast to all
      subscribers. It is possible to chain or create hierarchical
      channels with channels subscribing to other channels. If
      *timeout* is a positive number, the call fails if subscription
      is not successful (e.g., the channel couldn't be located)
      within that many seconds.

   send(message)

      Calls *transform* function of the channel (see above) if it has
      one. If the function returns "None", the message is ignored.
      Otherwise the message is sent to current subscribers. Messages
      sent over a channel are queued (buffered) at receiving
      coroutines.  A coroutine *coro*, for example, that has
      subscribed to the channel can receive messages with "msg = yield
      coro.receive()".

   deliver(message, timeout=None, n=0)

      Note: This method must be used with *yield* as "recvd = yield
        channel.deliver(msg)"

      Similar to "send()", except that it waits until at least *n*
      subscribers are subscribed. It returns total number of end-point
      recipients (coroutines) the message has been delivered to - in
      case of hierarchical channels, it is the sum of recipients of
      all the channels. This may be less than *n* (e.g., delivering
      message to a subscriber may fail, due to network error), or more
      (e.g., there are more subscribers, or some subscribers are
      channels with more than one subscriber). If *n* is 0, then the
      message will be delivered to all current subscribers. In any
      case, *timeout* is maximum amount of time in seconds (or
      fraction of second) for delivering the message. Thus, for
      example, if *timeout* occurs before *n* subscribers are
      subscribed to the channel, the method returns 0.

   unsubscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "yield
        channel.unsubscribe(coro)"

      Unsubscribes the subscriber (coroutine or another channel), so
      future messages to the channel are not sent to that subscriber.
      If *timeout* is a positive number, it is the number of seconds
      for unsubscribe request to complete.

   close()

      Close the channel. The channel can't be used for message passing
      after closing.

   set_transform(transform)

      Set/change *transform* as the function to call when a message is
      sent to this channel. See the "Channel" constructor and "send()".
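
The *transform* rule and the hierarchical delivery count described
above can be illustrated with a small in-memory stand-in. "ToyChannel"
is hypothetical (it is not asyncoro's "Channel"): subscribers are plain
lists acting as message queues, or other "ToyChannel" instances,
everything is synchronous, and "send()" returns the number of end-point
recipients in the way the description of "deliver()" counts them.

```python
class ToyChannel:
    """Hypothetical in-memory stand-in for a broadcast channel."""

    def __init__(self, name, transform=None):
        self.name = name
        self.transform = transform
        self.subscribers = []           # lists (message queues) or other ToyChannels

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def send(self, msg):
        """Broadcast msg; return how many end-point queues received it."""
        if self.transform is not None:
            msg = self.transform(self.name, msg)
            if msg is None:
                return 0                # transform dropped the message
        count = 0
        for sub in self.subscribers:
            if isinstance(sub, ToyChannel):
                count += sub.send(msg)  # chained channel forwards to its subscribers
            else:
                sub.append(msg)         # queue the message at the recipient
                count += 1
        return count


def drop_negative(name, msg):
    # Called with the channel name and the message; returning None drops it.
    return None if msg < 0 else msg


top = ToyChannel("top", transform=drop_negative)
sub_channel = ToyChannel("sub")
q1, q2, q3 = [], [], []
top.subscribe(q1)
top.subscribe(sub_channel)
sub_channel.subscribe(q2)
sub_channel.subscribe(q3)

print(top.send(5))   # 3: q1 directly, q2 and q3 via sub_channel
print(top.send(-1))  # 0: dropped by the transform
print(q1, q2, q3)    # [5] [5] [5]
```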


Examples
--------

See Channels in tutorial for examples.


Message Passing
===============

Coroutine's "send()", "receive()" and "deliver()" offer one-to-one
message passing and Channel's "send()" and "deliver()" offer one-to-
many / broadcast message passing.

asyncoro delivers messages in the order they have been sent with
either one-to-one or broadcast message passing, unless some messages
couldn't be delivered due to failures; i.e., asyncoro guarantees
temporal order of messages in the absence of errors.


Examples
--------

See Message Passing in tutorial for examples.
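
The one-to-one queueing rules given for "send()" and "receive()" (a
waiting receiver is resumed with the message, otherwise the message is
queued; queued messages are returned earliest first; with *timeout=0*
*alarm_value* is returned instead of suspending) can be sketched as
below. "ToyMailbox" is hypothetical, a suspended "receive()" is
modelled as a stored callback, and timeouts other than 0 are not
modelled.

```python
from collections import deque


class ToyMailbox:
    """Hypothetical model of one coroutine's message queue (not asyncoro's code)."""

    def __init__(self):
        self.queue = deque()   # messages sent while nobody was waiting
        self.waiting = None    # callback standing in for a suspended receive()

    def send(self, msg):
        if self.waiting is not None:
            resume, self.waiting = self.waiting, None
            resume(msg)                   # receiver was waiting: resume it with msg
        else:
            self.queue.append(msg)        # otherwise queue it for a later receive()

    def receive(self, resume, timeout=None, alarm_value=None):
        if self.queue:
            resume(self.queue.popleft())  # earliest queued message, no suspension
        elif timeout == 0:
            resume(alarm_value)           # poll: never suspend
        else:
            self.waiting = resume         # "suspend" until the next send()


got = []
box = ToyMailbox()
box.send("m1")
box.send("m2")
box.receive(got.append)                  # 'm1' (queued first)
box.receive(got.append)                  # 'm2'
box.receive(got.append, timeout=0, alarm_value="empty")
box.receive(got.append)                  # suspends: queue is empty
box.send("m3")                           # resumes the waiting receiver
print(got)  # ['m1', 'm2', 'empty', 'm3']
```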
