
Tutorial / Examples
*******************

Many examples are included in the 'examples' directory where the
asyncoro module is installed (with PyPI / pip). See the *README* file
in that 'examples' directory for a brief description of each program.
A few of those examples are explained below in more detail.


Asynchronous Concurrent Programming
===================================

asyncoro's concurrency framework has some features similar to the
Actor Model. With asyncoro, computation units are created with
coroutines. Each coroutine has a message queue from which it can
receive messages sent to it by other coroutines. In addition to this
one-to-one communication between coroutines, asyncoro supports
one-to-many communication with broadcasting channels.


Coroutines
----------

A coroutine in asyncoro is a computational unit created with a
generator function, i.e., a Python function with one or more *yield*
statements. Creating coroutines is similar to creating threads, except
that the process function must be a generator function and the
coroutine is created with Coroutine instead of "threading.Thread". If
the generator function used for creating a coroutine has the default
argument *coro=None*, the Coroutine constructor sets this parameter to
the coroutine instance created. This parameter can thus be used to
call methods on the coroutine (e.g., "receive()", "sleep()" etc.).
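
The injection of *coro* described above can be pictured with a small
sketch in plain Python. This is not asyncoro's implementation;
*MiniCoro* and *proc* are hypothetical names used only for
illustration, and the sketch assumes the constructor inspects the
generator function's signature:

```python
import inspect


class MiniCoro:
    """Toy stand-in for a coroutine object (hypothetical, not asyncoro.Coro)."""

    def __init__(self, target, *args, **kwargs):
        if not inspect.isgeneratorfunction(target):
            raise TypeError('target must be a generator function')
        params = inspect.signature(target).parameters
        # Inject this instance only if the function declares 'coro=None'.
        if 'coro' in params and params['coro'].default is None:
            kwargs['coro'] = self
        self.name = target.__name__
        self.generator = target(*args, **kwargs)


def proc(n, coro=None):
    # 'coro' is the MiniCoro instance created for this generator
    yield (n, coro.name)


c = MiniCoro(proc, 7)
first = next(c.generator)
print(first)
```

The caller passes only *n*; the *coro* argument is filled in by the
constructor, which is why it should not be passed explicitly.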

An example program that creates coroutines is:

   import asyncoro, random, time

   def coro_proc(n, coro=None):
       s = random.uniform(0.5, 3)
       print('%f: coroutine %d sleeping for %f seconds' % (time.time(), n, s))
       yield coro.sleep(s)
       print('%f: coroutine %d terminating' % (time.time(), n))

   # create 10 coroutines running generator function 'coro_proc'
   for i in range(10):
       # coroutine function is called with 'i'
       asyncoro.Coro(coro_proc, i)

Coroutines are created with the Coroutine constructor ("asyncoro.Coro"
in the program above). The first argument must be a generator function
and the rest of the arguments should correspond to the parameters in
the generator definition. In the above program, the generator function
*coro_proc* has the *coro=None* keyword argument, so the Coroutine
constructor sets *coro* to the coroutine created - this parameter
should not be passed to the constructor. The constructor schedules
execution of this coroutine in asyncoro's scheduler. In *coro_proc*,
the expression *coro.sleep(s)* suspends execution of the running
coroutine for the given time. During that time other coroutines that
are ready to execute will be executed. The total time taken by the
above program should be roughly the same as the maximum of the sleep
times (at most 3 seconds in the above program).
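
Why the total time is close to the longest sleep, rather than the sum
of all sleeps, can be seen in a toy scheduler. The sketch below uses
only the standard library and a virtual clock; *run_all* and *sleeper*
are hypothetical names, and here a task "sleeps" by yielding a number
of seconds, which is a simplification and not how asyncoro's scheduler
is implemented:

```python
import heapq


def run_all(generators):
    """Run generators that yield sleep durations; return virtual finish time."""
    clock = 0.0
    # heap entries: (wake-up time, tie-breaker, generator)
    ready = [(0.0, i, g) for i, g in enumerate(generators)]
    heapq.heapify(ready)
    while ready:
        clock, i, gen = heapq.heappop(ready)
        try:
            delay = next(gen)          # run until the next 'yield'
        except StopIteration:
            continue                   # this task has finished
        heapq.heappush(ready, (clock + delay, i, gen))
    return clock


def sleeper(seconds, log):
    yield seconds                      # "sleep" by handing control back
    log.append(seconds)


log = []
total = run_all([sleeper(s, log) for s in (2.5, 0.5, 1.5)])
print(total, log)
```

The three tasks sleep concurrently, so the virtual clock ends at 2.5
(the longest sleep) and not at 4.5 (the sum).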

Note that since "sleep()" is a generator function, it must be called
with *yield* (otherwise, *coro.sleep* simply returns a new generator
instance and thus will not suspend the execution as desired).
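
The effect of forgetting *yield* can be reproduced in plain Python,
because calling any generator function only creates a generator object.
In this sketch *pause* is a hypothetical stand-in for a generator
method such as "sleep()":

```python
import inspect


def pause(log):
    # stand-in for a generator method such as sleep()
    log.append('paused')
    yield


log = []
result = pause(log)            # called without 'yield': nothing runs yet
ran_without_driving = bool(log)
print(inspect.isgenerator(result), ran_without_driving)

next(result)                   # only driving the generator runs its body
print(log)
```

The body of *pause* runs only once something drives the generator,
which in a coroutine is what *yield* hands over to the scheduler.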


Message Passing
---------------

With the concurrent and asynchronous behavior of coroutines in
asyncoro, communication among them is accomplished by sending and
receiving messages. A message can be any Python object. If a message
is sent to a remote coroutine (i.e., a coroutine running in another
program), the message must be serializable. A coroutine can either
send a message to another coroutine (one-to-one communication) or
broadcast a message over a channel to many coroutines (one-to-many
communication). At the receiver coroutine the messages are stored in a
queue (similar to what is called a Mailbox in other concurrency
frameworks) in the order they are received, so that "receive()"
returns the oldest message, blocking until a message becomes
available.

With one-to-one communication, a coroutine invokes the "send()" method
on the recipient coroutine. Sending a message simply queues the
message in the recipient; it doesn't, for example, wait for the
recipient to process the message. If necessary, the "deliver()" method
may be used instead of "send()" when sending a message over the
network; it indicates how many recipients received the message - see
*Asynchronous Concurrent Programming (asyncoro)* for details.
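
The queueing behavior described above (send only enqueues, receive
returns the oldest message) can be sketched with a toy mailbox. The
*Mailbox* class is hypothetical and is not asyncoro's implementation;
in particular, its *receive* returns None on an empty queue, whereas a
coroutine's "receive()" would suspend until a message arrives:

```python
from collections import deque


class Mailbox:
    """Toy FIFO message queue (hypothetical; not asyncoro's implementation)."""

    def __init__(self):
        self._messages = deque()

    def send(self, msg):
        # Only enqueue; the sender does not wait for processing.
        self._messages.append(msg)

    def receive(self):
        # Return the oldest message, or None if the queue is empty.
        # (A real receive() would suspend the coroutine instead.)
        return self._messages.popleft() if self._messages else None


box = Mailbox()
for msg in ('first', 'second', 'third'):
    box.send(msg)
received = [box.receive() for _ in range(3)]
print(received)
```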

An example client/server program with asyncoro is:

   import asyncoro, random

   def server_proc(coro=None):
       coro.set_daemon()
       while True:
           msg = yield coro.receive()
           print('processing %s' % (msg))

   msg_id = 0

   def client_proc(server, n, coro=None):
       global msg_id
       for x in range(3):
           yield coro.suspend(random.uniform(0.5, 3))
           msg_id += 1
           server.send('%d: %d / %d' % (msg_id, n, x))

   server = asyncoro.Coro(server_proc)
   for i in range(10):
       asyncoro.Coro(client_proc, server, i)

The main program first creates *server* coroutine with *server_proc*
generator function, which has *coro=None* keyword parameter, so
Coroutine constructor passes the coroutine instance as *coro* (thus,
*server* in the main program is same as *coro* in *server_proc*). The
main program creates 10 client coroutines with *client_proc*, passing
*server* as the first argument and an identifier as second argument.
The main program has no use for client coroutines, so it doesn't save
them. Each of the client coroutines suspends itself for a brief period
and sends a unique message to the server. Since *server_proc* never
terminates on its own, "coro.set_daemon()" marks it as a daemon
coroutine so that asyncoro can terminate it once all non-daemon
coroutines (in this case the client coroutines) have terminated (after
sending 3 messages each); otherwise, asyncoro's scheduler would never
terminate, as the server coroutine would still be running.

Unlike with threads, asyncoro's scheduler doesn't preempt a running
coroutine; a coroutine gives up control only at a *yield*. Thus,
locking is not required with asyncoro. To illustrate this concept,
*msg_id*, a global, shared variable, is updated in *client_proc*
without having to worry about non-deterministic values. asyncoro,
however, provides locking primitives similar to the thread locking
primitives. Some of the methods in these locking primitives are
generator methods (the ones that are blocking operations in the
synchronous threading module), so they must be used with *yield*.
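
The following sketch illustrates, with plain generators and a
hypothetical round-robin driver, why an update such as *msg_id += 1* is
safe without a lock: tasks switch only at *yield*. It also shows the
caveat that follows from this: an update that is deliberately split
across a *yield* can still be interleaved with other tasks. None of
the names below come from asyncoro:

```python
def run_round_robin(tasks):
    """Advance each generator in turn until all are exhausted."""
    tasks = list(tasks)
    while tasks:
        for task in list(tasks):
            try:
                next(task)
            except StopIteration:
                tasks.remove(task)


def safe_increment(shared, times):
    for _ in range(times):
        shared['count'] += 1      # no 'yield' inside the update
        yield                     # switch points are explicit


def unsafe_increment(shared, times):
    for _ in range(times):
        value = shared['count']
        yield                     # another task may run here
        shared['count'] = value + 1


safe = {'count': 0}
run_round_robin(safe_increment(safe, 100) for _ in range(5))
unsafe = {'count': 0}
run_round_robin(unsafe_increment(unsafe, 100) for _ in range(5))
print(safe['count'], unsafe['count'])
```

The first counter reaches 500 because each increment completes before
control is handed over; the second loses updates because the read and
the write are separated by a switch point.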

In this case the messages sent by clients are strings. If, say, the
server needs to send a reply back to the client, the messages can be
in the form of a dictionary, tuple, list etc. that includes the
client's coroutine instance (e.g., the list [coro, msg_id, n, x], from
which the server can retrieve the client coroutine that sent the
message).
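
A minimal sketch of this reply pattern, using a hypothetical *Peer*
class with a plain queue in place of real coroutines (so there is no
scheduler and no *yield* here), might look like:

```python
from collections import deque


class Peer:
    """Toy endpoint with a mailbox (hypothetical stand-in for a coroutine)."""

    def __init__(self, name):
        self.name = name
        self.inbox = deque()

    def send(self, msg):
        self.inbox.append(msg)


def serve(server):
    # Process every queued request and reply to its sender.
    while server.inbox:
        sender, msg_id, n, x = server.inbox.popleft()
        sender.send('ack %d from %s' % (msg_id, server.name))


server = Peer('server')
client = Peer('client-1')
server.send([client, 1, 0, 0])    # the message carries the sender itself
server.send([client, 2, 0, 1])
serve(server)
replies = list(client.inbox)
print(replies)
```

Because the sender travels inside the message, the server needs no
prior knowledge of its clients.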


Channels
--------

If one-to-many or broadcast communication is needed, asyncoro's
Channel can be used. To receive messages on a channel, a coroutine
must subscribe to it. Once a coroutine has subscribed to a channel,
any message sent to that channel is received by each of the channel's
current subscribers.

These concepts are used in the program below where a client sends a
series of numbers over a channel. Two coroutines receive these numbers
to compute sum and product of those numbers:

   import asyncoro, random

   def seqsum(coro=None):
       # compute sum of numbers received over channel
       result = 0
       while True:
           msg = yield coro.receive()
           if msg is None:
               break
           result += msg
       print('sum: %f' % result)

   def seqprod(coro=None):
       # compute product of numbers received over channel
       result = 1
       while True:
           msg = yield coro.receive()
           if msg is None:
               break
           result *= msg
       print('prod: %f' % result)

   def client_proc(coro=None):
       channel = asyncoro.Channel('sum_prod')
       # create two coroutines to compute sum and product of
       # numbers sent over the channel
       sum_coro = asyncoro.Coro(seqsum)
       prod_coro = asyncoro.Coro(seqprod)
       yield channel.subscribe(sum_coro)
       yield channel.subscribe(prod_coro)
       for x in range(4):
           r = random.uniform(0.5, 3)
           channel.send(r)
           print('sent %f' % r)
       channel.send(None)
       yield channel.unsubscribe(sum_coro)
       yield channel.unsubscribe(prod_coro)

   asyncoro.Coro(client_proc)

A coroutine can subscribe to as many channels as necessary. All such
messages, as well as messages sent directly to a coroutine, are
received with *coro.receive()* method.

A channel, c2, may subscribe to another channel, c1, so that any
message sent to c1 is received by all of c1's subscribers, including
c2, which in turn causes c2's own subscribers to receive that message
as well. In this case, a message sent to c2 will not be received by
c1. This way a hierarchy of channels can be created to reflect the
hierarchy of components in a system.

Care must be taken not to create cycles in subscriptions within a
channel hierarchy; e.g., channel c1 also subscribing to channel c2 in
the above example. asyncoro doesn't detect cycles in subscriptions, so
a cycle will cause a runtime exception due to recursion.
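
The forwarding behavior and the danger of cycles can both be seen in a
toy model. *ToyChannel* and *Inbox* are hypothetical classes written
for this sketch; they forward messages by direct method calls and are
not how asyncoro's Channel is implemented:

```python
class ToyChannel:
    """Toy broadcast channel (hypothetical; not asyncoro.Channel)."""

    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def send(self, msg):
        # Forward to every subscriber; a subscriber may itself be a channel.
        for subscriber in self.subscribers:
            subscriber.send(msg)


class Inbox:
    """Toy receiver that records every message delivered to it."""

    def __init__(self):
        self.messages = []

    def send(self, msg):
        self.messages.append(msg)


c1, c2 = ToyChannel('c1'), ToyChannel('c2')
a, b = Inbox(), Inbox()
c1.subscribe(a)
c2.subscribe(b)
c1.subscribe(c2)          # c2 subscribes to c1

c1.send('to c1')          # reaches a, and b through c2
c2.send('to c2')          # reaches only b
before_cycle = (list(a.messages), list(b.messages))
print(before_cycle)

c2.subscribe(c1)          # cycle: c1 -> c2 -> c1 -> ...
try:
    c1.send('loop')
    cycle_error = None
except RecursionError as exc:
    cycle_error = exc
print(type(cycle_error).__name__)
```

In this toy model the cycle ends in Python's RecursionError; the exact
exception raised by asyncoro in the same situation is not specified
here.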


Asynchronous Network Programming
================================

Some of the Python library's (synchronous) socket operations, such as
*connect*, *accept* and *recv*, are blocking operations; i.e., they
wait for the operation to complete. These blocking operations are not
suitable with asyncoro, as during that time other eligible coroutines
are also blocked from executing.

asyncoro provides the Asynchronous Socket class ("AsyncSocket") to
convert Python's blocking socket to a non-blocking socket. Essentially,
Asynchronous Socket is a wrapper that implements blocking operations
as generator functions that can be used in coroutines (with *yield*,
as done with any generator function).

For example, below is the server program that accepts connections and
processes each connection:

   import socket, sys, asyncoro

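   def process(conn, coro=None):
       # accumulate bytes until the end-of-message marker '/' arrives
       data = b''
       while True:
           chunk = yield conn.recv(128)
           if not chunk:
               # peer closed the connection before sending the marker
               break
           data += chunk
           if data.endswith(b'/'):
               break
       conn.close()
       print('received: %s' % data.decode())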

   def server_proc(host, port, coro=None):
       coro.set_daemon()
       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       sock = asyncoro.AsyncSocket(sock)
       sock.bind((host, port))
       sock.listen(128)

       while True:
           conn, addr = yield sock.accept()
           asyncoro.Coro(process, conn)

   asyncoro.Coro(server_proc, '127.0.0.1', 8010)
   while True:
       cmd = sys.stdin.readline().strip().lower()
       if cmd == 'exit' or cmd == 'quit':
           break

There are two differences to note in the 'server_proc' coroutine
function compared to programming with threads: the TCP socket is
converted to an asynchronous socket with Asynchronous Socket so it can
be used in coroutines, and *accept* is used with *yield* as it is a
generator function (in AsyncSocket). Then a new coroutine is created
to process the connection. The socket returned from *accept* of an
asynchronous socket is also an asynchronous socket, so there is no
need to convert it with Asynchronous Socket. In the 'process'
coroutine function, *recv* is used with *yield* as it is also a
generator function of the asynchronous socket.

Below is a client program that creates 10 coroutines, each of which
connects to the server above and sends a message. Each message ends
with a marker '/' so that the server can receive the full message:

   import socket, sys, asyncoro, random

   def client_proc(host, port, n, coro=None):
       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       sock = asyncoro.AsyncSocket(sock)
       yield sock.connect((host, port))
       # sockets carry bytes, so encode the text message before sending
       msg = ('%d: ' % n + '-' * random.randint(100,300) + '/').encode()
       yield sock.sendall(msg)
       sock.close()

   # create 10 client coroutines, numbered 1 to 10
   for n in range(1, 11):
       asyncoro.Coro(client_proc, '127.0.0.1', 8010, n)

Here again, the TCP socket is converted to an asynchronous socket with
Asynchronous Socket so it can be used in coroutines, and the
operations *connect* and *sendall* are used with *yield* as these are
generator functions.
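
The marker-based framing used by the server and client above can be
tried in isolation with ordinary blocking sockets. The sketch below is
a synchronous analogue using only the standard library; *recv_until*
is a hypothetical helper, and with an asynchronous socket each *recv*
would instead be used with *yield* inside a coroutine:

```python
import socket


def recv_until(sock, marker=b'/', bufsize=128):
    """Read from 'sock' until the data ends with 'marker' or the peer closes."""
    data = b''
    while not data.endswith(marker):
        chunk = sock.recv(bufsize)
        if not chunk:              # peer closed before sending the marker
            break
        data += chunk
    return data


left, right = socket.socketpair()
payload = b'7: ' + b'-' * 200 + b'/'
left.sendall(payload)              # longer than one 128-byte recv()
received = recv_until(right)
left.close()
right.close()
print(len(received), received == payload)
```

A single *recv* may return only part of a message, which is why the
receiver loops until the marker is seen.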

In essence, using asyncoro for asynchronous network programming is
very similar to thread programming, except for creating coroutines
with Coroutine instead of threads, converting Python library's sockets
to asynchronous sockets with Asynchronous Socket, and using *yield*
with certain methods.


Distributed Programming
=======================

asyncoro includes a scheduler (class AsynCoro) that runs coroutines,
suspends them when necessary, delivers messages etc. When a coroutine
is created with Coroutine as done in the above programs, the scheduler
is started if one has not already been started. By default the
scheduler does not start the network services required for
communication with coroutines or channels in another program. To use
distributed programming, "disasyncoro" must be imported instead of
"asyncoro".

Coroutines and channels can be registered with asyncoro so that they
can be located by coroutines running in a remote location. The
reference (to remote coroutine or channel) obtained by locating can be
used to send messages, monitor (in the case of coroutine) etc.

Using these features, the above client/server program can be separated
into client and server programs that run at two different locations.
The server program is:

   import random, sys
   import asyncoro.disasyncoro as asyncoro

   def server_proc(coro=None):
       coro.set_daemon()
       coro.register('server_coro')
       while True:
           msg = yield coro.receive()
           print('processing %s' % (msg))

   server = asyncoro.Coro(server_proc)
   while True:
       cmd = sys.stdin.readline().strip().lower()
       if cmd == 'quit' or cmd == 'exit':
           break

This version differs from the one in the concurrency section above in
two ways:

   * "disasyncoro" is imported to start network services for
     distributed programming.

   * The server coroutine registers itself with the name
     "server_coro" so that a client can use that name to obtain a
     reference to this server, which can be used to send messages.

The client program is:

   import random
   import asyncoro.disasyncoro as asyncoro

   def client_proc(n, coro=None):
       global msg_id
       # 'Coro' is reached through the imported module alias
       server = yield asyncoro.Coro.locate('server_coro')
       for x in range(3):
           yield coro.suspend(random.uniform(0.5, 3))
           msg_id += 1
           server.send('%d: %d / %d' % (msg_id, n, x))

   msg_id = 0
   for i in range(10):
       asyncoro.Coro(client_proc, i)

In this case there are two differences compared to the version in
concurrent programming section above:

   * As is done in the server, "disasyncoro" is imported to start
     network service.

   * In the client coroutine a reference to remote server is
     obtained using the name the server is registered with. "locate()"
     is a generator function so it must be called with *yield*.

The above client and server programs can be run either on the same
computer or on different computers on the same network. Even if they
are run on the same computer, the client and server coroutines are
considered remote to each other. Multiple instances of the client
program can be run simultaneously, if desired.

If the client and server programs are run on computers on the same
network (i.e., they share the same router or gateway), then the
schedulers discover each other. If the programs are on computers on
different networks, the scheduler in the client program needs to be
informed about the location of the server's scheduler. This is done by
adding the line "yield AsynCoro().peer('remote_node')" before locating
the server by name, where *remote_node* is either the IP address or
name of the remote peer.


Distributed Communicating Processes
===================================

While RCI (Remote Coroutine Invocation) provides an API for creating
pre-defined functionality that can be executed remotely, the module
"discoro" provides support for clients to send computations that can
be executed remotely, optionally running them in parallel in separate
processes to use multiple processors. See *Distributed Communicating
Processes (discoro)* for details.

The program below sends the *rcoro_proc* generator function to a
server running the "discoronode.py" program, for creating (remote)
coroutines; each remote coroutine simply sleeps for the given number
of seconds and sends back the same number (as the result). Coroutine
*client_proc* creates the discoro scheduler in the client program
itself (alternately, "discoro.py" can be run as a separate program to
which multiple clients can schedule computations). The same coroutine
is also set as *status_coro* before scheduling the computation so that
all status notifications are sent to it as messages, which are
processed to know which discoro server processes are available to
schedule new jobs, which jobs are finished etc. Alternately, jobs can
simply be scheduled to execute and the scheduler will load balance
coroutines; see *discoro_client.py* and *discoro_client2.py* in the
'examples' directory under the installation path:

   import random
   import asyncoro.discoro as discoro
   import asyncoro.disasyncoro as asyncoro

   # this generator function is sent to remote server to run
   # coroutines there
   def rcoro_proc(n, coro=None):
       yield coro.sleep(n)
       raise StopIteration(n)

   def client_proc(computation, njobs, coro=None):
       status = {'submitted': 0, 'done': 0}

       def submit_job(where, coro=None):
           arg = random.uniform(5, 20)
           rcoro = yield computation.run_at(where, rcoro_proc, arg)
           if isinstance(rcoro, asyncoro.Coro):
               print('%s processing %s' % (rcoro.location, arg))
           else:
               print('Job %s failed: %s' % (status['submitted'], str(rcoro)))
           status['submitted'] += 1

       discoro.Scheduler()
       computation.status_coro = coro
       if (yield computation.schedule()):
           raise Exception('Failed to schedule computation')
       # job submitter assumes that a process can run at most one
       # coroutine at a time, although more than one coroutine (many
       # thousands, if necessary) can be run
       while True:
           msg = yield coro.receive()
           if isinstance(msg, asyncoro.MonitorException):
               rcoro = msg.args[0]
               if msg.args[1][0] == StopIteration:
                   print('Remote coroutine %s finished with %s' % (rcoro.location, msg.args[1][1]))
               else:
                   asyncoro.logger.warning('Remote coroutine %s terminated with "%s"' %
                                           (rcoro.location, str(msg.args[1])))
               status['done'] += 1
               # because jobs are submitted with 'yield' with coroutines,
               # and 'submitted' is incremented after 'yield', it is
               # likely that more than 'njobs' are submitted
               if status['done'] >= njobs and status['done'] == status['submitted']:
                   break
               if status['submitted'] < njobs:
                   # schedule another job at this process
                   asyncoro.Coro(submit_job, rcoro.location)
           elif isinstance(msg, discoro.StatusMessage):
               # a new process is ready (if special initialization is
               # required for preparing process, schedule it)
               if msg.status == discoro.Scheduler.ProcInitialized:
                   asyncoro.Coro(submit_job, msg.location)
           else:
               asyncoro.logger.debug('Ignoring status message %s' % msg)
       yield computation.close()

   if __name__ == '__main__':
       import logging, random
       asyncoro.logger.setLevel(logging.DEBUG)
       computation = discoro.Computation([rcoro_proc])
       # run 10 jobs
       asyncoro.Coro(client_proc, computation, 10)

To test, run the "discoronode.py" program on a computer in the local
network and then run this client program.
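
In the program above the remote coroutine hands back its result
through *StopIteration*, and the client reads it from the monitor
message. The underlying Python mechanism can be tried with a plain
generator. This sketch says nothing about asyncoro's internals;
*job* and *drive* are hypothetical names. As a fact about the language
rather than about asyncoro: since Python 3.7 (PEP 479) an explicit
*raise StopIteration* inside a generator body surfaces as a
RuntimeError, and *return n* is the statement that attaches a value to
the StopIteration seen by the caller:

```python
def job(n):
    yield            # stand-in for a step such as a sleep
    return n         # plain-Python way to hand back a result


def drive(gen):
    """Run a generator to completion and return its result value."""
    try:
        while True:
            next(gen)
    except StopIteration as exc:
        return exc.value


result = drive(job(12))
print(result)
```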

If the tasks and client don't need to communicate (as in the example
above), it is easier to use the dispy project. If the tasks and client
need to communicate, separating the scheduler and the client would
make it easier. See the 'discoro_client*.py' files in the examples
directory under the installation directory of asyncoro for additional
use cases.
