
Asynchronous Concurrent Programming (asyncoro)
**********************************************

asyncoro provides an API for asynchronous and concurrent programming with
coroutines using Python's generator functions. Coroutines are like
light weight threads - creating and running coroutines is very
efficient. Moreover, unlike in the case of thread programming, a
coroutine continues to run until it voluntarily gives up control (when
*yield* is used), so locking is not needed to protect critical
sections.

Programs developed with asyncoro have the same logic and structure as
programs with threads, except for a few syntactic changes. Although
the API below has many methods, most of them are for additional
features of asyncoro (such as message passing, hot swapping,
monitoring etc.), and not needed for simple programs that are similar
to thread based programs. The differences compared to threaded
programming are:

   * Instead of creating threads, coroutines should be created with
     "Coro". The process function (first argument to "Coro") should be
     a generator function (i.e., function with *yield* statements),

   * Sockets, pipes etc, should be converted to asynchronous
     versions with Asynchronous Socket, Asynchronous Pipe etc.,

   * I/O operations, such as AsyncSocket's "send()", "receive()",
     "accept()", blocking operations, such as coroutine's "sleep()",
     Event's "wait()", etc., are implemented with generator methods;
     these should be used with *yield* (e.g., as "data = yield
     async_sock.receive(1024)"),

   * asyncoro's locking primitives ("asyncoro.Event",
     "asyncoro.Condition", etc.) should be used in place of Python's
     threading counterparts with *yield* on blocking operations (e.g.,
     as "yield async_event.wait()"),

   * Coroutine's "sleep()" should be used in place of "time.sleep()"
     (e.g., as "yield coro.sleep(2)").

Coroutines in asyncoro are essentially generator functions that
suspend execution when *yield* is used and are resumed by asyncoro's
scheduler (AsynCoro) after the asynchronous operation is complete.
Usually *yield* is used with an asynchronous call, such as socket's
"connect()", "send()" or pipe's "read()", "communicate()", waiting for
a message etc. With such statements, the asynchronous call is
initiated and control goes to scheduler which schedules another
coroutine, if one is ready to execute. When the asynchronous operation
is complete, the coroutine that called the operation becomes ready to
execute. Thus, the coroutines in asyncoro are not strictly cooperative
tasks that pass control to each other, but each *yield* statement
transfers control to asyncoro's scheduler, which manages them.
However, asyncoro supports message passing, suspend/resume calls etc.,
so that coroutines can cooperate in a way that is easier to program
and understand.

Unlike with threads, there is no forced preemption with coroutines -
at any time at most one coroutine is executing and it continues to
execute until *yield* is called. Thus, there is no need for locking
critical sections with asyncoro.
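
The hand-over of control at *yield* can be illustrated with plain
Python generators. The following is a minimal sketch, not asyncoro's
scheduler: "worker" and "run_round_robin" are hypothetical names, and
the driver only does round-robin resumption with no I/O, timeouts or
messages. It shows that the code between two *yield* statements runs
without interruption.

```python
from collections import deque

def worker(name, steps, log):
    # A generator function: each bare `yield` hands control back to the driver.
    for i in range(steps):
        log.append((name, i))   # runs uninterrupted until the yield below
        yield

def run_round_robin(generators):
    # Hypothetical toy driver (not asyncoro's scheduler): resume each
    # generator in turn until all of them are exhausted.
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue            # this generator finished; drop it
        ready.append(gen)       # still running; schedule it again

log = []
run_round_robin([worker("a", 2, log), worker("b", 3, log)])
print(log)
```

The two workers interleave only at their *yield* points, so appending
to the shared "log" list needs no lock.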

asyncoro framework consists of "AsynCoro" scheduler, "Coro" to create
coroutines from generator functions, "Channel" to broadcast messages,
Asynchronous Socket to convert regular synchronous sockets to
asynchronous (non-blocking) sockets, Asynchronous Pipe for pipes,
Asynchronous File for files, Computation and "discoro_server()" for
distributed / parallel computing, "Lock" and "RLock" for locking
(although locking is not required with asyncoro), "Condition",
"Event", "Semaphore" primitives very similar to thread primitives
(except for a few syntactic changes noted above).


Examples
========

See Asynchronous Concurrent Programming, Channels and Message Passing
in tutorial for examples. There are many illustrative use cases in
'examples' directory under where asyncoro module is installed.

Following is a brief description of the examples included relevant to
this section:

* "examples/tut_coros.py" creates a number of coroutines that each
  suspend execution for a brief period. The number of coroutines
  created can be increased to thousands or tens of thousands to show
  asyncoro can scale well.

* "examples/tut_client_server.py" shows message passing ("send()"
  and "receive()" methods of coroutines) between local client and
  server coroutines. The remote version and local version are similar,
  except that remote versions register/locate coroutines.

* "examples/tut_channel.py" uses broadcasting Channel to exchange
  messages in local coroutines.


AsynCoro scheduler
==================

"AsynCoro" is a (singleton) scheduler that runs coroutines similar to
the way operating system's scheduler runs multiple processes. It is
initialized automatically (for example, when a coroutine is created),
so for most purposes the scheduler is transparent. The scheduler in
"asyncoro" manages coroutines, message passing, I/O events, timeouts,
wait/resume events etc., in a single concurrent program; it doesn't
provide distributed programming for message passing over network.
"disasyncoro" extends "AsynCoro" with features supporting distributed
programming, remote execution of coroutines etc.  If the scheduler
instance is needed, it can be obtained with either "AsynCoro()" or
"AsynCoro.instance()".

Unlike in other asynchronous frameworks, in asyncoro there is no
explicit event loop - the I/O events are processed by the scheduler
and methods in Asynchronous Socket, Asynchronous Pipe etc. For
example, "recv()" method (which must be used with *yield*) sets up an
internal function to execute when the socket has data to read and
suspends the caller coroutine. The scheduler can execute any other
coroutines that are ready while the I/O operation is pending. When the
data has been read, the suspended coroutine is resumed with the data
read so that Asynchronous Socket's "recv()" works just as
"socket.recv()", except for using *yield*. Thus, programming with
asyncoro is very similar to that with threads, except for using
*yield* with certain methods.

class asyncoro.AsynCoro

   Creates and returns singleton scheduler. If a scheduler instance
   has already been created (such as when a coroutine was created), a
   new instance won't be created. "disasyncoro" extends "AsynCoro" for
   distributed programming and the constructor there has various
   options to customize.

   The scheduler has the following methods:

   instance()

      This static method returns instance of "AsynCoro" scheduler; use
      it as "scheduler = AsynCoro.instance()". If the instance has not
      been started (yet), it creates one and returns it.

   cur_coro()

      This static method returns coroutine (instance of Coroutine)
      being executed; use it as "coro = AsynCoro.cur_coro()". As
      mentioned below, if coroutine's generator function has
      "coro=None" parameter, Coroutine constructor initializes it to
      the coroutine instance (which is a way to document that method
      is used for creating coroutines).

   join()

         Note: This method must be called from (main) thread only -
           calling from a coroutine will deadlock entire coroutine
           framework.

      Waits for all scheduled non-daemon coroutines to finish. After
      join returns, more coroutines can be created (which are then
      added to scheduler).

   terminate()

         Note: This method must be called from (main) thread only -
           calling from a coroutine will deadlock entire coroutine
           framework.

      Terminates all scheduled coroutines and then the scheduler
      itself. If necessary, a new scheduler instance may be created
      with "AsynCoro()" or "AsynCoro.instance()".

The scheduler runs in a separate thread from user program. The
scheduler terminates when all non-daemon coroutines are terminated,
similar to Python's threading module.


Coroutine
=========

asyncoro's "Coro" class creates coroutines (light weight processes).
Coroutines are similar to threads in regular Python programs, except
for a few differences as noted above.

class asyncoro.Coro(target[, arg1, arg2, ...])

   Creates a coroutine, where *target* is a generator function (a
   function with *yield* statements), *arg1*, *arg2* etc. are
   arguments or keyword arguments to *target*. If *target* generator
   function has "coro=None" keyword argument, Coro constructor
   replaces "None" with the instance of Coro created, so coroutine can
   use this to invoke methods in Coro class (see below). Alternately,
   the instance can be obtained with the static method "coro =
   AsynCoro.cur_coro()".

   Consider the generator function (where *sock* is asynchronous
   socket and all statements are asynchronous, so all are used with
   *yield*):

      def get_reply(sock, msg, coro=None):
          yield sock.sendall(msg)
          yield coro.sleep(1)
          reply = yield sock.recv(1024)

   A coroutine for processing above function can be created with, for
   example, "Coro(get_reply, conn, "ping")". **Coro** constructor
   creates coroutine with the method *get_reply* with parameters
   "sock=conn", "msg="ping"" and *coro* set to the just created
   coroutine instance. (If *coro=None* argument is not used, the
   coroutine instance can be obtained with "coro =
   AsynCoro.cur_coro()".) The coroutine is then added to AsynCoro
   scheduler so it executes concurrently with other coroutines - there
   is no need to start it explicitly, as done with threads. Note that
   generator functions cannot use "return" statement. With asyncoro a
   return statement such as "return v" can be replaced with "raise
   StopIteration(v)". If a generator/coroutine does not use
   "StopIteration", then the last value yielded in the generator
   becomes the return value. Thus, in the example above *get_reply*
   does not use "StopIteration", so buffer received (in the last
   *yield*) is equivalent to return value of *get_reply*.

   Blocking operations, such as "socket.recv()", "socket.connect()",
   are implemented as generator functions in asynchronous
   implementation of socket Asynchronous Socket. These functions
   simply initiate the operation; *yield* should be used with them (as
   in the example above) so scheduler can run other eligible
   coroutines while the operation is pending. Calling these methods
   without *yield* simply returns generator function itself, instead
   of result of the method call. So care must be taken to use *yield*
   when calling generator functions. Using *yield* where it is not
   needed is not an error; e.g., "resume()" method of coroutines can
   be used without *yield*, but when used with *yield*, the caller
   gives control to scheduler which may execute resumed coroutine
   right away. In rest of the documentation, methods that need to be
   called with *yield* are noted so.
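
The pitfall of forgetting *yield* is ordinary Python generator
behaviour and can be seen without asyncoro. In this sketch
"fake_recv" is a hypothetical stand-in for a generator method; it is
not part of any asyncoro API. Calling it returns a generator object
and does not run its body.

```python
import inspect

calls = []

def fake_recv(nbytes):
    # Hypothetical stand-in for a generator method; not an asyncoro API.
    calls.append(nbytes)          # runs only once the generator is driven
    yield b"x" * nbytes

result = fake_recv(4)             # called without `yield`
is_generator = inspect.isgenerator(result)
calls_before = list(calls)        # snapshot: the body has not started yet
value = next(result)              # driving the generator finally runs the body
print(is_generator, calls_before, value, calls)
```

In a coroutine, writing "data = fake_recv(4)" would therefore bind
*data* to a generator object rather than to the bytes.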

   In rest of the documentation we follow the convention of using
   "coro=None" keyword argument in generator methods and use *coro*
   variable to refer to the coroutine, i.e., instance of **Coro**,
   executing the generator function. This variable can be used to
   invoke methods of **Coro**, use it in other coroutines, for
   example, to send messages to it, or wake up from sleep etc. A
   coroutine has following methods:

   suspend(timeout=None, alarm_value=None)
   sleep(timeout=None, alarm_value=None)

      Note: This method must always be used with *yield* as "yield
        coro.sleep()".

      Suspends coroutine *coro* until *timeout*. If *timeout* is a
      positive number (float or int), the scheduler suspends execution
      of coroutine until that many seconds (or fractions of second).
      If *timeout* is "None", the coroutine is not woken up by the
      scheduler - some other coroutine needs to resume it. The value
      yielded by this method is the value it is resumed with or
      *alarm_value* if resumed by the scheduler due to timeout.  If
      *timeout=0*, this method returns *alarm_value* without
      suspending the coroutine.

      For example, if a coroutine executes "v = yield
      coro.sleep(2.9)", it is suspended for 2.9 seconds. If before
      timeout, another coroutine wakes up this coroutine (with
      "resume()" method) with a value, *v* is set to that value.
      Otherwise, after 2.9 seconds, this coroutine is resumed with
      "None" (default *alarm_value*) so *v* is set to "None" and
      coroutine continues execution. During the time coroutine is
      suspended, scheduler executes other scheduled coroutines. Within
      a coroutine "coro.sleep" (or "coro.suspend") must be used (with
      *yield*) instead of "time.sleep"; calling "time.sleep" will
      deadlock entire coroutine framework.

   resume(update=None)
   wakeup(update=None)

      Wakes up (suspended) coroutine "coro". As explained above, the
      suspended coroutine gets *update* (any Python object) as the
      value of yield statement that caused it to suspend. If
      sleep/resume synchronization is needed (so that resume waits
      until specific suspend is ready to receive), Event locking
      primitive can be used so that resuming coroutine waits on an
      event variable and suspending coroutine sets the event before
      going to sleep.
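
How a resumed coroutine receives *update* as the value of its *yield*
expression follows from the "send()" method of Python generators. The
sketch below uses plain generators only; "sleeper" is a hypothetical
name and the two driver calls stand in for what a scheduler would do
on behalf of "sleep()" and "resume()".

```python
def sleeper(results):
    # `v` receives whatever the driver passes to send(); this mirrors how
    # "v = yield coro.sleep(...)" gets the value given to resume().
    v = yield "sleeping"
    results.append(v)
    yield "done"

results = []
gen = sleeper(results)
state = next(gen)            # run up to the first yield; generator is suspended
reply = gen.send("wake up")  # resume it; "wake up" becomes the value of the yield
print(state, reply, results)
```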

   send(msg)

      Sends message *msg* (any Python object) to the coroutine on
      which this method is invoked. If the coroutine is currently
      waiting for messages (with "receive()"), then it is resumed with
      *msg*. If it is not currently waiting for messages, the message
      is queued so that next "receive()" returns the message without
      suspending.

      Message can be any Python object when sender and recipients are
      in same program/asyncoro (i.e., messages are not sent over
      network). However, when sender and recipient are on different
      asyncoro instances (over network), the messages must be
      serializable at the sender and unserializable at the receiver.
      If message includes any objects that have unserializable
      attributes, then their classes have to provide "__getstate__()"
      method to serialize the objects, and the remote program should
      have "__setstate__()" for those classes; see Pickle protocol.
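
As an illustration of the pickle hooks mentioned above, the sketch
below round-trips an object that holds an attribute that cannot be
pickled. The class "Job" and its fields are hypothetical; only the
standard "pickle" and "threading" modules are used, and no network
transfer is involved.

```python
import pickle
import threading

class Job:
    # Hypothetical message class used only for illustration.
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()   # lock objects cannot be pickled

    def __getstate__(self):
        # Sender side: leave the lock out of the serialized state.
        state = self.__dict__.copy()
        del state["lock"]
        return state

    def __setstate__(self, state):
        # Receiver side: restore the fields and create a fresh lock.
        self.__dict__.update(state)
        self.lock = threading.Lock()

wire = pickle.dumps(Job("resize"))     # what would travel over the network
copy = pickle.loads(wire)
print(copy.name, copy.lock.locked())
```

The receiving program needs the same class definition to be importable,
since pickle stores only a reference to the class and not its code.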

      If the recipient is in a remote asyncoro, "send()" simply queues
      messages for transfer over network. A daemon coroutine in
      asyncoro transfers the messages in the order they are queued.
      This process, by default, may transfer each message with a new
      connection. As creating sockets and making connections is
      expensive, it may be rather inefficient, especially if messages
      are sent frequently. See "peer()" method in *Distributed
      Programming (disasyncoro)* for specifying that messages to peers
      should be sent as stream, using same connection.

   deliver(msg, timeout=None)

      Note: This method must always be used with *yield* as "recvd =
        yield rcoro.deliver(msg)".

      Similar to "send()" except that this method must be used with
      *yield* and it returns status of delivering the message to the
      recipient. If it is 1, the message has been successfully placed
      in recipient coroutine's message queue (when recipient calls
      "receive()", it gets the queued messages in the order they are
      received). If *timeout* is given and message couldn't be
      delivered before timeout, the return value is 0. If *timeout* is
      "None", delivery will not timeout. For local coroutines (i.e.,
      coroutines executing in the same program) *timeout* has no
      effect - if the recipient is valid, message will be delivered
      successfully. However, if the recipient is a remote coroutine
      (see *Distributed Programming (disasyncoro)*), network delays /
      failures may cause delivery to be delayed or delivery may fail
      (i.e., there is a possibility of delivery waiting forever); to
      avoid such issues, appropriate *timeout* may be used.

   receive(timeout=None, alarm_value=None)
   recv(timeout=None, alarm_value=None)

      Note: This method must always be used with *yield* as "msg =
        yield coro.receive()".

      Returns earliest queued message if there are pending messages,
      or suspends "coro" until either a message is sent to it or
      *timeout* seconds elapse. If called with *timeout=0*, this
      method will not suspend the coroutine - it will return either
      earliest queued message if available or *alarm_value*.

      *recv* is synonym for *receive*.
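
The queueing behaviour described for "send()" and for "receive()" with
*timeout=0* can be modelled with a small class. This is a toy model
for illustration, not asyncoro's implementation; "Mailbox" and
"receive_nowait" are hypothetical names.

```python
from collections import deque

class Mailbox:
    # Toy model of a coroutine's message queue; not asyncoro's implementation.
    def __init__(self):
        self._queue = deque()

    def send(self, msg):
        self._queue.append(msg)           # queued in arrival order

    def receive_nowait(self, alarm_value=None):
        # Models receive(timeout=0): never waits.
        if self._queue:
            return self._queue.popleft()  # earliest queued message first
        return alarm_value

box = Mailbox()
box.send("first")
box.send("second")

EMPTY = object()                          # sentinel distinct from any message
drained = []
while True:
    msg = box.receive_nowait(alarm_value=EMPTY)
    if msg is EMPTY:
        break
    drained.append(msg)
print(drained)
```

Passing a unique sentinel as *alarm_value* lets the caller tell "no
message" apart from a message whose value happens to be "None".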

   set_daemon(flag=True)

      Marks the coroutine a daemon (process that never terminates) if
      *flag* is "True". Similar to threading module, AsynCoro
      scheduler waits for all non-daemon coroutines to terminate
      before exiting. The daemon status can be toggled by calling
      "set_daemon()" with *flag* set to "True" or "False".

   hot_swappable(flag)

      Marks if the coroutine's generator function can be replaced.
      This method can be used to set (with *flag=True*) or clear (with
      *flag=False*) the flag. With hot swapping, a coroutine's code
      can be updated (to new functionality) while the application is
      running.

   hot_swap(target[, arg1, arg2, ...])

      Requests AsynCoro to replace coroutine's generator function with
      *target([arg1, arg2, ...])*. AsynCoro then throws
      "HotSwapException" in the coroutine when:

         * coro indicated it can handle hot swap (i.e., last called
           "hot_swappable" with *flag=True*),

         * it is currently executing at top-level in the call stack
           (i.e., has not called other generator functions), and

         * has no pending asynchronous operations (socket I/O, tasks
           scheduled with AsyncThreadPool, etc.).

      The new generator is set as args[0] of "HotSwapException", so
      the coro can inspect new generator, if necessary, and can do any
      preparation for hot swapping, e.g., saving state (perhaps by
      sending state as a message to itself which can be retrieved in
      the new generator with "receive()"), or even ignore hot swap
      request. If/when it is ready for swap, it must re-raise the same
      "HotSwapException" (with the new generator as args[0]). This
      causes AsynCoro to close current generator function, replace it
      with the new generator function and schedule new generator for
      execution (from the beginning). Any messages (i.e., resume
      updates) queued in the previous generator are not reset, so new
      generator can process queued messages (e.g., use "receive()" in
      a loop with *timeout=0* until "receive()" returns
      *alarm_value*). Note that "hot_swap()" changes generator
      function of a particular coroutine for which it is called. If
      there are many coroutines using that generator function,
      "hot_swap()" may be called for each such coroutine.

   monitor(observe)

      Note: This method must always be used with *yield* as "v =
        yield coro.monitor(observe)".

      Sets "coro" as the monitor of coroutine *observe*. Then, when
      the coroutine *observe* is finished (either because coroutine's
      generator function finished execution or was terminated by
      AsynCoro because of an uncaught exception), AsynCoro sends the
      status as message with "MonitorException" to "coro".
      "MonitorException" args[0] is set to the affected coroutine
      *observe* and args[1] is set to the exception tuple: If
      *observe* finished execution, the tuple is a pair, with first
      element set to (type) "StopIteration" and second element
      instance of "StopIteration" with the last value yielded by
      *observe*, and if *observe* was terminated due to uncaught
      exception, the tuple will have either 2 or 3 elements, with
      first element set to the type of exception, second element set
      to the uncaught exception, and third element set to trace, if
      available. The monitor coroutine can inspect "MonitorException"
      and possibly restart the affected coroutine (see below). A
      coroutine can be monitored by more than one monitor, and a
      monitor can monitor more than one coroutine. This method must
      always be used with *yield*.

   throw(*args)

      Throws exception **args* to coroutine (at the point where it is
      currently executing).
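
Python generators have a "throw()" method of their own, which raises
the exception at the *yield* where the generator is suspended. The
sketch below uses it to model the handshake described under
"hot_swap()": the exception carries the new generator, the running
code saves state and re-raises, and the driver then switches
generators. "SwapRequest", "old_version", "new_version" and "swap" are
hypothetical names; this is not asyncoro's implementation.

```python
class SwapRequest(Exception):
    # Hypothetical stand-in for HotSwapException; args[0] is the new generator.
    pass

def old_version(log):
    count = 0
    while True:
        try:
            count += 1
            log.append(("old", count))
            yield
        except SwapRequest:
            log.append(("old saved", count))   # last chance to save state
            raise                              # re-raise to accept the swap

def new_version(log):
    log.append(("new", "started"))
    yield

def swap(current, replacement):
    # Toy driver step: throw the request in and, if it comes back out,
    # close the old generator and switch to the replacement.
    try:
        current.throw(SwapRequest(replacement))
    except SwapRequest as exc:
        current.close()
        return exc.args[0]
    return current                             # request was ignored

log = []
gen = old_version(log)
next(gen)                                      # old code runs to its first yield
gen = swap(gen, new_version(log))
next(gen)                                      # new code starts from the beginning
print(log)
```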

   terminate()

      Terminates the coroutine. This is useful, for example, to
      terminate server processes that otherwise never terminate.

   value()

      Note: This method must be called from a thread, not a
        coroutine.

      Returns the last value yielded by the coroutine, possibly
      waiting until coroutine terminates. This method should not be
      called from a coroutine - this will cause entire coroutine
      framework to deadlock. This method is meant for main thread in
      the user program to wait for (main) coroutine(s) it creates.

   finish()

      Note: This method must always be used in a coroutine with
        *yield* as "v = yield other.finish()".

      Returns the last value yielded by the coroutine "other",
      possibly waiting until it terminates.

   Faults in (local or remote) coroutines can be detected with
   "monitor()", and fault-tolerant coroutines can be developed with
   "hot_swap()".


Locking Primitives
==================

class asyncoro.Lock

class asyncoro.RLock

class asyncoro.Semaphore

class asyncoro.Event

class asyncoro.Condition

Note: With asyncoro locking is not needed, as there is no forced
  preemption - at any time at most one coroutine is executing and the
  control is transferred to the scheduler only when *yield* statement
  is encountered. (In fact, the implementation of asynchronous locking
  primitives in asyncoro updates lists and counters without locking.)
  So with asyncoro "Lock" and "RLock" are optional.

asyncoro provides asynchronous implementations of "Lock", "RLock",
"Semaphore", "Event" and "Condition" primitives. They are similar to
versions in threading module. Any operation that would block in
threading module must be called with *yield* appropriately. For
example, acquiring a lock is a blocking operation, so it should be
invoked as "yield lock.acquire()". Similarly, Event's wait method or
Condition's wait method must be used as "yield event.wait()" or "yield
condition.wait()". For example, Condition variable cv in a client
should be used as (compare to example at threading module):

   while True:
       yield cv.acquire()
       while not an_item_is_available():
           yield cv.wait()
       get_an_available_item()
       cv.release()

See documentation strings in "asyncoro" module for more details on
which methods should be used with *yield* and which methods need not
be.
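
For comparison, here is a runnable version of the same
consumer/producer pattern with the standard "threading" module, where
the blocking calls are used without *yield*. The names "items",
"consumer" and "producer" are chosen for this sketch only.

```python
import threading
from collections import deque

items = deque()
cv = threading.Condition()
consumed = []

def consumer():
    with cv:                       # acquires cv; released on exit
        while not items:           # re-check the condition after every wakeup
            cv.wait()              # blocks the thread (no yield here)
        consumed.append(items.popleft())

def producer():
    with cv:
        items.append("item")
        cv.notify()                # wake one waiting consumer

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
print(consumed)
```

The structure is the same in both versions; the asyncoro variant
differs in that "acquire()" and "wait()" are preceded by *yield*.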


Channel
=======

Channel is a broadcast mechanism with which coroutines can exchange
messages. Messages sent to Channel are sent to its subscribers
(recipients). While a message can be sent one-to-one with coroutine's
"send()" or "deliver()" methods on the receiving coroutine, channels
can be used to broadcast a message so all its subscribers get that
message.

class asyncoro.Channel(name, transform=None)

   Creates channel with *name*, which must be unique. If *transform*
   is given, it must be a function that is called before a message is
   sent to subscribers. The function is called with name of the
   channel and the message. It should return transformed message or
   "None". If "None" is returned, the message is dropped - subscribers
   will not receive the message. Otherwise, transformed message is
   sent to subscribers.
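
A *transform* function is an ordinary function of two arguments. The
sketch below shows one possible shape, assuming messages are
dictionaries with a "level" key; the name "drop_debug" and the message
layout are hypothetical. It is called directly here, without creating
a channel.

```python
def drop_debug(channel_name, message):
    # Hypothetical transform: called with the channel's name and the message.
    if isinstance(message, dict) and message.get("level") == "debug":
        return None                       # None means: drop the message
    return (channel_name, message)        # otherwise, what subscribers receive

dropped = drop_debug("events", {"level": "debug", "text": "tick"})
kept = drop_debug("events", {"level": "info", "text": "started"})
print(dropped, kept)
```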

   A channel has following methods.

   subscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "yield
        channel.subscribe(coro)"

      Subscribes *subscriber* (a coroutine or even another channel) to
      the channel. Any messages sent to the channel are then sent to
      each subscriber; i.e., messages are broadcast to all
      subscribers. It is possible to chain or create hierarchical
      channels with channels subscribing to other channels. If
      *timeout* is a positive number, the call fails if subscription
      is not successful (e.g., the channel couldn't be located)
      before that many seconds.

   send(message)

      Calls *transform* function of the channel (see above) if it has
      one. If the function returns "None", the message is ignored.
      Otherwise the message is sent to current subscribers. Messages
      sent over a channel are queued (buffered) at receiving
      coroutines.  A coroutine *coro*, for example, that has
      subscribed to the channel can receive messages with "msg = yield
      coro.receive()".

   deliver(message, timeout=None, n=0)

      Note: This method must be used with *yield* as "recvd = yield
        channel.deliver(msg)"

      Similar to "send()", except that it waits until at least *n*
      subscribers are subscribed. It returns total number of end-point
      recipients (coroutines) the message has been delivered to - in
      case of hierarchical channels, it is the sum of recipients of
      all the channels. This may be less than *n* (e.g., delivering
      message to a subscriber may fail, due to network error), or more
      (e.g., there are more subscribers, or some subscribers are
      channels with more than one subscriber). If *n* is 0, then the
      message will be delivered to all current subscribers. In any
      case, *timeout* is maximum amount of time in seconds (or
      fraction of second) for delivering the message. Thus, for
      example, if *timeout* occurs before *n* subscribers are
      subscribed to the channel, the method returns 0.

   unsubscribe(subscriber, timeout=None)

      Note: This method must be used with *yield* as "yield
        channel.unsubscribe(coro)"

      Unsubscribes the subscriber (coroutine or another channel), so
      future messages to the channel are not sent to that subscriber.
      If *timeout* is a positive number, it is the number of seconds
      for unsubscribe request to complete.

   close()

      Close the channel. The channel can't be used for message passing
      after closing.

   set_transform(transform)

      Set/change *transform* as the method to call when message is
      sent to this channel. See "Channel" constructor and "send()".


Message Passing
===============

Coroutine's "send()", "receive()" and "deliver()" offer one-to-one
message passing and Channel's "send()" and "deliver()" offer one-to-
many / broadcast message passing.

asyncoro delivers messages in the order they have been sent with
either one-to-one or broadcast message passing (i.e., with either
*send* or *deliver* methods of coroutines or channels). In other
words, asyncoro guarantees temporal order of messages.


AsyncThreadPool
===============

asyncoro framework and all coroutines run in a single thread. It
implements concurrency (running more than one process) by interleaving
coroutines - suspending a coroutine that is waiting for some event and
running a coroutine that is ready to execute. All the blocking
operations, such as sending/receiving data (sockets, message passing),
or sleeping, are implemented with generator functions that schedule the
operation and suspend the coroutine. However, asyncoro framework
doesn't implement every blocking operation. Sometimes, it is necessary
to use functions in other modules that block the thread until the
operation is complete. For example, reading standard input will block
the thread until the read method is complete. If such functions are
used in a coroutine, entire asyncoro framework and all coroutines are
blocked; i.e., asyncoro scheduler itself is blocked, so even if there
are other coroutines eligible to run, they won't be executed.
AsyncThreadPool class can be used to run such blocking functions in
separate threads so asyncoro itself is not affected by them.

class asyncoro.AsyncThreadPool(num_threads)

   Creates a pool with given number of threads. When a blocking
   function is scheduled, an available thread in the pool is used to
   execute that function. More threads will allow more blocking
   functions to be running simultaneously, but take more system
   resources.

   async_task(target, *args, **kwargs)

      Note: This method must be used with *yield* as "val = yield
        pool.async_task(target, *args, **kwargs)"

      Schedules given *target* function with arguments **args* and
      keyword arguments ***kwargs* for execution with a thread in the
      pool. If all threads are currently executing other functions,
      then the function will be executed when a thread becomes
      available (i.e., done with currently executing function).

      The value returned by this method is the value returned by the
      function.

   join()

      Waits for all scheduled blocking functions to finish. This
      method should be called from main thread, not from any
      coroutine, as this method is a blocking operation.

   terminate()

      Waits for all scheduled blocking functions to finish and then
      terminates the threads; the pool can no longer be used for
      scheduling tasks. This method should be called from main thread,
      not from any coroutine, as this method is a blocking operation.

See "examples/chat_client.py" which uses thread pool (with 1 thread)
to execute "sys.stdin.readline" (a blocking function).
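
The general idea of moving a blocking call to a worker thread can be
tried with the standard library alone. The sketch below uses
"concurrent.futures.ThreadPoolExecutor" as an analogue; it is not
AsyncThreadPool and says nothing about how AsyncThreadPool is
implemented. "blocking_read" is a hypothetical stand-in for a call
such as "sys.stdin.readline".

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_read(delay):
    # Stand-in for a call such as sys.stdin.readline that blocks its thread.
    time.sleep(delay)
    return "line"

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(blocking_read, 0.05)   # runs in the pool's thread
    ticks = 0
    while not future.done():                    # submitting thread stays free
        ticks += 1                              # other work could happen here
        time.sleep(0.01)
    result = future.result()                    # value returned by the function
print(result)
```

With AsyncThreadPool the waiting loop is not written by hand: the
coroutine uses *yield* with "async_task()" and is resumed with the
function's return value.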
