"router" is a Go package for peer-peer pub/sub message passing. The basic usage is to attach a send channel to an id in router to send messages, and attach a recv channel to an id to receive messages. If these 2 ids match, the messages from send channel will be "routed" to recv channel, e.g.
rot := router.New(...)
chan1 := make(chan string)
chan2 := make(chan string)
chan3 := make(chan string)
rot.AttachSendChan(PathID("/sports/basketball"), chan1)
rot.AttachRecvChan(PathID("/sports/basketball"), chan2)
rot.AttachRecvChan(PathID("/sports/*"), chan3)
We can use integers, strings, pathnames, or structs as Ids in router (maybe regex ids and tuple id in future).
we can connect two routers so that channels attached to router1 can communicate with channels attached to router2 transparently.
chans.go chanset.go dispatcher.go filtrans.go flowctrl.go id.go logfault.go marshaler.go msg.go notifier.go proxy.go routedchan.go router.go stream.go
const (
MemberLocal = iota //peers (send chans and recv chans) are from the same router
MemberRemote //peers (send chans and recv chans) are from diff routers
NumMembership
)
Membership identifies whether communicating peers (send chans and recv chans) are from the same router or diff routers
const (
ScopeGlobal = iota // send to or recv from both local and remote peers
ScopeRemote // send to or recv from remote peers
ScopeLocal // send to or recv from local peers
NumScope
)
Scope is the scope to publish/subscribe (or send/recv) msgs
const (
ConnId = iota //msgs for router connection
DisconnId //msgs for router disconnection
ErrorId //msgs sent when one side detect errors
ReadyId //msgs sent when router's chans ready to recv more msgs
PubId //send new publications (set<id, chan type info>)
UnPubId //remove publications from connected routers
SubId //send new subscriptions (set<id, chan type info>)
UnSubId //remove subscriptions from connected routers
NumSysIds
)
Indices for sys msgs, used for creating SysIds
const (
RouterLogId = NumSysIds + iota
RouterFaultId
NumSysInternalIds
)
Some system level internal ids (for router internal logging and fault reporting)
const (
DefLogBufSize = 256
DefDataChanBufSize = 32
DefCmdChanBufSize = 64
UnlimitedBuffer = -1
)
Default size settings in router
var IntSysIdBase int = -10101 //Base value for SysIds of IntId
define 8 system msg ids
var PathSysIdBase string = "/10101" //Base value for SysIds of PathId
define 8 system msg ids
var StrSysIdBase string = "-10101" //Base value for SysIds of StrId
define 8 system msg ids
func Broadcast(v reflect.Value, recvers []*RoutedChan)
Simple broadcast is a plain function
func ExportedId(id Id) bool
A function used as predicate in router.idsForSend()/idsForRecv() to find all ids in a router's namespace which are exported to outside
func KeepLatestBroadcast(v reflect.Value, recvers []*RoutedChan)
KeepLastBroadcast never block. if running out of Chan buffer, drop old items and keep the latest items
func MemberString(m int) string
return the string values of membership
func ScopeString(s int) string
return string values of Scope
type BindEvent struct {
Type BindEventType
Count int //total attached
}
a message struct containing information for peer (sender/recver) binding/connection. sent by router whenever peer attached or detached.
Type: the type of event just happened: PeerAttach/PeerDetach/EndOfData Count: how many peers are still bound now
type BindEventType int8
const (
PeerAttach BindEventType = iota
PeerDetach
EndOfData
)
type ChanDirection int
const (
Send ChanDirection = iota
Recv
)
type ChanInfo struct {
Id Id
ChanType reflect.Type
ElemType *chanElemTypeData
}
a message struct holding information about id and its associated ChanType
func (ici ChanInfo) String() string
type ChanInfoMsg struct {
Info []*ChanInfo
}
a message struct for propagating router's namespace changes (chan attachments or detachments)
type ChanReadyInfo struct {
Id Id
Credit int
}
recver-router notify sender-router which channel are ready to recv how many msgs
func (cri ChanReadyInfo) String() string
type ChanState interface {
Type() reflect.Type
Interface() interface{}
IsNil() bool
Cap() int
Len() int
}
basic chan state
type Channel interface {
ChanState
Sender
Recver
}
Channel interface defines functional api of Go's channel: based on reflect.Value's channel related method set allow programming "generic" channels with reflect.Value as msgs add some utility Channel types
type ConnInfoMsg struct {
ConnInfo string
Error string
Id Id
Type string //async/flowControlled/raw
}
a message struct containing information about remote router connection
type ConnReadyMsg struct {
Info []*ChanReadyInfo
}
type Demarshaler interface {
Demarshal(interface{}) error
}
the common interface of all demarshaler such as GobDemarshaler and JsonDemarshaler
type DispatchFunc func(v reflect.Value, recvers []*RoutedChan)
DispatchFunc is a wrapper to convert a plain function into a dispatcher
func (f DispatchFunc) Dispatch(v reflect.Value, recvers []*RoutedChan)
type DispatchPolicy interface {
NewDispatcher() Dispatcher
}
The programming of Dispatchers: 1. do not depend on specific chan types 2. messages sent are represented as reflect.Value 3. receivers are array of RoutedChans with Channel interface of Send()/Recv()/...
DispatchPolicy is used to generate concrete dispatcher instances. For the kind of dispatcher which has no internal state, the same instance can be returned.
var BroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(Broadcast) })
BroadcastPolicy is used to generate broadcast dispatcher instances
var KeepLatestBroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(KeepLatestBroadcast) })
KeepLatestBroadcastPolicy is used to generate KeepLatest broadcast dispatcher instances
var RandomPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRandomDispatcher() })
RandomPolicy is used to generate random dispatchers
var RoundRobinPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRoundrobin() })
RoundRobinPolicy is ued to generate roundrobin dispatchers
type Dispatcher interface {
Dispatch(v reflect.Value, recvers []*RoutedChan)
}
Dispatcher is the common interface of all dispatchers
type FaultRaiser struct {
sync.Mutex
// contains filtered or unexported fields
}
FaultRaiser can be embedded into user structs/ types, which then can call Raise() directly
func NewFaultRaiser(id Id, r Router, src string) *FaultRaiser
create a new FaultRaiser to send FaultRecords to id in router "r"
func (l *FaultRaiser) Close()
func (l *FaultRaiser) Init(id Id, r Router, src string) *FaultRaiser
func (r *FaultRaiser) Raise(msg error)
raise a fault - send a FaultRecord to faultId in router
type FaultRecord struct {
Source string
Info error
Timestamp int64
}
FaultRecord records some details about fault
type FlowControlPolicy interface {
NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
//Stringer interface, return name of FlowControlPolicy
String() string
}
FlowControlPolicy: implement diff flow control protocols. a flow control protocol has two parts:
sender: which send msgs and recv acks from recver. recver: which recv msgs and send acks to sender.
So besides wrapping a transport Channel (for send/recv msgs)
FlowSender expose Ack(int) method to recv acks FlowRecver constructor will take as argument a ack(int) callback for sending acks.
The following Windowing and XOnOff protocols are copied from Chapter 4 of "Design And Validation Of Computer Protocols" by Gerard J. Holzmann.
var WindowFlowController FlowControlPolicy = WindowFlowControlPolicy(0)
var XOnOffFlowController FlowControlPolicy = &XOnOffFlowControlPolicy{0.75, 0.25}
type FlowRecver interface {
Channel
}
type FlowSender interface {
Channel
Ack(int)
}
type Id interface {
//methods to query Id content
Scope() int
Member() int
//key value for storing Id in map
Key() interface{}
//for id matching
Match(Id) bool
MatchType() MatchType
//Generators for creating other ids of same type. Since often we don't
//know the exact types of Id.Val, so we have to create new ones from an existing id
SysID(int, ...int) (Id, error) //generate sys ids, also called as method of Router
SysIdIndex() int //return (0 - NumSysInternalIds) for SysIds, return -1 for others
Clone(...int) (Id, error)
//Stringer interface
String() string
}
Id defines the common interface shared by all kinds of ids: integers/strings/pathnames...
var (
DummyIntId Id = &IntId{Val: -10201}
DummyStrId Id = &StrId{Val: "-10201"}
DummyPathId Id = &PathId{Val: "-10201"}
DummyMsgId Id = &MsgId{Val: MsgTag{-10201, -10201}}
)
Some dummy ids, often used as seedId when creating router
func IntID(args ...interface{}) Id
IntId constructor, accepting the following arguments: Val int ScopeVal int MemberVal int
func MsgID(args ...interface{}) Id
MsgId constructor, accepting the following arguments: Family int Tag int ScopeVal int MemberVal int
func PathID(args ...interface{}) Id
PathId constructor, accepting the following arguments: Val string (path names, such as /sport/basketball/news/...) ScopeVal int MemberVal int
func StrID(args ...interface{}) Id
StrId constructor, accepting the following arguments: Val string ScopeVal int MemberVal int
type IdFilter interface {
BlockInward(Id) bool
BlockOutward(Id) bool
}
IdFilter: the common interface of filters. concrete filters should be defined by apps with app-specific rules. if no filter defined, there is no id filtering. 1. bound with specific proxy 2. defines which ids can pass in / out to router thru this proxy 3. only filter the ids of application msgs (NOT system msgs), only involved in processing namespace change msgs: PubId/SubId 4. by default, if no filter is defined, everything is allowed 5. filters are used against ids in local namespace, not translated ones
type IdTranslator interface {
TranslateInward(Id) Id
TranslateOutward(Id) Id
}
IdTransltor: the common interface of translators. concrete transltors should be defined by apps with app-specific rules. if no translator defined, there is no id transltions. 1. bound with specific proxy 2. translate ids of in / out msgs thru this proxy, effectively "mount" the msgs thru this proxy / conn to a subrange of router's id space 3. only translate the ids of application msgs (NOT system msgs), and it will affect the ids of every app msgs passed thru this proxy - must be highly efficient 4. by default, if no translator is defined, no translation
type IntId struct {
Val int
ScopeVal int
MemberVal int
}
Use integer as ids in router
func (id IntId) Clone(args ...int) (nnid Id, err error)
func (id IntId) Key() interface{}
func (id1 IntId) Match(id2 Id) bool
func (id IntId) MatchType() MatchType
func (id IntId) Member() int
func (id IntId) Scope() int
func (id IntId) String() string
func (id IntId) SysID(indx int, args ...int) (ssid Id, err error)
func (id IntId) SysIdIndex() int
type LogPriority int
const (
LOG_INFO LogPriority = iota
LOG_DEBUG
LOG_WARN
LOG_ERROR
)
func (lp LogPriority) String() string
type LogRecord struct {
Pri LogPriority
Source string
Info interface{}
Timestamp int64
}
LogRecord stores the log information
type LogSink struct {
// contains filtered or unexported fields
}
A simple log sink, showing log messages in console.
func NewLogSink(id Id, r Router) *LogSink
create a new log sink, which receives log messages from id in router "r"
func (l *LogSink) Close()
func (l *LogSink) Init(id Id, r Router) *LogSink
type Logger struct {
sync.Mutex
// contains filtered or unexported fields
}
Logger can be embedded into user structs / types, which then can use Log() / LogError() directly
func NewLogger(id Id, r Router, src string) *Logger
NewLogger will create a Logger object which sends log messages thru id in router "r"
func (l *Logger) Close()
func (l *Logger) Init(id Id, r Router, src string) *Logger
func (l *Logger) Log(p LogPriority, msg interface{})
send a log record to log id in router
func (l *Logger) LogError(err error)
send a log record and store error info in it
type Marshaler interface {
Marshal(interface{}) error
}
the common interface of all marshaler such as GobMarshaler and JsonMarshaler
type MarshalingPolicy interface {
NewMarshaler(io.Writer) Marshaler
NewDemarshaler(io.Reader) Demarshaler
Register(interface{})
}
the common interface of all Marshaling policy such as GobMarshaling and JsonMarshaling
var GobMarshaling MarshalingPolicy = &gobMarshalingPolicy{registry: make(map[interface{}]bool)}
use package "gob" for marshaling
var JsonMarshaling MarshalingPolicy = jsonMarshalingPolicy(1)
use package "json" for marshaling
type MatchType int
MatchType describes the types of namespaces and match algorithms used for id-matching
const (
ExactMatch MatchType = iota
PrefixMatch // for PathId
AssocMatch // for RegexId, TupleId
)
type MsgId struct {
Val MsgTag
ScopeVal int
MemberVal int
}
func (id MsgId) Clone(args ...int) (nnid Id, err error)
func (id MsgId) Key() interface{}
func (id1 MsgId) Match(id2 Id) bool
func (id MsgId) MatchType() MatchType
func (id MsgId) Member() int
func (id MsgId) Scope() int
func (id MsgId) String() string
func (id MsgId) SysID(indx int, args ...int) (ssid Id, err error)
func (id MsgId) SysIdIndex() int
type MsgTag struct {
Family int //divide all msgs into families: system, fault, provision,...
Tag int //further division inside a family
}
Use a common msgTag as Id
var MsgSysIdBase MsgTag = MsgTag{-10101, -10101} //Base value for SysIds of MsgId
define 8 system msg ids
func (t MsgTag) String() string
type PathId struct {
Val string
ScopeVal int
MemberVal int
}
Use file-system like pathname as ids PathId has diff Match() algo from StrId
func (id PathId) Clone(args ...int) (nnid Id, err error)
func (id PathId) Key() interface{}
func (id1 PathId) Match(id2 Id) bool
func (id PathId) MatchType() MatchType
func (id PathId) Member() int
func (id PathId) Scope() int
func (id PathId) String() string
func (id PathId) SysID(indx int, args ...int) (ssid Id, err error)
func (id PathId) SysIdIndex() int
type PolicyFunc func() Dispatcher
func (f PolicyFunc) NewDispatcher() Dispatcher
type Proxy interface {
//Connect to a local router
Connect(Proxy) error
//Connect to a remote router thru io conn
//1. io.ReadWriteCloser: transport connection
//2. MarshalingPolicy: gob or json marshaling
//3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff)
ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) error
//close proxy and disconnect from peer
Close()
//query messaging interface with peer
LocalPubInfo() []*ChanInfo
LocalSubInfo() []*ChanInfo
PeerPubInfo() []*ChanInfo
PeerSubInfo() []*ChanInfo
}
Proxy is the primary interface to connect router to its peer router. At both ends of a connection, there is a proxy object for its router. Simple router connection can be set up thru calling Router.Connect(). Proxy.Connect() can be used to set up more complicated connections, such as setting IdFilter to allow only a subset of messages pass thru the connection, or setting IdTranslator which can "relocate" remote message ids into a subspace in local router's namespace, or setting a flow control policy. Proxy.Close() is called to disconnect router from its peer.
func NewProxy(r Router, name string, f IdFilter, t IdTranslator) Proxy
Proxy constructor. It accepts the following arguments:
1. r: the router which will be bound with this proxy and be owner of this proxy 2. name: proxy's name, used in log messages if owner router's log is turned on 3. f: IdFilter to be installed at this proxy 4. t: IdTranslator to be installed at this proxy
type RandomDispatcher rand.Rand
Random dispatcher
func NewRandomDispatcher() *RandomDispatcher
func (rd *RandomDispatcher) Dispatch(v reflect.Value, recvers []*RoutedChan)
type RecvChan interface {
ChanState
Recver
}
RecvChan: recving end of Channel (<-chan)
type Recver interface {
Recv() (reflect.Value, bool)
TryRecv() (reflect.Value, bool)
}
common interface for msg recvers recvers will check if channels are closed or not
type Roundrobin int
Roundrobin dispatcher will keep the "next" index as its state
func NewRoundrobin() *Roundrobin
func (r *Roundrobin) Dispatch(v reflect.Value, recvers []*RoutedChan)
type RoutedChan struct {
Kind ChanDirection
Id Id
Channel //external SendChan/RecvChan, attached by clients
// contains filtered or unexported fields
}
RoutedChan represents channels which are attached to router. They expose Channel's interface: Send()/TrySend()/Recv()/TryRecv()/... and additional info:
1. Id - the id which channel is attached to 2. NumPeers() - return the number of bound peers 3. Peers() - array of bound peers RoutedChan 4. Detach() - detach the channel from router
func (e *RoutedChan) Close()
override Channel.Close() method
func (e *RoutedChan) Detach()
func (e *RoutedChan) Interface() interface{}
func (e *RoutedChan) NumPeers() int
func (e *RoutedChan) Peers() (copySet []*RoutedChan)
type Router interface {
//---- core api ----
//Attach chans to id in router, with an optional argument (chan *BindEvent)
//When specified, the optional argument will serve two purposes:
//1. used to tell when the remote peers connecting/disconn
//2. in AttachRecvChan, used as a flag to ask router to keep recv chan open when all senders close
//the returned RoutedChan object can be used to find the number of bound peers: routCh.NumPeers()
AttachSendChan(Id, interface{}, ...interface{}) (*RoutedChan, error)
//3. When attaching recv chans, an optional integer can specify the internal buffering size
AttachRecvChan(Id, interface{}, ...interface{}) (*RoutedChan, error)
//Detach sendChan/recvChan from router
DetachChan(Id, interface{}) error
//Shutdown router, and close attached proxies and chans
Close()
//Connect to a local router
Connect(Router) (Proxy, Proxy, error)
//Connect to a remote router thru io conn
//1. io.ReadWriteCloser: transport connection
//2. MarshalingPolicy: gob or json marshaling
//3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff)
ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) (Proxy, error)
//--- other utils ---
//return pre-created SysIds according to the router's id-type, with ScopeGlobal / MemberLocal
SysID(idx int) Id
//create a new SysId with "args..." specifying scope/membership
NewSysID(idx int, args ...int) Id
//return all ids and their ChanTypes from router's namespace which satisfy predicate
IdsForSend(predicate func(id Id) bool) map[interface{}]*ChanInfo
IdsForRecv(predicate func(id Id) bool) map[interface{}]*ChanInfo
}
Router is the main access point to functionality. Applications will create an instance of it thru router.New(...) and attach channels to it
func New(seedId Id, bufSize int, disp DispatchPolicy, args ...interface{}) Router
New is router constructor. It accepts the following arguments:
1. seedId: a dummy id to show what type of ids will be used. New ids will be type-checked against this.
2. bufSize: the buffer size used for router's internal channels.
if bufSize >= 0, its value will be used
if bufSize < 0, it means unlimited buffering, so router is async and sending on attached channels will never block
3. disp: dispatch policy for router. by default, it is BroadcastPolicy
4. optional arguments ...:
name: router's name, if name is defined, router internal logging will be turned on, ie LogRecord generated
LogScope: if this is set, a console log sink is installed to show router internal log
if logScope == ScopeLocal, only log msgs from local router will show up
if logScope == ScopeGlobal, all log msgs from connected routers will show up
type SendChan interface {
ChanState
Sender
}
SendChan: sending end of Channel (chan<-)
type Sender interface {
Send(reflect.Value)
TrySend(reflect.Value) bool
Close()
}
common interface for msg senders and senders are responsible for closing channels
type StrId struct {
Val string
ScopeVal int
MemberVal int
}
Use strings as ids in router
func (id StrId) Clone(args ...int) (nnid Id, err error)
func (id StrId) Key() interface{}
func (id1 StrId) Match(id2 Id) bool
func (id StrId) MatchType() MatchType
func (id StrId) Member() int
func (id StrId) Scope() int
func (id StrId) String() string
func (id StrId) SysID(indx int, args ...int) (ssid Id, err error)
func (id StrId) SysIdIndex() int
type WindowFlowControlPolicy byte
WindowFlowController: simple window flow control protocol for lossless transport the transport Channel between Sender, Recver should have capacity >= expected credit Figure 4.5 in Gerard's book
func (wfc WindowFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
func (wfc WindowFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
func (wfc WindowFlowControlPolicy) String() string
type XOnOffFlowControlPolicy struct {
// contains filtered or unexported fields
}
X-on/X-off protocol Figure 4.2 and Figure 4.3 in Gerard's book
func (fcp *XOnOffFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
func (fcp *XOnOffFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
func (fcp *XOnOffFlowControlPolicy) String() string