So Autobus 1 worked great. I designed it, implemented it, and wrote support for it into a lot of applications that I wrote. And all-in-all, it was awesome.

In fact, as of my writing this (2011.09.11, which also happens to be the tenth anniversary of the World Trade Center attacks), Autobus powers nearly all of the automation stuff in my house.

So what's Autobus 2 all about?

Well...

There were a few things that I realized I did wrong in the original Autobus, and a few things that were still missing.

Probably the main thing I did wrong, and the main reason for my starting Autobus 2, is that Autobus has a server that everything connects to, and this server presents a single point of failure.

Hm, I just realized a problem with this. If a bus broadcasts when it starts up to figure out what other services are available, and a service is registered locally right after that, chances are that the broadcast indicating that a new local service is available is going to collide with the responses from other devices telling what services they have.

And the responses from said other devices could end up colliding with one another.

Hm, this is an interesting problem.

So I'm thinking, why don't we have all broadcasts indicating that a service is available delayed by a second (or some delay configurable when constructing the Bus that would default to a second) so that it doesn't collide with responses to the "What services are available?" query.



The use of a server also means that each Autobus client has to be configured with the server that it's supposed to connect to, so it's generally impossible for an Autobus client to just appear on a network and not have to have any configuration in order for other things on the network to work with it.

It also means that all traffic is duplicated even when there exists a direct path between the client providing an interface and the client using said interface.

(Oh, and I'm going to start referring to interfaces as services from now on, since that more accurately describes what they do. They'll be referred to as services in the Autobus 2 code as well.)

So, service discovery without knowing a server in advance is missing.

Also missing from Autobus 1 are some additional methods of communication. I've found that functions, events, and objects are all very good at what they need to do, but that there are some other things that would be useful. The two I'm thinking of right now are channels and streams.

Channels would be this thing where a service provides a named channel, and then a client decides that it wants to open a session with said channel. It can do this, at which point there's an established link between the service and the client. If either one dies, the other gets notified that the channel has been closed; the same happens if either end explicitly closes the channel. This is useful for allowing clients to communicate where one side (or both) needs to know when the other side disappears.

Channels exchange JSON objects. Either side can send objects to the other side.

I'm thinking I'll visit streams later. My idea is that they're simply streams of bytes, but these can be emulated using channels whose objects are binary blobs of data for now, so I'm probably going to skip over this for now.

So... Autobus 2 will be distributed.

There won't be a central server. Like at all.

There can be, if services from two different networks need to be bridged. In that case, the central server would simply act as a client to other Autobus-providing things.

So I need to think more about how to do this.

So we want a way to be able to get a list of Autobus services currently available to the network. This could be done by just watching for broadcasts advertising services and tracking the list based on that, and then removing services when a broadcast stating that the service is about to disappear is received or when it's been more than a few minutes since anything was received from the service.

So now it's becoming more complicated. I need to think things through.

So I'm thinking, what if we only have a socket opened to a particular service when we actually need to do stuff with that service?

Services would broadcast their JSON info objects (which have keys like "name" and a few other fixed things, plus any service-specific stuff such as, for example, "monitor.host" for monitord) every few minutes. They would also make three or four (or some other configurable number) of broadcasts when they're going down so that other clients on the network can remove the services immediately. If a client doesn't receive a broadcast for a service for a certain number of minutes, it removes the service from its list of currently-available services. Established connections to the service are still maintained until they, too, time out.

Services can't change their JSON objects while they're live. They have to die and then come back to change their JSON objects. If an Autobus client receives a broadcast for a service with a changed JSON object, it should remove the service and then recreate it with the new JSON object. It should not destroy any already-existing connections to the service, however, as if the service has really vanished, those connections will time out on their own. (This should never happen anyway, so I'm not particularly worried about the adverse impacts.)
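Here's a rough sketch of the bookkeeping described above, just to make the rules concrete. ServiceTable, its method names, and the five-minute expiry are all made up for illustration; nothing here is actual libautobus API, and it leaves existing connections alone, as described.

```python
import time


class ServiceTable:
    """Tracks services seen in broadcasts (hypothetical helper, not libautobus API)."""

    def __init__(self, expiry_seconds=300.0, clock=time.monotonic):
        self.expiry_seconds = expiry_seconds  # assumed default; the notes say "a few minutes"
        self.clock = clock
        self.services = {}  # service key -> (info object, time last seen)
        self.log = []       # ("added" | "removed", key), in the order they happened

    def _add(self, key, info):
        self.services[key] = (info, self.clock())
        self.log.append(("added", key))

    def _remove(self, key):
        del self.services[key]
        self.log.append(("removed", key))

    def on_broadcast(self, key, info):
        known = self.services.get(key)
        if known is None:
            self._add(key, info)
        elif known[0] != info:
            # Info objects can't change while a service is live, so a changed
            # one is treated as the old service dying and a new one appearing.
            self._remove(key)
            self._add(key, info)
        else:
            # Same service, same info: just refresh the last-seen time.
            self.services[key] = (info, self.clock())

    def on_goodbye(self, key):
        # A "going down" broadcast removes the service immediately.
        if key in self.services:
            self._remove(key)

    def expire(self):
        """Remove and return the keys of services not heard from recently."""
        now = self.clock()
        stale = [key for key, (_, seen) in self.services.items()
                 if now - seen > self.expiry_seconds]
        for key in stale:
            self._remove(key)
        return stale
```

The clock is injectable only so that the expiry logic can be exercised without actually waiting several minutes.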

Ok so, that's pretty good for the service discovery protocol. And a particular client only broadcasts when it actually has services that it's providing, and it partially randomizes the time between broadcasts so as to reduce the chance of a conflict between two clients broadcasting at the same time.
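The partial randomization could be as simple as the following sketch. The function name, the two-minute base interval, and the 25% jitter are assumptions for illustration; the actual numbers are still to be decided.

```python
import random


def next_broadcast_delay(base_seconds=120.0, jitter_fraction=0.25, rng=random):
    """Return the number of seconds to wait before the next broadcast.

    The delay is picked uniformly from base_seconds plus or minus
    jitter_fraction of base_seconds, so that two clients started at the
    same moment drift apart instead of broadcasting in lockstep.
    """
    spread = base_seconds * jitter_fraction
    return base_seconds + rng.uniform(-spread, spread)
```

Passing in a seeded random.Random as rng makes the delays reproducible for testing.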

So... We have something in a Bus instance that lets clients register functions to be called whenever a service appears or disappears. We have something to obtain a connection to a particular service and then do things with that connection. Services are identified uniquely by a triple consisting of the host to connect to, the port to connect to, and a string identifying the service. Connections, therefore, can be established either to a service discovered via normal UDP broadcast or to a service whose identity-triple is known beforehand; libautobus would provide functions to establish connections to either one.

(I'm thinking that hosts would provide a service named "autobus" that provides functions for asking them what other services they're providing to make it easier for things to connect to something when they only know a host and a port, but I'll add that later.)

So, we can establish connections to a particular host/port/service combination. When that happens, libautobus connects to that host/port/service and sends it the id of the service that we're connecting to. And thus begins a session with the specified service.

So let's see... I'm thinking that the user has to create a Bus instance to do anything, even if they don't want to use service discovery. They could disable the whole service discovery thing if they wanted when they create the Bus instance.

But the reason that I'm thinking a Bus instance should be created no matter what is that each Bus would start a thread which would take care of all of the networking related to the Bus itself; this thread would be the one that runs the select (or epoll) loop. All connections created from a specific bus would use its select loop.

(Autobus 2 is thread-safe just like Autobus is, if you can't tell.)

And service discovery can be disabled when a Bus is created, in which case it won't watch for service broadcasts and it won't send any broadcasts for any services locally registered with it.

But anyways... So let's see...

So, when services appear, they broadcast that they just appeared. They broadcast this a few times.

When services disappear (and they know about it, so this doesn't include when a machine just gets yanked from the network), they also broadcast a few times.

When a bus appears, it sends out a broadcast, querying the network for any services that are available on it. When it receives a response for such a query, it adds that service to its list of available services if said service is not already in the list. It also listens for broadcasts by other services indicating that they are available, and when it receives one, it adds it to the list of things.

It also listens for broadcasts indicating that a service is going away. When it receives such a broadcast, it immediately removes the service from the list of available services.

When it's been some time (this time TBD) since a broadcast was received for a particular service, Autobus tries to connect to it via a TCP socket. If it can't establish such a connection after a certain amount of time (by setting a TCP timeout of, say, 30 seconds), it removes the service from the list of available services.
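A minimal sketch of that liveness probe, using only the standard socket module. The function name is made up, and a real implementation would presumably do this asynchronously on the bus's I/O thread rather than blocking like this does.

```python
import socket


def service_is_reachable(host, port, timeout=30.0):
    """Return True if a TCP connection to (host, port) can be established.

    The connection is closed again right away; this only checks that
    something is still accepting connections there.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False
```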

The program using autobus can register listeners to be notified when services become available or go away. When registering such a listener, it can specify that the listener should also be called for every currently-available service as well, so that if, for example, the listener were used to update a UI list, the UI list would get correctly initially populated. Programs can also register both a service-is-available and a service-is-no-longer-available listener at the same time so that, in the aforementioned UI list scenario, there isn't a time gap between the add listener and the remove listener being registered during which extra, unexpected events might be received. (I'm thinking that there will just be one listener that you pass into such a dual-listener setup, which takes a boolean indicating which of the two things happened, but I can figure that out then.)
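One way the single-listener idea could look, as a sketch. ServiceListeners and its method names are hypothetical. Listeners are called while holding a lock, which is a simplification: it guarantees no event slips in between registration and the initial replay, at the cost of running user code under the lock.

```python
import threading


class ServiceListeners:
    """Sketch of the listener registry; not the real Bus class."""

    def __init__(self):
        self._lock = threading.RLock()
        self._services = {}   # service key -> info object
        self._listeners = []  # functions called as listener(key, info, available)

    def add_listener(self, listener, replay_existing=False):
        with self._lock:
            self._listeners.append(listener)
            if replay_existing:
                # Report everything already known, so a UI list starts out full.
                for key, info in list(self._services.items()):
                    listener(key, info, True)

    def service_appeared(self, key, info):
        with self._lock:
            if key in self._services:
                return
            self._services[key] = info
            for listener in list(self._listeners):
                listener(key, info, True)

    def service_disappeared(self, key):
        with self._lock:
            if key not in self._services:
                return
            info = self._services.pop(key)
            for listener in list(self._listeners):
                listener(key, info, False)
```

The boolean third argument is True when a service becomes available and False when it goes away, so one function covers both cases.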

When a program decides that it wants to connect to a particular service, it can do so by calling the connect() method on a Service object; this establishes a socket connection to the specified service, throwing an exception if such a connection cannot be established, and returning a Connection instance otherwise.

Connection instances track an individual connection's state. They do not automatically reconnect, but they can be watched to see when they die. (Adding a "listen-for-dying" listener will cause the listener to be fired if the connection is already dead, if a certain parameter is specified when registering the listener.)

A particular connection instance can have functions called on it (synchronously, asynchronously without waiting for a response, and asynchronously where the response causes a particular function to be called with it or another function to be called if an exception is thrown or something else happens), events listened to, objects watched (the listeners for these objects will be called with the object's initial value, and the object's value will appear to be set to None when the connection disconnects), and such.

A connection can, of course, be manually closed.

So that takes care of that bit. Now, what about the stuff that allows connections to be reestablished?

Well, Autobus 2 allows things to move locations, obviously, so we can't have a Connection reestablish itself. Instead, there are proxies that bind to services specified by certain search criteria automatically and act as if they were those services, which provides the reconnecting behavior of Autobus 1. This also allows for proxies to be created that bind to multiple services (instead of just the first one that matches) and expose all of them; functions called on such proxies must be asynchronous (since there's not a single thing on which to call a function; it would also be specifiable what happens if the proxy isn't currently bound to any services, and I'm still debating on whether such a proxy should allow for receiving responses asynchronously or whether it should force responses not to be received), event listening is different in that the listener is notified in some way whenever any of the events on the services fire (along with which service fired the event), and object watching somehow provides a list of objects of all of the bound services and informs when a service is bound or unbound and when objects on each service update.

I haven't yet decided whether functions, events, and objects are in their own namespaces or if they share a namespace.

Single service proxies are interface-compatible with connections; it's only multiple-service proxies that aren't.

Multiple service proxies will provide functions for listening for when the proxy binds to and unbinds from a service. A multiple service proxy's facilities for watching objects will also provide essentially three different things that can be listened for: a newly bound service making that object available (which will include the object's value), a previously available service's object disappearing because the service was unbound, and the value of an object on one of the bound services changing (which will provide the new value).

Multiple-service proxies would also allow for events to be listened for. Each firing of an event would report for which bound service the firing was for.

I'll most likely implement multiple-service proxies first since the functionality of single-service proxies can be emulated using them.

Oh, and a single Bus instance will correspond to a single server socket opened for serving services. Each service registered to the bus will be provided using a service id created by combining the date the bus was created with a sequential number.

So I'm thinking that there should be a notion of service discoverers and service publishers. A service discoverer is a thing created for a particular bus. It tells the bus when it sees services become available to it and when it sees services disappear. A bus would, by default, create a single service discoverer that uses the above-described UDP broadcasting protocol to discover services on the local network. (The bus keeps track of which discoverers report which services as available so that the user can tell which discoverer(s) discovered a service, and it also properly handles a service being discovered twice by different discoverers since that could feasibly happen.)

Service publishers are the other end of the scale: they're attached to a bus and get notified when a new service is created for the bus. They can then publish that service as they wish. A default service publisher would be provided that issues UDP broadcasts every minute or two describing the services that it knows about, and also when the bus notifies it that a new service is available or that a service is no longer available.

Publishers and discoverers are installed onto a bus by passing them to the add_discoverer and add_publisher methods. They can be removed using remove_discoverer and remove_publisher. Most publishers and discoverers can only be used once. (add_discoverer and add_publisher call a discoverer or publisher's init function, and remove_publisher and remove_discoverer call a discoverer or publisher's destroy function.)

So, there would be a default discoverer created when constructing a bus that uses the above-described UDP broadcasting thing, watching for UDP broadcasts advertising services and sending out an initial query for services when it's created. If a service didn't broadcast that it was there for a few minutes, this discoverer would try to use the bus itself to obtain a connection to the service (asynchronously, of course; connecting to a service can be done by passing a callback function when asking the bus to do this), and if it wasn't able to, then it would remove the service from the list of services that are available.

Then there would be a publisher that would send out a broadcast whenever a new service became available and send out another broadcast whenever a service was no longer available. It would also watch for queries asking what services are available and respond with its list of currently-available services, and it would re-broadcast the list of available services every minute or two (but it would randomly delay the broadcast so that multiple services on the same network will have a reduced chance of collision).

So, that covers using services. How about providing them?

Let's see... What if we have a function on a bus called create_service... we pass it an info object for the service, which shouldn't change. We also can optionally pass it a docstring for the service, which can be modified while the service is running via the returned LocalService object's doc attribute (which is actually a Python property). A given service's documentation could be obtained by watching its autobus.doc object.

So, the returned value from create_service is a LocalService object, which contains information about the service's id (which is a string that can't be changed), the bus that created it (which is a reference cycle, yes), the service's info object (which can't be modified), the doc property, a set (Python set) of connections currently connected to the service, and, of course, dictionaries of the service's functions, events, and objects. It also contains two dictionaries, event_listeners and object_watches, each of which map event names and object names to sets of the connection objects that are listening for those events or watching those objects. The reason that these maps are stored on the service instead of on the event or object itself is that listeners and watches can be placed for events and objects when they don't even exist yet; with events, the listeners simply will not receive any firings until the event is created, and with objects, the client will be told when they place the watch whether or not the object currently exists, and when it's created or removed, they will be notified about that (and in the case of it being created, they will be sent its initial value).

So, those original dictionaries, the ones that hold the service's functions, events, and objects... these dictionaries map function, event, and object names to LocalFunction, LocalEvent, and LocalObject objects. These are created by calling their respective create_something function on a LocalService, passing in a name for the function/event/object, an optional (and later-modifiable) docstring for the function/event/object, and some thing-specific values. For objects, a parameter containing the initial value for the object is present. For functions, a parameter containing a Python function or other callable object is passed in, along with a function invocation mode. It can be either SYNC, to cause the function to be invoked synchronously on the I/O thread, THREAD, to cause the function to be invoked in its own thread but the response not sent until the function actually returns, or ASYNC, to cause the function to be run in its own thread but a return value of None to be sent immediately after said thread is started. (If this is None, or not specified, the mode attribute on the underlying function object will be used instead; if such an attribute is not present, THREAD will be used.) Events have no extra values that need to be passed in.
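The mode lookup and the three invocation styles could be sketched like this. The names resolve_mode and invoke, the string constants, and the send_response callback are assumptions for illustration, and the sketch ignores what happens when the function throws.

```python
import threading

SYNC, THREAD, ASYNC = "SYNC", "THREAD", "ASYNC"


def resolve_mode(function, mode=None):
    """Explicit mode first, then the function's own mode attribute, then THREAD."""
    if mode is not None:
        return mode
    return getattr(function, "mode", THREAD)


def invoke(function, args, send_response, mode=None):
    """Run function according to its mode; send_response(value) delivers the reply.

    Returns the worker thread for THREAD and ASYNC, or None for SYNC.
    """
    mode = resolve_mode(function, mode)
    if mode == SYNC:
        # Runs right here, on whatever thread called invoke (the I/O thread).
        send_response(function(*args))
        return None
    if mode == THREAD:
        # Own thread; the response is sent once the function has returned.
        worker = threading.Thread(target=lambda: send_response(function(*args)))
    else:
        # ASYNC: own thread; the return value is discarded.
        worker = threading.Thread(target=function, args=tuple(args))
    worker.start()
    if mode == ASYNC:
        # None is sent immediately after the thread is started.
        send_response(None)
    return worker
```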

Local functions, events, and objects can be looked up by calling get_something on the LocalService object for those three types.

Those three classes also have remove functions (and so does LocalService) that remove the corresponding function, event, or object (or service) from its containing service (or bus).

All three of those have a name attribute, a doc property which can be modified to change the thing's doc, a service attribute corresponding to the service that created the thing, and some thing-specific attributes. Functions have a function attribute which can be modified to change the underlying function that will be called, and a mode attribute representing the function's current mode. (This has exactly the same value as the value passed in when creating the function, and can be reset to None to look up the mode from the underlying function.) Events have a fire function that can be called to trigger the event. Objects have a value property that can be modified to change the value of the object, and they also have a set method that's essentially equivalent to self.value = new_value, where new_value is the only parameter to the set function.
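The value property and set method on objects might look roughly like this. LocalObjectSketch and the notify callback are stand-ins invented here; in the real thing, the notification step would presumably send changed notices to whichever connections are watching the object.

```python
class LocalObjectSketch:
    """Illustrative stand-in for LocalObject, showing only value and set."""

    def __init__(self, name, value, notify):
        self.name = name
        self._value = value
        self._notify = notify  # hypothetical hook, called as notify(name, new_value)

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self._notify(self.name, new_value)

    def set(self, new_value):
        # Equivalent to assigning to the property; handy as a callback.
        self.value = new_value
```

Having set as a plain method means it can be passed around as a function, for example as an event listener, which a property assignment can't.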

When a LocalService is created, it is added to the bus's map of service ids to service objects, and all of the bus's publishers are notified that the service has been added. An object named autobus.objects is created from scratch, then objects autobus.functions, autobus.events, and autobus.doc are created the normal way (as pointed out later on, adding an object the normal way requires autobus.objects to already exist, hence autobus.objects being created manually). These objects are dictionaries mapping function, event, and object names to JSON objects providing more information about them; I'll decide what these contain later on, but at the very least they'll contain a doc attribute which is the documentation for that function, event, or object.

When a LocalService is removed, it is removed from the bus's map of service ids, and all of the bus's publishers are notified that the service has been removed. All current connections to the service (these are stored in a set on the LocalService object) are sent a message indicating that the service is disconnecting, and then they are terminated.

When a LocalFunction, LocalEvent, or LocalObject is created, it's added to its corresponding LocalService dictionary, and then added to its corresponding autobus.something object. In the case of objects, just before being added to the autobus.objects object, a message is sent to all clients watching the object telling them that the object now exists and providing its initial value.

When a LocalFunction is removed, it's removed from the autobus.functions object, and then removed from the LocalService dictionary.

When a LocalEvent is removed, essentially the same thing happens as when a LocalFunction is removed.

When a LocalObject is removed, the same thing happens as with functions and events, except that just after being removed from autobus.objects, all watchers of the object are sent a message indicating that the object no longer exists.

One other thing. ClientConnection objects, which are used to represent inbound connections tied to a particular service, 

Ok, that's the details of providing services. Now I need to go a bit lower-level with actually using services.

The bus can be asked to connect to a particular service, which it does given the service's host, port, and id. It tries to create a socket connection to it, and then returns (or passes to an async handler) one of the above-described Connection objects. This connection is tracked weakly by the bus with a reference to the socket maintained separately (so that it's still strongly-referenceable); when the connection is no longer reachable and gets collected as reported by the weakref pointing to it, the socket will be closed automatically if it has not already been closed.
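Python's weakref.finalize does pretty much exactly this, so the tracking could be sketched as follows. BusSketch and ConnectionSketch are placeholder classes made up for illustration; the important part is that the cleanup callback holds the socket but never the connection itself.

```python
import weakref


class ConnectionSketch:
    """Placeholder for Connection; only holds its socket."""

    def __init__(self, sock):
        self.sock = sock


class BusSketch:
    """Placeholder for Bus; shows only the weak tracking of connections."""

    def __init__(self):
        self.open_sockets = set()  # strong references to the sockets

    def track(self, connection):
        sock = connection.sock
        self.open_sockets.add(sock)
        # The finalizer must not reference `connection`, or the connection
        # would be kept alive forever and never get collected.
        weakref.finalize(connection, self._cleanup, sock)

    def _cleanup(self, sock):
        self.open_sockets.discard(sock)
        sock.close()  # closing an already-closed socket is harmless
```

Once the last strong reference to a tracked connection goes away, the finalizer runs and the socket gets closed without the program having to do anything.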

A particular function can be retrieved from a connection by doing connection["function_name"]. This returns a Function object, which is a callable object. For now, the connection won't track its Function objects; a new one will be returned every time a client requests it. When the function is called, it takes all of the positional arguments that were passed to it and verifies that they can be correctly JSON-encoded, throwing an exception if they can't. Then it creates a function-call message, with its unique id and such. Then what it does is dependent on the type of call.

If the call is a synchronous call, it sends the message as a query (using the specified timeout, or a default if one was not specified). If the result is that the queue is empty, a timeout exception is thrown. If the result is that the connection was lost, a connection lost exception is thrown. If the result was that the remote function threw an exception, then some suitable exception type is thrown. Otherwise, the result is returned.
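A sketch of the synchronous case, assuming the I/O thread hands the response over through a queue.Queue. TimeoutException and DisconnectedException are named later in these notes; RemoteException, the DISCONNECTED marker, and wait_for_result are made up here.

```python
import queue


class TimeoutException(Exception):
    pass


class DisconnectedException(Exception):
    pass


class RemoteException(Exception):
    pass


# Hypothetical marker the I/O thread would put on the queue when the
# connection is lost before a response arrives.
DISCONNECTED = object()


def wait_for_result(responses, timeout=30.0):
    """Block until a response shows up on the queue, then unpack it."""
    try:
        response = responses.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutException("no response within %s seconds" % timeout)
    if response is DISCONNECTED:
        raise DisconnectedException("connection lost while waiting")
    if "exception" in response:
        # The remote function itself threw an exception.
        raise RemoteException(response["exception"]["text"])
    return response["result"]
```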

If the call is an asynchronous call with a callback, the message is sent as an asynchronous query. Asynchronous queries have a function that will be called with the response message when it arrives, or with an instance of TimeoutException if the query timed out, or DisconnectedException if the connection disconnects before the query finishes. The function passed into this query will pass these exceptions through to the function to be called when a response is received (and note that since asynchronous query callbacks are called on the event thread, this will be called on the event thread as well). If it's not an exception that was received, the message is checked to see if the remote side failed with an exception. If it did, an instance of this exception is passed into the callback. Otherwise, the return value is passed into the callback. 

If the call is an asynchronous call without a callback, the message is just sent, not queried. Like Autobus 1, Autobus 2 messages contain a flag indicating, if they're a command, whether a response should be sent; this will be set not to send a response, and the recipient will detect that and call the function accordingly.

Events can be retrieved from a connection by doing connection.events["event_name"]. This will return an Event object. Events, like functions, are one-time things that are created on access.

Events have a listen method and an unlisten method.

When listen is called, passing in a function, the function in question is added to a dictionary stored on the connection that maps event names to sets of functions listening for the event. If such an entry did not already exist in that dictionary, then this must be the first time that the event has been listened for on this connection, so a message is sent to the server telling it that we're interested in listening to this event. Note that the function is added to the newly-created set /before/ the message is sent to the server.

When unlisten is called passing in a function, the function is removed from the event's set in the dictionary. If it was the last function in the set, then nothing else is listening for the event, so a message is sent to the server telling it that we're no longer interested in the event, and the set is removed from the dictionary.
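The listen/unlisten bookkeeping could be sketched like this. EventListeners, send, and dispatch are hypothetical names; the messages leave out the id and type fields for brevity, and dispatch calls the functions directly rather than in their own threads.

```python
class EventListeners:
    """Sketch of the client-side dictionary of event names to listener sets."""

    def __init__(self, send):
        self._send = send      # hypothetical: send(message_dict) goes to the server
        self._listeners = {}   # event name -> set of functions

    def listen(self, name, function):
        functions = self._listeners.get(name)
        if functions is None:
            # First listener for this event. The set is stored before the
            # message goes out so that a firing arriving right away finds it.
            self._listeners[name] = {function}
            self._send({"_command": "listen", "name": name})
        else:
            functions.add(function)

    def unlisten(self, name, function):
        functions = self._listeners.get(name)
        if functions is None:
            return
        functions.discard(function)
        if not functions:
            # Nobody is left listening, so tell the server and drop the set.
            del self._listeners[name]
            self._send({"_command": "unlisten", "name": name})

    def dispatch(self, name, args):
        # Called when the server reports that the event fired.
        for function in list(self._listeners.get(name, ())):
            function(*args)
```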

When a message arrives telling us that the event was triggered, the list of functions is looked up, and each one of them is called. They can have modes set on them just like server-side functions can, although SYNC and ASYNC function the same; THREAD is, of course, the default.

Objects can be retrieved from a connection by doing connection.objects["object_name"]. This will return an Object object. Objects, like functions, are one-time things that are created on access.

Objects have three methods: watch, unwatch, and get.

When watch is called, passing in a function, the function is stored in a dictionary just the same as when an event is listened for. If the set had to be newly-created, then a message is sent to the server telling it to watch the specified object. If the set did not have to be newly-created, then the watch function is immediately called, passing in the current value known to the client, or a special value if the client knows that the object does not currently exist.

When unwatch is called, the function is removed from the set in the dictionary. If the set is now empty, a message is sent to the server telling it to stop watching the object, and we remove the now-empty set from the dictionary.

When a new message for the object arrives, the functions for the object are called, passing in the new value of the object, or a special value if the object no longer exists.
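Watching works much like listening, plus a cache of the last known value. A sketch, with ObjectWatches, MISSING, and on_value all being made-up names; on_value is meant to be fed both the response to the watch command and any later change messages.

```python
# Hypothetical "special value" meaning the object does not currently exist.
MISSING = object()


class ObjectWatches:
    """Sketch of the client-side bookkeeping for watched objects."""

    def __init__(self, send):
        self._send = send     # hypothetical: send(message_dict) goes to the server
        self._watchers = {}   # object name -> set of functions
        self._values = {}     # object name -> last known value, or MISSING

    def watch(self, name, function):
        if name not in self._watchers:
            # First watcher: ask the server; the value arrives via on_value.
            self._watchers[name] = {function}
            self._send({"_command": "watch", "name": name})
        else:
            # Already watching: report what we currently know right away.
            self._watchers[name].add(function)
            function(self._values.get(name, MISSING))

    def unwatch(self, name, function):
        functions = self._watchers.get(name)
        if functions is None:
            return
        functions.discard(function)
        if not functions:
            del self._watchers[name]
            self._values.pop(name, None)
            self._send({"_command": "unwatch", "name": name})

    def on_value(self, name, message):
        # A message without a "value" key means the object doesn't exist.
        value = message.get("value", MISSING)
        self._values[name] = value
        for function in list(self._watchers.get(name, ())):
            function(value)
```

One rough edge in this sketch: a second watcher registered before the server's first response arrives is told MISSING immediately and then gets the real value when the response comes in.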

When an object's get function is called, a message is sent to the server, asking it for the object's current value. The response value from the server is then returned, or a special value if the server reports that the object does not exist. Like all methods that require blocking for the event thread, get can be called with a callback, in which case the callback is invoked when the response arrives.



So, now I need to figure out how to do reactors, the things that handle select loop things...

So, what do reactors need to do?

Well... what do we need to do in Autobus?

We need to be able to open sockets to specific hosts and ports, and get told when the connection goes through successfully. We need to be able to find out once a line of text has been read from a socket. We need to be able to write lines of text to sockets, without blocking while said lines are written. We need to be able to find out when a particular socket has closed. We need to be able to listen on a particular port for connections and accept those connections and have them registered as their own sockets. We need to be able to schedule tasks that should happen at regular intervals. We need to be able to run things on the select loop thread on request.

So I'm just using threads for now.

So, what messages should the protocol have?

Well...

The protocol is line-oriented; lines, each containing a JSON object, are used as messages.

The message id is stored as a key named _id. Responses to commands always use the same id as the command that caused the response.

The message type is stored as a key named _type, which is 1 for a command, 2 for a response, or 3 for a notice (a command for which the response should be discarded).

The command itself is stored in _command, which is the name of the command. This only exists for commands and notices, not for responses since the client that sent the command should know what command the response is in response to.

In responses, if the command failed, there is a key named _error which contains a JSON object providing more information about the error. Right now, this object contains two keys, type and text; type contains a command-specific machine-readable reason for the error, while text contains a human-readable description of the error. type is optional; text is required.
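To make the framing concrete, here's a sketch of building and encoding these messages. The helper names are invented, and using a simple counter for message ids is an assumption; the notes only require that a response reuse the id of its command.

```python
import itertools
import json

COMMAND, RESPONSE, NOTICE = 1, 2, 3

# Assumption: ids only need to be unique per connection, so a counter will do.
_ids = itertools.count(1)


def make_command(command, notice=False, **keys):
    """Build a command (or a notice, if notice is true) with its extra keys."""
    message = {"_id": next(_ids),
               "_type": NOTICE if notice else COMMAND,
               "_command": command}
    message.update(keys)
    return message


def make_response(command_message, **keys):
    """Build a response carrying the same id as the command it answers."""
    return dict(keys, _id=command_message["_id"], _type=RESPONSE)


def make_error(command_message, text, error_type=None):
    """Build a failed response; text is required, type is optional."""
    error = {"text": text}
    if error_type is not None:
        error["type"] = error_type
    return make_response(command_message, _error=error)


def encode(message):
    # json.dumps escapes newlines inside strings, so each message is one line.
    return (json.dumps(message) + "\n").encode("utf-8")


def decode(line):
    return json.loads(line.decode("utf-8"))
```

For example, make_command("call", name="add", args=[1, 2]) gives a message that encode turns into a single newline-terminated line ready to be written to the socket.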

So what commands are there?

Well...

bind: This command is sent as the first message from a client to a server. It tells it to bind to a particular service. It has one key, service, which is the id of the service to bind to. If the service does not exist, an error of type "no_such_service" is sent, and the connection is closed.

call: This command is sent to call a particular function. It has two keys, name and args; name is the name of the function to call, and args is a list of the arguments to supply to the function. The response is a normal response even if the underlying remote function throws an exception; in that case, it will contain a key, exception, which contains a JSON object which must at least contain a key named text whose value is a human-readable form of the exception; other keys can be present to pass details of the exception specific to the autobus implementation running on the server, and compatible clients and servers can use this to seamlessly translate exceptions. If an exception was not thrown, then the response contains a single key, result, which is the return value of the function. If some other error occurs (such as the function returning a non-json value), then an autobus error response can be sent.
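On the server side, producing the keys of a call response could look like the following sketch. run_call is a made-up name, and python_type is just an example of an implementation-specific extra key; only text is required.

```python
import json


def run_call(function, args):
    """Call function with args and return the keys for the response message."""
    try:
        result = function(*args)
    except Exception as e:
        # Still a normal response: the exception travels in the "exception" key.
        return {"exception": {"text": str(e),
                              "python_type": type(e).__name__}}
    try:
        json.dumps(result)
    except (TypeError, ValueError):
        # Not the function's exception but a problem on our end, so this
        # one is reported as an autobus error response instead.
        return {"_error": {"text": "function returned a non-JSON value"}}
    return {"result": result}
```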

watch: This command is sent to watch a particular object. It has one key, name, which is the name of the object to watch. The response contains either zero keys if the object does not exist on the server at the moment, or one key, value, which contains the current value of the object.

changed: This notice (it must be sent as a notice) is sent from the server to the client when an object that the client is watching has its value change. One or two keys are present: name, which is the name of the object, and value, which is the new value of the object; if this command is being sent because the object was just deleted, then value is not present, so name is the only key.

unwatch: This command has the same syntax as the watch command, but the response is empty. It stops watching an object previously watched with the watch command.

listen: This command is sent to listen to a particular event. It has one key, name, which is the name of the event to listen for. The response is empty.

fired: This notice (it must be sent as a notice) is sent from the server to the client when an event that the client is listening for has fired. One key is present, args, which is a list of the event's arguments.

unlisten: This command is sent to stop listening to a particular event. It has the same syntax as the listen command, and its response is empty.



BIG NOTE HERE: watch and listen are /not/ reentrant. The results are undefined if a watch is issued for an object that is being watched (and, of course, has not been unwatched yet; watching, unwatching, and then watching again is perfectly fine).

So I just decided something else: I'm going to have service ids be unique. And I'll do that by having them contain the date at which they were created, the hostname of the computer publishing them, and a bunch of random data. The point in that is that then services accessible via multiple routes (say, for example, a service available on the loopback adapter, on an ethernet network, on a wi-fi network, and from a bridge, all at the same time) can be properly detected as being the same service instead of four separate services.
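
A rough sketch of what generating such an id could look like (Python 3). The three ingredients come from the paragraph above; the function name, the separator, the timestamp format, and the amount of random data are all assumptions.

```python
import binascii
import os
import socket
import time


def make_service_id(now=None, hostname=None, random_bytes=16):
    """Build an id from creation date, hostname, and random data.

    The layout (date-hostname-hex) is an assumption made for this sketch.
    """
    if now is None:
        now = time.time()
    if hostname is None:
        hostname = socket.gethostname()
    stamp = time.strftime("%Y%m%dT%H%M%S", time.gmtime(now))
    noise = binascii.hexlify(os.urandom(random_bytes)).decode("ascii")
    return "%s-%s-%s" % (stamp, hostname, noise)
```

Two routes to the same service would then report the same id, so they can be grouped together instead of showing up as separate services.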



So yeah, this is turning out well. Going to think out the stuff inside Bus that deals with discovery...

So, each call to discover and undiscover should specify the discoverer that made the discovery. That allows three things: 1, autobus can ignore things from discoverers that aren't registered to the bus (or ones that have already been removed), 2, multiple discoveries of a single host/port/service combination by several different discoverers can be tracked separately as expected, and 3, multiple discoveries of a host/port/service combination by a single discoverer can be properly ignored (or the info object changed, as necessary).

So, there are two things that users are going to want to access: a list of all host/port/service combinations, and a list of all service ids with one of the host/port combinations at which that particular service can be accessed, with preference going to hosts that are 127.0.0.1.

So, the question is: how should this sort of data be presented to users?

Well... we're going to also want some ability to filter information based on certain criteria relating to the info object.

I'm thinking we need to have a function somewhere (probably a global in autobus2) where you pass it an info object and a search object and it returns whether they match. The idea would be that keys in the filter correspond to keys in the info object and their values are either JSON values (including strings) to require that key in the info object to have that value, autobus2.ANY (which would be a singleton) to indicate that the key simply has to be present, autobus2.NOT_PRESENT to indicate that the key must not be present, and a regex returned from re.compile to require that key to be a string that matches that regex. In the future, I might allow values to be functions which match as long as the function, when called on the actual value, returns True, or for the whole search criteria to be a function, which would be passed the info object and which would return true if it matches.
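
Here's a minimal sketch of that matching function (Python 3). ANY and NOT_PRESENT are the singletons described above; the name filter_matches and the _Marker class are made up for this sketch, and using regex.match (anchored at the start of the string) rather than search is an assumption. The function-valued filters mentioned as a future idea are left out.

```python
import re


class _Marker(object):
    """Tiny singleton-style marker (hypothetical helper for this sketch)."""
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name


ANY = _Marker("ANY")                  # key must be present, any value
NOT_PRESENT = _Marker("NOT_PRESENT")  # key must be absent
_REGEX_TYPE = type(re.compile(""))


def filter_matches(info, filter):
    """Return True if the info object satisfies every key in the filter."""
    for key, expected in filter.items():
        if expected is NOT_PRESENT:
            if key in info:
                return False
        elif key not in info:
            return False
        elif expected is ANY:
            continue
        elif isinstance(expected, _REGEX_TYPE):
            value = info[key]
            # The value must be a string that the regex matches
            if not isinstance(value, str) or expected.match(value) is None:
                return False
        elif info[key] != expected:
            return False
    return True
```

For example, filter_matches(info, {"monitor.host": ANY}) would pick out the monitoring services regardless of which host they report.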

So we have some mechanism to filter out services based on their info objects. That's good.

Now... I'm thinking that, for now, a service's info object shouldn't change, and if a discoverer reports that it has changed, th... hm, I just realized a problem with things being made aware of only a particular instance of a service: what if the info object for a service is different between two discoverers that reported the info object? This could feasibly happen due to network delays.

Well... Let me think... What sort of things are we going to want to do with this whole system?

Well...

I want to be able to have a program that shows every service available on a network. It should show every route to those services as well, grouped under the service's id.

Then I want to have a program that watches for all services with the key "monitor.host" in their info object and uses a MultipleServiceProxy to watch all of the objects on them that provide CPU information and such. It should only show each service once no matter how many hosts the service is actually available from.

So let's see... For listeners that want to filter services based on info objects but are fine with getting one entry for every route to a service...

So, the idea is that for these, we'd do something like call listen_for_services, passing in a filter and a function. The function would be then called for every service that matches the filter.

When a service becomes available, every function where its info matches the filter is informed that the service is available.

When a service disappears, every function where the service's info object matches the filter is informed that the service disappeared.



Ok, so, I'm going to revisit some of the above stuff later. I got it working so that service discovery works to a basic extent (it doesn't yet remove a service if it can't find it for a period of time, which still needs to be added). And functions can be called. But there's still other stuff to do.

But I just decided that streams, as I mentioned them near the top of the file, are actually going to be different. Or something...

So I'm thinking that channels will be things that exchange JSON objects. I might get rid of this later by adding some sort of session management to Autobus itself, so that servers can tell when clients disconnect, although I might not. Streams, however, would be different.

Streams would be established just like channels and would function like channels, except for two things: the data sent would be binary blobs of data, and more importantly, the data would be sent over UDP.

So other than exchanging JSON objects, the only real difference between channels and streams would be that one uses TCP and the other uses UDP. So maybe I should just make them be the same thing, and the person establishing the channel (or the person receiving the channel; I need to decide which makes more sense) decides whether the channel communicates over TCP or UDP. And if it communicates over TCP, whether the channel runs over its own out-of-band socket, which would allow large amounts of data to be transferred over it without freezing up the main communications socket.

But anyways... Before I implement that, I need to do objects.

Actually, before I do objects, I need to fix up service discovery so that it removes services that haven't broadcasted their existence in a while.

Ok so that's done. Now, what's next?

So it looks like I sort of forgot to add the code into the Bus class that actually tracks which services have been discovered thus far. So I should probably go add that.

So let's see... There can be multiple discoveries for a particular service id, even from the same discoverer (BroadcastDiscoverer will discover the same service twice if it's running on the local computer; once through the loopback adapter and once through the normal network adapter). So we need to account for that. And one (or more) of the discoveries for a particular service can disappear at any time; the normal network adapter one might disappear, for example, if the network interface goes down, leaving only the loopback discovery of the service available.

So we need to account for that. So it seems like there should be two types of listeners that we can register: listeners that are notified whenever a discovery or an undiscovery is made, and listeners that are notified whenever a new service becomes available, a service disappears, or a service changes to using a different host and port. The former listeners would be called in exactly the same circumstances that the discoverers themselves call bus.discover and bus.undiscover; the latter listeners would be called when a service that had not already been discovered is discovered, the last discovery of a particular service is undiscovered, and whatever is stored as the current discovery to use (Autobus will use the first discovery made for a particular service for now) is undiscovered and as such a different discovery of the service must be used, respectively.

It's also possible to specify info object filters when registering either of the above types of listeners. Those listeners will then only be called for events relating to services whose info objects match the specified criteria. Service info objects are not allowed to change over the lifetime of the service; when the first discovery is made for a particular service, Autobus stores the info object, and uses it for all other discoveries of the service until the service is undiscovered regardless of the info object reported by the discoverer in case someone on the network is trying to mess things up by broadcasting different info objects for the same service.

It's also possible to specify when a listener is registered that it should be called at the time of registration for all services/discoveries currently available. Single service and multiple service proxies would do something like that so that they can bind themselves immediately to any services that are currently available.

So I'm going to go implement that and see how it goes.

Well... Let's see how I would implement that...

So we have a map stored inside the bus that maps service ids that have been discovered to... what, exactly?

Well, we need to be able to track all the discoveries for a particular service.

So what if we have the value be a set of (host, port) tuples?

That should work.

Hm... We need a way to keep track of the discoverer that issued a particular discovery...

Ok, what if we have the map be a map of service ids to maps that map (host, port) tuples to lists of discoverers that discovered that service id, host, port combination?

Except that they should be ordered maps because then discoveries of services will be used in... no, that doesn't work, because we need to be able to specify a default discovery for a particular service...

Ok, what if the map maps service ids to DiscoveredService instances?

DiscoveredService would contain an attribute called default that holds a (host, port) tuple. This is the default discovery that is to be used for this service. Then DiscoveredService holds a map of (host, port) tuples to lists of discoverers that have discovered this (host, port) combination.

So now we can get to the logic of the whole system.

Hm... Let me think how discovery listeners and service listeners are going to work...

What if we have discovery_listener(service_id, host, port, event)...  event could then be one of DISCOVERED or UNDISCOVERED.

And then we have service_listener(service_id, host, port, event), and event would be DISCOVERED, CHANGED, or UNDISCOVERED. host and port wouldn't be specified for UNDISCOVERED.
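
To check that the bookkeeping hangs together, here's a sketch of the DiscoveredService map plus the service listener events (Python 3.7+, which keeps dicts in insertion order). DiscoveryTracker and the locations attribute are hypothetical names, info objects and filters are left out, and passing None for host and port on UNDISCOVERED is an assumption.

```python
DISCOVERED, CHANGED, UNDISCOVERED = "DISCOVERED", "CHANGED", "UNDISCOVERED"


class DiscoveredService(object):
    def __init__(self):
        self.default = None   # (host, port) currently used for this service
        self.locations = {}   # (host, port) -> list of discoverers


class DiscoveryTracker(object):
    """Hypothetical stand-in for the discovery-tracking part of Bus."""

    def __init__(self, service_listener):
        self.services = {}    # service id -> DiscoveredService
        self.service_listener = service_listener

    def discover(self, discoverer, service_id, host, port):
        service = self.services.setdefault(service_id, DiscoveredService())
        discoverers = service.locations.setdefault((host, port), [])
        if discoverer in discoverers:
            return            # same discoverer reporting it again: ignore
        discoverers.append(discoverer)
        if service.default is None:
            # First discovery of this service becomes the default
            service.default = (host, port)
            self.service_listener(service_id, host, port, DISCOVERED)

    def undiscover(self, discoverer, service_id, host, port):
        service = self.services.get(service_id)
        if service is None:
            return
        discoverers = service.locations.get((host, port), [])
        if discoverer not in discoverers:
            return
        discoverers.remove(discoverer)
        if discoverers:
            return            # another discoverer still sees this location
        del service.locations[(host, port)]
        if not service.locations:
            # Last discovery gone: the service itself has disappeared
            del self.services[service_id]
            self.service_listener(service_id, None, None, UNDISCOVERED)
        elif service.default == (host, port):
            # The default went away; fall back to the oldest remaining one
            service.default = next(iter(service.locations))
            new_host, new_port = service.default
            self.service_listener(service_id, new_host, new_port, CHANGED)
```

A discovery listener would simply be called on every discover and undiscover, so it's omitted here.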



Ok so I got that all implemented, and added an additional parameter after port, info, which is the service's info object. So now there are two things that need to be done: events and objects, and service proxies. Which one do I want to do first?

Hm...

I'm thinking service proxies since they're the thing that I need most right now.

So let's do some thinking...

So I just added a thing to connections that allows a listener to be added to them that is called when the connection is closed. That's going to be really useful.

So let's see...

I'm thinking that I don't want to duplicate the proxy code for single service proxies and multiple service proxies, so I'm thinking that I'm going to implement them as the same class and have it be in different modes.

So... The code to keep a connection established... And the other code...

I'm starting to think that maybe what I should do is write a ConnectionManager thing where you tell it the host/port/service combinations to connect to and it keeps connections open to them, reestablishing them when needed.

Hm... Man, this is getting difficult...

The main issue is how to keep connections open to all of the services that the proxy is supposed to be keeping connections open to and how to automatically re-watch things and such when things need to be watched and such.

Ok, let me see if there's any way I can make this whole thing simpler...

So what if I wrote a class called ConnectionManager that works as I mentioned above?

It would let you know when it's opened a new connection, and it would let you know when it's lost a connection.

It would let you ask it for a list of all opened connections.

It would let you tell it the host/port/service combinations to create or remove connections for.

It would take care of closing connections that you no longer want it to connect to.

(Oh, and connection closure, I'm thinking, will be synchronized on a particular lock on connections, and the various startup actions performed by things using ConnectionManagers would likewise synchronize on this lock, so that if the connection closes itself prematurely, exceptions aren't thrown until after those startup actions have finished. Or something...)

So let me go try writing that and see how far I get and what roadblocks I run into.



So, having started that, I'm sort of thinking that I should rewrite connections to automatically reconnect themselves. I think that will work better.

Then the ConnectionManager simply creates and removes connections as needed but doesn't have to reconnect them.

That would also allow the single service proxy to not even need a connection manager.



Hm... So should I use three fields, alive, connected, and ready?

I dunno, this is getting super super complicated...

Ok, there has to be a better way to do this...

So I'm wondering if using Erlang-style message passing in the core of everything would help any...

Let's think... So yeah, I think I'm going to try to convert the internals of connections to be based on message passing.

So let's see... How would that work?

Well... I would still have an input thread and an output thread. Those already use message passing of a sort, so they can work into this architecture already without needing any changes.

So let me think... We'd have a central thread started up for every Connection object. It would have a queue that listens for messages being sent to the thread, and that thread's sole purpose in life would be to read off messages and do things with them.

Messages would be sent to this thread by any of a number of things happening: a new piece of input data arriving from the remote end of the connection, the connection being lost, a local thread calling a function (which would cause a "call this function" sort of message to be sent off to the remote end, and a passed-in queue to be notified when the response arrives via another message to the connection thread), a local thread adding an observer to or removing an observer from an object or an event, a function whose calling policy is THREAD returning (this would cause a message to be delivered to the connection, which would, in turn, forward the message to the remote end if a disconnect/reconnect had not happened in the mean time), and so on.

I think this would work.

So then messages are tuples (name, ...) where name is the name of the message, which for now would just be a string although I might change it to be an instance of Singleton in the future.

Hm... How does this work in with attempting to connect to the server?

Maybe we spawn a thread to connect to the server, and when it succeeds, fails, or times out, it sends a message back to us.

Ok, this is getting really complicated really fast.

Let me go look at how connections are implemented right now and see why all this complication was necessary...

Ok, I think we can accomplish this all using locks. So when we're attempting to connect, we don't lock on the connection's lock until after we've managed to successfully connect. Then we start the input thread and the output thread but don't actually store them in the connection's fields yet. Then we send a thing to the output thread to send to the server telling it the service we want to bind to. We've stuck a... hm

This is going to require the ability to swap out the function that an input thread calls when it has data available, so that we can set it to a function that goes and stuffs the first thing received back into a queue, since that'll be the response to our request to bind to the service... And then that function would set it to None which would cause the input thread to block up until the other code finishes initializing and sets the normal receive function into it...

So let's imagine the input thread has a lock and a condition associated with said lock. The input thread reads a packet, then acquires the lock and checks to see if there's a function available. If there's not, it waits on the condition, then checks again to see if the function's available, and so on until a function is available. Then it unlocks the lock, calls that function with the received packet, then receives another packet from the network and starts over again.

Then, in the thing where we set the function to be called, we lock on the lock, then set the function to the function passed in, then notify the condition, then unlock.

So under normal conditions, we go through, lock, see that the function's available, get our local copy of it, unlock, and call it.

Now let's say in the process of calling the function, or while we're waiting for a new packet to arrive, the function gets set to None. The condition is notified, which does nothing since the input thread isn't waiting on it. Then the input thread receives another packet, acquires the lock, and checks to see if a function is available. It isn't, so it waits on the condition. While it's waiting, something else goes and sets the function to have an actual value, which notifies the condition. The input thread wakes up, sees that there's a value, and uses it.
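
The lock-and-condition dance described above can be sketched like this. InputDispatcher, set_function, and dispatch are hypothetical names, and the actual socket reading is left out; dispatch is what the input thread would call with each packet it reads.

```python
import threading


class InputDispatcher(object):
    """Holds the swappable receive function for an input thread (sketch)."""

    def __init__(self):
        # threading.Condition bundles the lock and the condition together
        self._condition = threading.Condition()
        self._function = None

    def set_function(self, function):
        """Swap the receive function; None makes the input thread block."""
        with self._condition:
            self._function = function
            self._condition.notify_all()

    def dispatch(self, packet):
        """Called by the input thread after it reads a packet."""
        with self._condition:
            # Re-check after every wakeup until a function is available
            while self._function is None:
                self._condition.wait()
            function = self._function   # take a local copy under the lock
        # Call outside the lock so the function can itself call set_function
        function(packet)
```

Calling the function outside the lock is what lets a one-shot receive function set the slot back to None from inside itself without deadlocking.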

I've gone through this over and over in my head and I can't see any race conditions, so this looks like it'll work.



Ok so I've done that. Now let's get back to connections...

So once we've been able to make a socket connection, we start up an input thread and an output thread. Then we send a bind message to the output thread, then set the input thread's function to a function that stuffs the result in a queue and sets the input thread's function to None so that only one received packet will be stuffed into the queue. Then we go wait with a timeout of, say, 10 seconds on the queue for the response. If the response was a failure (or None, which would happen if the connection was lost before the response arrived, or if the query on the queue timed out), we go set the input thread's function to a function that discards everything so that the input thread will eventually terminate, close the socket (and the output thread), and act as if the connection attempt completely failed (because it did) and increase the amount of time we wait for the next connection attempt.
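
Here's a sketch of just the "stuff the first response into a queue" part (Python 3). It assumes the input thread exposes some setter for its receive function, passed in here as set_function; that name and wait_for_bind_response are hypothetical, and sending the bind message, closing the socket, and the retry delay are all left out.

```python
import queue


def wait_for_bind_response(set_function, timeout=10):
    """Wait for the single packet that answers our bind request.

    Returns the packet, or None if nothing arrived within the timeout.
    """
    responses = queue.Queue()

    def receive_once(packet):
        # Block the input thread again so only one packet lands in the queue
        set_function(None)
        responses.put(packet)

    set_function(receive_once)
    try:
        return responses.get(timeout=timeout)
    except queue.Empty:
        return None
```

The caller would then treat None, or a response containing an error, as a failed connection attempt.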

If, on the other hand, the bind succeeds, the connection thread then locks on the connection, sends all of the initial messages (object and event watches and such), and sets the output thread and the socket and stuff into the connection object, and then sets the input thread's function to be whatever Connection's function for dealing with inbound messages is. The connection thread then unlocks the connection's lock and dies.

Ok, I think that'll work. Now to implement it...

Yep, that worked spectacularly. And in the process, I condensed a bunch of code in the Connection class so that it's now something like two thirds of how long it originally was, and works just as well. So basically, a lot more maintainable. Some of the semantics have changed, but that's all been accounted for, and ended up lending itself to further condensation.

So with that done, I can now start work on service proxies.

So let's see... I think I'm going to try to merge single service proxies and multiple service proxies together, into a class called ServiceProxy. It'll have a constructor parameter indicating whether it should be in single mode or multiple mode. (I would dearly love to call this promiscuous mode, but I'm fairly certain I'll end up confusing a lot of network administrators trying to understand Autobus since NICs already use the term for something completely different... Ah well...)

So what exactly does ServiceProxy do?

Well... Not really that much, thankfully. Having connections be persistent and having the bus deal with the logic of discovering services makes ServiceProxy's task a lot simpler.

Oh, I'm thinking that each service proxy will have a mutex that it locks on when it's doing what it's doing, and it passes that mutex to every connection it creates so that all the connections are using the same mutex. That way it could lock on that mutex when, for example, a function's called, and then scan through the list of connections that are currently open and call the function on those connections. That would prevent situations where a connection disconnects in the middle of all that, so the actual function call on the connection ends up throwing a NotConnectedException.

So let's see... A service proxy registers a listener on the bus, listening for when services are discovered, changed, or removed. It treats changed notices the same as undiscovers followed by discovers.

So when a new discovery is made, a connection is created and put into the proxy's map of service ids to connections, and all event listeners and object watches currently present on the proxy are added to it.

When a service is removed, the corresponding connection in the proxy's map of service ids to connections is closed.

Wait a sec, I'm describing a multiple service proxy, not a proxy that can work as either one...

Maybe they should be different...

They just share a lot in common, particularly in terms of how they work... And I'd thought of implementing single service proxies on top of multiple service proxies, but then there's all those wasted connections to all of the services when only one needs to be maintained...

Ok, I'm still going to write them together, but I need to put a bit more thought into how it's done.

Actually no...

I'm really tempted to just go write them separately and then see what code they have in common, and merge the pieces of them that I can together and see how it goes.

Yeah, I'm going to do that.

So first I'll write SingleServiceProxy since I think that will be the easiest, and then I'll go from there.

So... When a single service proxy is created, it adds listeners to the bus listening for service changes and such.

So... It keeps track of a map of service ids to the hosts and ports that they're currently available on. It updates this whenever the bus tells it that discovery info has changed.

It also keeps track of a single connection.

When a discovery happens, it adds it to the map, then checks to see if the connection is null. If it is, it opens a connection to the service just discovered and sets that as the connection.

When a change happens, it updates the map, then checks to see if the connection is for the service that changed. If it is, the connection is closed and a connection to the service's new location opened.

When a removal happens, the corresponding service is removed from the map. Then, if the connection is to that service, the connection is closed, and a connection to one of the other services in the map opened. (This new service is essentially picked at random.)

When a function is asked for, a SingleServiceFunction is returned. When such a function is called, it checks to see if there's currently a connection (while locked on the single service proxy's lock), and if there's not, it throws an exception, and if there is, it stores it in a local variable and unlocks. It then calls the function and returns whatever the function returns.
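
The function-call part can be sketched like so. The proxy attributes lock and connection, and the connection.call(name, args) method, are assumptions made for this sketch rather than the actual Autobus API.

```python
import threading


class NotConnectedException(Exception):
    pass


class SingleServiceFunction(object):
    def __init__(self, proxy, name):
        self.proxy = proxy
        self.name = name

    def __call__(self, *args):
        # Grab the current connection while holding the proxy's lock...
        with self.proxy.lock:
            connection = self.proxy.connection
            if connection is None:
                raise NotConnectedException()
        # ...but make the actual (possibly slow) call after unlocking
        return connection.call(self.name, args)
```

Holding the lock only long enough to copy the reference means a slow remote function doesn't stop the proxy from reacting to discovery changes in the meantime.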

I think that's it to start with. Let's see if it works.

Ok, that worked wonderfully well.

This also makes me think that lots of things are going to want to be able to act as service-like thingies, where you can ask them for functions, events, and objects, and so on, and maybe I should create a common superclass that implements a lot of that automatically... Connection, SingleServiceProxy, and MultipleServiceProxy could all extend it... (And I'm also thinking of creating a common superclass for those things that have reentrant __enter__ and __exit__ functions that simply call close() on the last exit...)
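
A minimal sketch of the reentrant __enter__/__exit__ idea. The class name AutoClose is made up, and subclasses are assumed to provide close() and to call AutoClose.__init__.

```python
import threading


class AutoClose(object):
    """Superclass sketch: close() is called when the outermost with exits."""

    def __init__(self):
        self._enter_lock = threading.Lock()
        self._enter_count = 0

    def __enter__(self):
        with self._enter_lock:
            self._enter_count += 1
        return self

    def __exit__(self, *exc_info):
        with self._enter_lock:
            self._enter_count -= 1
            last = self._enter_count == 0
        if last:
            self.close()
        return False   # never swallow exceptions from the with block
```

Nested with blocks on the same object then only close it once, when the outermost block exits.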

So I'm going to go write the automatic close thingie first...

That also worked wonderfully well. And I've converted Bus, Connection, and SingleServiceProxy over to use it.



Hm, I'm not sure if I mentioned this or not, but I got MultipleServiceProxy working as well.

So now I'm working on getting objects working.

And I'm having a hard time of it.

Basically, I'm trying to figure out how to properly synchronize everything on the client side.

So what does the object system need?

And what things do the object system and the event system have in common?

Well... actually, the event system is going to be easier to implement because it doesn't have all of these synchronization issues since events don't have an initial value that needs to be conveyed to things listening for it.

So with objects...

Watch and unwatch commands sent to the remote end are /not/ reentrant, so we have to locally track whether we've already submitted a request to watch an object or not.

So, we have a map that maps object names to lists of functions listening for changes to that object.

And then presumably we have another map that maps object names to their known current values. This only holds values for objects that have been watched and whose response has arrived.

So the main problem I'm seeing is when we try to unwatch an object. This happens when all functions watching the object have been removed. The problem is how to deal with instances where more functions are added to watch the object, with the unwatch response not having yet arrived and a potential number of changed responses queued up for us to receive, which could cause object change notifications to these functions that are inaccurate.

Maybe we should just say screw it, an occasional notification that an object has become null is a small price to pay and just run with it.

So in that case, the object_values map would only be altered by inbound changed commands and watch and unwatch responses. A changed command would cause the corresponding value in object_values to change to the new object value as reported by the server, as would a watch response. An unwatch response would delete the entry in object_values.

So whenever a changed command or watch/unwatch response is received, we update object_values, then call all of the listeners we currently have and tell them about the change.

Then when we add a new listener, if it's the first listener, we send a watch command. And when we remove a listener, if it's the last listener, we send an unwatch command.

Hm... How does this play in with connections being disconnected and having to reconnect?

Well... Let's say that when a connection disconnects, we clear out object_values, then scan through the list of listeners and notify them that the object has gone away.

Then, when a connection reconnects, we look at all of the objects that we're currently listening for and send out watches for them. When the watch responses arrive, the listeners will be duly notified.
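
Putting the last few paragraphs together, a sketch of the client-side bookkeeping might look like this. ObjectWatcher and its method names are hypothetical, send(command, name) stands in for however commands actually get written to the remote end, and a missing or deleted object is represented as None. Telling a newly added listener about an already-known value isn't covered here.

```python
class ObjectWatcher(object):
    def __init__(self, send):
        self.send = send          # hypothetical: send(command, object_name)
        self.listeners = {}       # object name -> list of listener functions
        self.object_values = {}   # object name -> last value reported

    def watch(self, name, listener):
        first = name not in self.listeners
        self.listeners.setdefault(name, []).append(listener)
        if first:
            # watch is not reentrant, so only the first listener sends it
            self.send("watch", name)

    def unwatch(self, name, listener):
        self.listeners[name].remove(listener)
        if not self.listeners[name]:
            del self.listeners[name]
            self.send("unwatch", name)

    def value_received(self, name, value):
        """Handle a changed notice or a watch response."""
        self.object_values[name] = value
        self._notify(name, value)

    def unwatch_response_received(self, name):
        self.object_values.pop(name, None)
        self._notify(name, None)

    def disconnected(self):
        self.object_values.clear()
        for name in list(self.listeners):
            self._notify(name, None)   # the object has gone away for now

    def reconnected(self):
        for name in self.listeners:
            self.send("watch", name)   # responses will notify the listeners

    def _notify(self, name, value):
        for listener in list(self.listeners.get(name, [])):
            listener(name, value)
```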

That should work.

So let's see if we can implement that.



Ok, so I just did a ton of stuff. I basically got objects working and then wrote support for them into MultipleServiceProxy. So objects work (and I'll be writing support for them into SingleServiceProxy soon).


So now I'm thinking I need to create the introspection service. There will be one introspection service per autobus2.Bus instance (other languages can choose whether to create one introspection service per bus or one introspection service per user-created service; either's fine, but the Python version uses the former since the introspection service is built to be able to provide information about multiple services at a time).

The introspection service will have an info object {"type": "autobus"}. It will publish the following functions:...

hm...

I'm thinking this will be a lot easier if I have special autobus.something (or autobus_something) functions/events/objects on every service...

Hm... So what are the advantages and disadvantages of the approaches?

Let's see... If we have an introspection service, we can... You know what, I'm going to have an introspection service. I think this will make things easier.

I do think that I should probably have every service publish the service id of their introspection service in their info object... That might come in useful, although I don't yet have a use for it myself, so maybe I should wait on it...

Anyways... So when a new Bus instance is created, it creates a service for itself as mentioned above. It then creates some functions, events, and objects which I'll get into in a bit.

So then when a service is created... Hang on, let me go add a thing to local services to allow them to be closed, which deletes them and stuff, cause that's going to be useful in the future, and it's going to tie into how I implement the introspection service...

Ok, that's done. So now... When we create a Bus, we go create a service as mentioned above. We then create some functions, events, and objects that I'll describe in a bit. We then store this service on the bus as a field.

When someone asks us to create a service, we duly create it. Then we go and tell the introspection service's objects that they've changed (since they're virtual objects) and to go update themselves. If the service we're creating is the introspection service itself (which we can tell by there being no other services yet existing), then we skip this step... TODO: when do we actually do this step then?

Man, if I'd gotten the KVO stuff working, this'd be a lot easier... Maybe I should go write that first...

Actually no, I'm going to get Autobus objects working. So I just added a thing to allow Autobus objects to be virtual, where a function is invoked to get their value when Autobus needs it. That sorts out all of these issues, since I can just write that function to introspect the currently-active services on the bus and then once the introspection service itself is set up I can tell those objects that they've changed and so to go call that method again, which solves the bootstrapping problem.

And speaking of that, I need to go create a thing to copy objects passed to local.LocalObject.set_value to prevent them from being accidentally modified by outside code...
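
Here's a sketch of both ideas: copying values on set_value, and virtual objects whose value comes from a function. SketchObject and SketchVirtualObject are stand-in names, not the real local.LocalObject, and the listener mechanism is simplified to a plain list of functions.

```python
import copy


class SketchObject(object):
    def __init__(self, value=None):
        self._value = copy.deepcopy(value)
        self.listeners = []

    def get_value(self):
        return self._value

    def set_value(self, value):
        # Deep-copy so later changes by outside code can't alter our value
        self._value = copy.deepcopy(value)
        self.changed()

    def changed(self):
        """Tell listeners the current value (re-read it first)."""
        value = self.get_value()
        for listener in list(self.listeners):
            listener(value)


class SketchVirtualObject(SketchObject):
    """Object whose value is computed by a function whenever it's needed."""

    def __init__(self, function):
        SketchObject.__init__(self)
        self.function = function

    def get_value(self):
        return self.function()
```

For the introspection service, the function would look at the services currently on the bus, and changed() would be called whenever a service is created or closed.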

Ok, that's done. Now to creating the introspection service...









































