import "router"
Overview
Index

Overview

"router" is a Go package for peer-peer pub/sub message passing. The basic usage is to attach a send channel to an id in router to send messages, and attach a recv channel to an id to receive messages. If these 2 ids match, the messages from send channel will be "routed" to recv channel, e.g.

rot := router.New(...)
chan1 := make(chan string)
chan2 := make(chan string)
chan3 := make(chan string)
rot.AttachSendChan(PathID("/sports/basketball"), chan1)
rot.AttachRecvChan(PathID("/sports/basketball"), chan2)
rot.AttachRecvChan(PathID("/sports/*"), chan3)

We can use integers, strings, pathnames, or structs as Ids in router (maybe regex ids and tuple id in future).

we can connect two routers so that channels attached to router1 can communicate with channels attached to router2 transparently.

Index

Constants
Variables
func Broadcast(v reflect.Value, recvers []*RoutedChan)
func ExportedId(id Id) bool
func KeepLatestBroadcast(v reflect.Value, recvers []*RoutedChan)
func MemberString(m int) string
func ScopeString(s int) string
type BindEvent
type BindEventType
type ChanDirection
type ChanInfo
    func (ici ChanInfo) String() string
type ChanInfoMsg
type ChanReadyInfo
    func (cri ChanReadyInfo) String() string
type ChanState
type Channel
type ConnInfoMsg
type ConnReadyMsg
type Demarshaler
type DispatchFunc
    func (f DispatchFunc) Dispatch(v reflect.Value, recvers []*RoutedChan)
type DispatchPolicy
type Dispatcher
type FaultRaiser
    func NewFaultRaiser(id Id, r Router, src string) *FaultRaiser
    func (l *FaultRaiser) Close()
    func (l *FaultRaiser) Init(id Id, r Router, src string) *FaultRaiser
    func (r *FaultRaiser) Raise(msg error)
type FaultRecord
type FlowControlPolicy
type FlowRecver
type FlowSender
type Id
    func IntID(args ...interface{}) Id
    func MsgID(args ...interface{}) Id
    func PathID(args ...interface{}) Id
    func StrID(args ...interface{}) Id
type IdFilter
type IdTranslator
type IntId
    func (id IntId) Clone(args ...int) (nnid Id, err error)
    func (id IntId) Key() interface{}
    func (id1 IntId) Match(id2 Id) bool
    func (id IntId) MatchType() MatchType
    func (id IntId) Member() int
    func (id IntId) Scope() int
    func (id IntId) String() string
    func (id IntId) SysID(indx int, args ...int) (ssid Id, err error)
    func (id IntId) SysIdIndex() int
type LogPriority
    func (lp LogPriority) String() string
type LogRecord
type LogSink
    func NewLogSink(id Id, r Router) *LogSink
    func (l *LogSink) Close()
    func (l *LogSink) Init(id Id, r Router) *LogSink
type Logger
    func NewLogger(id Id, r Router, src string) *Logger
    func (l *Logger) Close()
    func (l *Logger) Init(id Id, r Router, src string) *Logger
    func (l *Logger) Log(p LogPriority, msg interface{})
    func (l *Logger) LogError(err error)
type Marshaler
type MarshalingPolicy
type MatchType
type MsgId
    func (id MsgId) Clone(args ...int) (nnid Id, err error)
    func (id MsgId) Key() interface{}
    func (id1 MsgId) Match(id2 Id) bool
    func (id MsgId) MatchType() MatchType
    func (id MsgId) Member() int
    func (id MsgId) Scope() int
    func (id MsgId) String() string
    func (id MsgId) SysID(indx int, args ...int) (ssid Id, err error)
    func (id MsgId) SysIdIndex() int
type MsgTag
    func (t MsgTag) String() string
type PathId
    func (id PathId) Clone(args ...int) (nnid Id, err error)
    func (id PathId) Key() interface{}
    func (id1 PathId) Match(id2 Id) bool
    func (id PathId) MatchType() MatchType
    func (id PathId) Member() int
    func (id PathId) Scope() int
    func (id PathId) String() string
    func (id PathId) SysID(indx int, args ...int) (ssid Id, err error)
    func (id PathId) SysIdIndex() int
type PolicyFunc
    func (f PolicyFunc) NewDispatcher() Dispatcher
type Proxy
    func NewProxy(r Router, name string, f IdFilter, t IdTranslator) Proxy
type RandomDispatcher
    func NewRandomDispatcher() *RandomDispatcher
    func (rd *RandomDispatcher) Dispatch(v reflect.Value, recvers []*RoutedChan)
type RecvChan
type Recver
type Roundrobin
    func NewRoundrobin() *Roundrobin
    func (r *Roundrobin) Dispatch(v reflect.Value, recvers []*RoutedChan)
type RoutedChan
    func (e *RoutedChan) Close()
    func (e *RoutedChan) Detach()
    func (e *RoutedChan) Interface() interface{}
    func (e *RoutedChan) NumPeers() int
    func (e *RoutedChan) Peers() (copySet []*RoutedChan)
type Router
    func New(seedId Id, bufSize int, disp DispatchPolicy, args ...interface{}) Router
type SendChan
type Sender
type StrId
    func (id StrId) Clone(args ...int) (nnid Id, err error)
    func (id StrId) Key() interface{}
    func (id1 StrId) Match(id2 Id) bool
    func (id StrId) MatchType() MatchType
    func (id StrId) Member() int
    func (id StrId) Scope() int
    func (id StrId) String() string
    func (id StrId) SysID(indx int, args ...int) (ssid Id, err error)
    func (id StrId) SysIdIndex() int
type WindowFlowControlPolicy
    func (wfc WindowFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
    func (wfc WindowFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
    func (wfc WindowFlowControlPolicy) String() string
type XOnOffFlowControlPolicy
    func (fcp *XOnOffFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
    func (fcp *XOnOffFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
    func (fcp *XOnOffFlowControlPolicy) String() string

Package files

chans.go chanset.go dispatcher.go filtrans.go flowctrl.go id.go logfault.go marshaler.go msg.go notifier.go proxy.go routedchan.go router.go stream.go

Constants

const (
    MemberLocal  = iota //peers (send chans and recv chans) are from the same router
    MemberRemote        //peers (send chans and recv chans) are from diff routers
    NumMembership
)

Membership identifies whether communicating peers (send chans and recv chans) are from the same router or diff routers

const (
    ScopeGlobal = iota // send to or recv from both local and remote peers
    ScopeRemote        // send to or recv from remote peers
    ScopeLocal         // send to or recv from local peers
    NumScope
)

Scope is the scope to publish/subscribe (or send/recv) msgs

const (
    ConnId    = iota //msgs for router connection
    DisconnId        //msgs for router disconnection
    ErrorId          //msgs sent when one side detect errors
    ReadyId          //msgs sent when router's chans ready to recv more msgs
    PubId            //send new publications (set<id, chan type info>)
    UnPubId          //remove publications from connected routers
    SubId            //send new subscriptions (set<id, chan type info>)
    UnSubId          //remove subscriptions from connected routers
    NumSysIds
)

Indices for sys msgs, used for creating SysIds

const (
    RouterLogId = NumSysIds + iota
    RouterFaultId
    NumSysInternalIds
)

Some system level internal ids (for router internal logging and fault reporting)

const (
    DefLogBufSize      = 256
    DefDataChanBufSize = 32
    DefCmdChanBufSize  = 64
    UnlimitedBuffer    = -1
)

Default size settings in router

Variables

var IntSysIdBase int = -10101 //Base value for SysIds of IntId

define 8 system msg ids

var PathSysIdBase string = "/10101" //Base value for SysIds of PathId

define 8 system msg ids

var StrSysIdBase string = "-10101" //Base value for SysIds of StrId

define 8 system msg ids

func Broadcast

func Broadcast(v reflect.Value, recvers []*RoutedChan)

Simple broadcast is a plain function

func ExportedId

func ExportedId(id Id) bool

A function used as predicate in router.idsForSend()/idsForRecv() to find all ids in a router's namespace which are exported to outside

func KeepLatestBroadcast

func KeepLatestBroadcast(v reflect.Value, recvers []*RoutedChan)

KeepLastBroadcast never block. if running out of Chan buffer, drop old items and keep the latest items

func MemberString

func MemberString(m int) string

return the string values of membership

func ScopeString

func ScopeString(s int) string

return string values of Scope

type BindEvent

type BindEvent struct {
    Type  BindEventType
    Count int //total attached
}

a message struct containing information for peer (sender/recver) binding/connection. sent by router whenever peer attached or detached.

Type:  the type of event just happened: PeerAttach/PeerDetach/EndOfData
Count: how many peers are still bound now

type BindEventType

type BindEventType int8
const (
    PeerAttach BindEventType = iota
    PeerDetach
    EndOfData
)

type ChanDirection

type ChanDirection int
const (
    Send ChanDirection = iota
    Recv
)

type ChanInfo

type ChanInfo struct {
    Id       Id
    ChanType reflect.Type
    ElemType *chanElemTypeData
}

a message struct holding information about id and its associated ChanType

func (ChanInfo) String

func (ici ChanInfo) String() string

type ChanInfoMsg

type ChanInfoMsg struct {
    Info []*ChanInfo
}

a message struct for propagating router's namespace changes (chan attachments or detachments)

type ChanReadyInfo

type ChanReadyInfo struct {
    Id     Id
    Credit int
}

recver-router notify sender-router which channel are ready to recv how many msgs

func (ChanReadyInfo) String

func (cri ChanReadyInfo) String() string

type ChanState

type ChanState interface {
    Type() reflect.Type
    Interface() interface{}
    IsNil() bool
    Cap() int
    Len() int
}

basic chan state

type Channel

type Channel interface {
    ChanState
    Sender
    Recver
}

Channel interface defines functional api of Go's channel: based on reflect.Value's channel related method set allow programming "generic" channels with reflect.Value as msgs add some utility Channel types

type ConnInfoMsg

type ConnInfoMsg struct {
    ConnInfo string
    Error    string
    Id       Id
    Type     string //async/flowControlled/raw
}

a message struct containing information about remote router connection

type ConnReadyMsg

type ConnReadyMsg struct {
    Info []*ChanReadyInfo
}

type Demarshaler

type Demarshaler interface {
    Demarshal(interface{}) error
}

the common interface of all demarshaler such as GobDemarshaler and JsonDemarshaler

type DispatchFunc

type DispatchFunc func(v reflect.Value, recvers []*RoutedChan)

DispatchFunc is a wrapper to convert a plain function into a dispatcher

func (DispatchFunc) Dispatch

func (f DispatchFunc) Dispatch(v reflect.Value, recvers []*RoutedChan)

type DispatchPolicy

type DispatchPolicy interface {
    NewDispatcher() Dispatcher
}

The programming of Dispatchers: 1. do not depend on specific chan types 2. messages sent are represented as reflect.Value 3. receivers are array of RoutedChans with Channel interface of Send()/Recv()/...

DispatchPolicy is used to generate concrete dispatcher instances. For the kind of dispatcher which has no internal state, the same instance can be returned.

var BroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(Broadcast) })

BroadcastPolicy is used to generate broadcast dispatcher instances

var KeepLatestBroadcastPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return DispatchFunc(KeepLatestBroadcast) })

KeepLatestBroadcastPolicy is used to generate KeepLatest broadcast dispatcher instances

var RandomPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRandomDispatcher() })

RandomPolicy is used to generate random dispatchers

var RoundRobinPolicy DispatchPolicy = PolicyFunc(func() Dispatcher { return NewRoundrobin() })

RoundRobinPolicy is ued to generate roundrobin dispatchers

type Dispatcher

type Dispatcher interface {
    Dispatch(v reflect.Value, recvers []*RoutedChan)
}

Dispatcher is the common interface of all dispatchers

type FaultRaiser

type FaultRaiser struct {
    sync.Mutex
    // contains filtered or unexported fields
}

FaultRaiser can be embedded into user structs/ types, which then can call Raise() directly

func NewFaultRaiser

func NewFaultRaiser(id Id, r Router, src string) *FaultRaiser

create a new FaultRaiser to send FaultRecords to id in router "r"

func (*FaultRaiser) Close

func (l *FaultRaiser) Close()

func (*FaultRaiser) Init

func (l *FaultRaiser) Init(id Id, r Router, src string) *FaultRaiser

func (*FaultRaiser) Raise

func (r *FaultRaiser) Raise(msg error)

raise a fault - send a FaultRecord to faultId in router

type FaultRecord

type FaultRecord struct {
    Source    string
    Info      error
    Timestamp int64
}

FaultRecord records some details about fault

type FlowControlPolicy

type FlowControlPolicy interface {
    NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)
    NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)
    //Stringer interface, return name of FlowControlPolicy
    String() string
}

FlowControlPolicy: implement diff flow control protocols. a flow control protocol has two parts:

sender: which send msgs and recv acks from recver.
recver: which recv msgs and send acks to sender.

So besides wrapping a transport Channel (for send/recv msgs)

FlowSender expose Ack(int) method to recv acks
FlowRecver constructor will take as argument a ack(int) callback for sending acks.

The following Windowing and XOnOff protocols are copied from Chapter 4 of "Design And Validation Of Computer Protocols" by Gerard J. Holzmann.

var WindowFlowController FlowControlPolicy = WindowFlowControlPolicy(0)
var XOnOffFlowController FlowControlPolicy = &XOnOffFlowControlPolicy{0.75, 0.25}

type FlowRecver

type FlowRecver interface {
    Channel
}

type FlowSender

type FlowSender interface {
    Channel
    Ack(int)
}

type Id

type Id interface {
    //methods to query Id content
    Scope() int
    Member() int

    //key value for storing Id in map
    Key() interface{}

    //for id matching
    Match(Id) bool
    MatchType() MatchType

    //Generators for creating other ids of same type. Since often we don't
    //know the exact types of Id.Val, so we have to create new ones from an existing id
    SysID(int, ...int) (Id, error) //generate sys ids, also called as method of Router
    SysIdIndex() int               //return (0 - NumSysInternalIds) for SysIds, return -1 for others
    Clone(...int) (Id, error)

    //Stringer interface
    String() string
}

Id defines the common interface shared by all kinds of ids: integers/strings/pathnames...

var (
    DummyIntId  Id = &IntId{Val: -10201}
    DummyStrId  Id = &StrId{Val: "-10201"}
    DummyPathId Id = &PathId{Val: "-10201"}
    DummyMsgId  Id = &MsgId{Val: MsgTag{-10201, -10201}}
)

Some dummy ids, often used as seedId when creating router

func IntID

func IntID(args ...interface{}) Id

IntId constructor, accepting the following arguments: Val int ScopeVal int MemberVal int

func MsgID

func MsgID(args ...interface{}) Id

MsgId constructor, accepting the following arguments: Family int Tag int ScopeVal int MemberVal int

func PathID

func PathID(args ...interface{}) Id

PathId constructor, accepting the following arguments: Val string (path names, such as /sport/basketball/news/...) ScopeVal int MemberVal int

func StrID

func StrID(args ...interface{}) Id

StrId constructor, accepting the following arguments: Val string ScopeVal int MemberVal int

type IdFilter

type IdFilter interface {
    BlockInward(Id) bool
    BlockOutward(Id) bool
}

IdFilter: the common interface of filters. concrete filters should be defined by apps with app-specific rules. if no filter defined, there is no id filtering. 1. bound with specific proxy 2. defines which ids can pass in / out to router thru this proxy 3. only filter the ids of application msgs (NOT system msgs), only involved in processing namespace change msgs: PubId/SubId 4. by default, if no filter is defined, everything is allowed 5. filters are used against ids in local namespace, not translated ones

type IdTranslator

type IdTranslator interface {
    TranslateInward(Id) Id
    TranslateOutward(Id) Id
}

IdTransltor: the common interface of translators. concrete transltors should be defined by apps with app-specific rules. if no translator defined, there is no id transltions. 1. bound with specific proxy 2. translate ids of in / out msgs thru this proxy, effectively "mount" the msgs thru this proxy / conn to a subrange of router's id space 3. only translate the ids of application msgs (NOT system msgs), and it will affect the ids of every app msgs passed thru this proxy - must be highly efficient 4. by default, if no translator is defined, no translation

type IntId

type IntId struct {
    Val       int
    ScopeVal  int
    MemberVal int
}

Use integer as ids in router

func (IntId) Clone

func (id IntId) Clone(args ...int) (nnid Id, err error)

func (IntId) Key

func (id IntId) Key() interface{}

func (IntId) Match

func (id1 IntId) Match(id2 Id) bool

func (IntId) MatchType

func (id IntId) MatchType() MatchType

func (IntId) Member

func (id IntId) Member() int

func (IntId) Scope

func (id IntId) Scope() int

func (IntId) String

func (id IntId) String() string

func (IntId) SysID

func (id IntId) SysID(indx int, args ...int) (ssid Id, err error)

func (IntId) SysIdIndex

func (id IntId) SysIdIndex() int

type LogPriority

type LogPriority int
const (
    LOG_INFO LogPriority = iota
    LOG_DEBUG
    LOG_WARN
    LOG_ERROR
)

func (LogPriority) String

func (lp LogPriority) String() string

type LogRecord

type LogRecord struct {
    Pri       LogPriority
    Source    string
    Info      interface{}
    Timestamp int64
}

LogRecord stores the log information

type LogSink

type LogSink struct {
    // contains filtered or unexported fields
}

A simple log sink, showing log messages in console.

func NewLogSink

func NewLogSink(id Id, r Router) *LogSink

create a new log sink, which receives log messages from id in router "r"

func (*LogSink) Close

func (l *LogSink) Close()

func (*LogSink) Init

func (l *LogSink) Init(id Id, r Router) *LogSink

type Logger

type Logger struct {
    sync.Mutex
    // contains filtered or unexported fields
}

Logger can be embedded into user structs / types, which then can use Log() / LogError() directly

func NewLogger

func NewLogger(id Id, r Router, src string) *Logger

NewLogger will create a Logger object which sends log messages thru id in router "r"

func (*Logger) Close

func (l *Logger) Close()

func (*Logger) Init

func (l *Logger) Init(id Id, r Router, src string) *Logger

func (*Logger) Log

func (l *Logger) Log(p LogPriority, msg interface{})

send a log record to log id in router

func (*Logger) LogError

func (l *Logger) LogError(err error)

send a log record and store error info in it

type Marshaler

type Marshaler interface {
    Marshal(interface{}) error
}

the common interface of all marshaler such as GobMarshaler and JsonMarshaler

type MarshalingPolicy

type MarshalingPolicy interface {
    NewMarshaler(io.Writer) Marshaler
    NewDemarshaler(io.Reader) Demarshaler
    Register(interface{})
}

the common interface of all Marshaling policy such as GobMarshaling and JsonMarshaling

var GobMarshaling MarshalingPolicy = &gobMarshalingPolicy{registry: make(map[interface{}]bool)}

use package "gob" for marshaling

var JsonMarshaling MarshalingPolicy = jsonMarshalingPolicy(1)

use package "json" for marshaling

type MatchType

type MatchType int

MatchType describes the types of namespaces and match algorithms used for id-matching

const (
    ExactMatch  MatchType = iota
    PrefixMatch           // for PathId
    AssocMatch            // for RegexId, TupleId
)

type MsgId

type MsgId struct {
    Val       MsgTag
    ScopeVal  int
    MemberVal int
}

func (MsgId) Clone

func (id MsgId) Clone(args ...int) (nnid Id, err error)

func (MsgId) Key

func (id MsgId) Key() interface{}

func (MsgId) Match

func (id1 MsgId) Match(id2 Id) bool

func (MsgId) MatchType

func (id MsgId) MatchType() MatchType

func (MsgId) Member

func (id MsgId) Member() int

func (MsgId) Scope

func (id MsgId) Scope() int

func (MsgId) String

func (id MsgId) String() string

func (MsgId) SysID

func (id MsgId) SysID(indx int, args ...int) (ssid Id, err error)

func (MsgId) SysIdIndex

func (id MsgId) SysIdIndex() int

type MsgTag

type MsgTag struct {
    Family int //divide all msgs into families: system, fault, provision,...
    Tag    int //further division inside a family
}

Use a common msgTag as Id

var MsgSysIdBase MsgTag = MsgTag{-10101, -10101} //Base value for SysIds of MsgId

define 8 system msg ids

func (MsgTag) String

func (t MsgTag) String() string

type PathId

type PathId struct {
    Val       string
    ScopeVal  int
    MemberVal int
}

Use file-system like pathname as ids PathId has diff Match() algo from StrId

func (PathId) Clone

func (id PathId) Clone(args ...int) (nnid Id, err error)

func (PathId) Key

func (id PathId) Key() interface{}

func (PathId) Match

func (id1 PathId) Match(id2 Id) bool

func (PathId) MatchType

func (id PathId) MatchType() MatchType

func (PathId) Member

func (id PathId) Member() int

func (PathId) Scope

func (id PathId) Scope() int

func (PathId) String

func (id PathId) String() string

func (PathId) SysID

func (id PathId) SysID(indx int, args ...int) (ssid Id, err error)

func (PathId) SysIdIndex

func (id PathId) SysIdIndex() int

type PolicyFunc

type PolicyFunc func() Dispatcher

func (PolicyFunc) NewDispatcher

func (f PolicyFunc) NewDispatcher() Dispatcher

type Proxy

type Proxy interface {
    //Connect to a local router
    Connect(Proxy) error
    //Connect to a remote router thru io conn
    //1. io.ReadWriteCloser: transport connection
    //2. MarshalingPolicy: gob or json marshaling
    //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff)
    ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) error
    //close proxy and disconnect from peer
    Close()
    //query messaging interface with peer
    LocalPubInfo() []*ChanInfo
    LocalSubInfo() []*ChanInfo
    PeerPubInfo() []*ChanInfo
    PeerSubInfo() []*ChanInfo
}

Proxy is the primary interface to connect router to its peer router. At both ends of a connection, there is a proxy object for its router. Simple router connection can be set up thru calling Router.Connect(). Proxy.Connect() can be used to set up more complicated connections, such as setting IdFilter to allow only a subset of messages pass thru the connection, or setting IdTranslator which can "relocate" remote message ids into a subspace in local router's namespace, or setting a flow control policy. Proxy.Close() is called to disconnect router from its peer.

func NewProxy

func NewProxy(r Router, name string, f IdFilter, t IdTranslator) Proxy

Proxy constructor. It accepts the following arguments:

1. r:    the router which will be bound with this proxy and be owner of this proxy
2. name: proxy's name, used in log messages if owner router's log is turned on
3. f:    IdFilter to be installed at this proxy
4. t:    IdTranslator to be installed at this proxy

type RandomDispatcher

type RandomDispatcher rand.Rand

Random dispatcher

func NewRandomDispatcher

func NewRandomDispatcher() *RandomDispatcher

func (*RandomDispatcher) Dispatch

func (rd *RandomDispatcher) Dispatch(v reflect.Value, recvers []*RoutedChan)

type RecvChan

type RecvChan interface {
    ChanState
    Recver
}

RecvChan: recving end of Channel (<-chan)

type Recver

type Recver interface {
    Recv() (reflect.Value, bool)
    TryRecv() (reflect.Value, bool)
}

common interface for msg recvers recvers will check if channels are closed or not

type Roundrobin

type Roundrobin int

Roundrobin dispatcher will keep the "next" index as its state

func NewRoundrobin

func NewRoundrobin() *Roundrobin

func (*Roundrobin) Dispatch

func (r *Roundrobin) Dispatch(v reflect.Value, recvers []*RoutedChan)

type RoutedChan

type RoutedChan struct {
    Kind    ChanDirection
    Id      Id
    Channel //external SendChan/RecvChan, attached by clients
    // contains filtered or unexported fields
}

RoutedChan represents channels which are attached to router. They expose Channel's interface: Send()/TrySend()/Recv()/TryRecv()/... and additional info:

1. Id - the id which channel is attached to
2. NumPeers() - return the number of bound peers
3. Peers() - array of bound peers RoutedChan
4. Detach() - detach the channel from router

func (*RoutedChan) Close

func (e *RoutedChan) Close()

override Channel.Close() method

func (*RoutedChan) Detach

func (e *RoutedChan) Detach()

func (*RoutedChan) Interface

func (e *RoutedChan) Interface() interface{}

func (*RoutedChan) NumPeers

func (e *RoutedChan) NumPeers() int

func (*RoutedChan) Peers

func (e *RoutedChan) Peers() (copySet []*RoutedChan)

type Router

type Router interface {
    //---- core api ----
    //Attach chans to id in router, with an optional argument (chan *BindEvent)
    //When specified, the optional argument will serve two purposes:
    //1. used to tell when the remote peers connecting/disconn
    //2. in AttachRecvChan, used as a flag to ask router to keep recv chan open when all senders close
    //the returned RoutedChan object can be used to find the number of bound peers: routCh.NumPeers()
    AttachSendChan(Id, interface{}, ...interface{}) (*RoutedChan, error)
    //3. When attaching recv chans, an optional integer can specify the internal buffering size
    AttachRecvChan(Id, interface{}, ...interface{}) (*RoutedChan, error)

    //Detach sendChan/recvChan from router
    DetachChan(Id, interface{}) error

    //Shutdown router, and close attached proxies and chans
    Close()

    //Connect to a local router
    Connect(Router) (Proxy, Proxy, error)

    //Connect to a remote router thru io conn
    //1. io.ReadWriteCloser: transport connection
    //2. MarshalingPolicy: gob or json marshaling
    //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff)
    ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) (Proxy, error)

    //--- other utils ---
    //return pre-created SysIds according to the router's id-type, with ScopeGlobal / MemberLocal
    SysID(idx int) Id

    //create a new SysId with "args..." specifying scope/membership
    NewSysID(idx int, args ...int) Id

    //return all ids and their ChanTypes from router's namespace which satisfy predicate
    IdsForSend(predicate func(id Id) bool) map[interface{}]*ChanInfo
    IdsForRecv(predicate func(id Id) bool) map[interface{}]*ChanInfo
}

Router is the main access point to functionality. Applications will create an instance of it thru router.New(...) and attach channels to it

func New

func New(seedId Id, bufSize int, disp DispatchPolicy, args ...interface{}) Router

New is router constructor. It accepts the following arguments:

1. seedId: a dummy id to show what type of ids will be used. New ids will be type-checked against this.
2. bufSize: the buffer size used for router's internal channels.
   if bufSize >= 0, its value will be used
   if bufSize < 0, it means unlimited buffering, so router is async and sending on attached channels will never block
3. disp: dispatch policy for router. by default, it is BroadcastPolicy
4. optional arguments ...:
   name:     router's name, if name is defined, router internal logging will be turned on, ie LogRecord generated
   LogScope: if this is set, a console log sink is installed to show router internal log
      if logScope == ScopeLocal, only log msgs from local router will show up
      if logScope == ScopeGlobal, all log msgs from connected routers will show up

type SendChan

type SendChan interface {
    ChanState
    Sender
}

SendChan: sending end of Channel (chan<-)

type Sender

type Sender interface {
    Send(reflect.Value)
    TrySend(reflect.Value) bool
    Close()
}

common interface for msg senders and senders are responsible for closing channels

type StrId

type StrId struct {
    Val       string
    ScopeVal  int
    MemberVal int
}

Use strings as ids in router

func (StrId) Clone

func (id StrId) Clone(args ...int) (nnid Id, err error)

func (StrId) Key

func (id StrId) Key() interface{}

func (StrId) Match

func (id1 StrId) Match(id2 Id) bool

func (StrId) MatchType

func (id StrId) MatchType() MatchType

func (StrId) Member

func (id StrId) Member() int

func (StrId) Scope

func (id StrId) Scope() int

func (StrId) String

func (id StrId) String() string

func (StrId) SysID

func (id StrId) SysID(indx int, args ...int) (ssid Id, err error)

func (StrId) SysIdIndex

func (id StrId) SysIdIndex() int

type WindowFlowControlPolicy

type WindowFlowControlPolicy byte

WindowFlowController: simple window flow control protocol for lossless transport the transport Channel between Sender, Recver should have capacity >= expected credit Figure 4.5 in Gerard's book

func (WindowFlowControlPolicy) NewFlowRecver

func (wfc WindowFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)

func (WindowFlowControlPolicy) NewFlowSender

func (wfc WindowFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)

func (WindowFlowControlPolicy) String

func (wfc WindowFlowControlPolicy) String() string

type XOnOffFlowControlPolicy

type XOnOffFlowControlPolicy struct {
    // contains filtered or unexported fields
}

X-on/X-off protocol Figure 4.2 and Figure 4.3 in Gerard's book

func (*XOnOffFlowControlPolicy) NewFlowRecver

func (fcp *XOnOffFlowControlPolicy) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error)

func (*XOnOffFlowControlPolicy) NewFlowSender

func (fcp *XOnOffFlowControlPolicy) NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error)

func (*XOnOffFlowControlPolicy) String

func (fcp *XOnOffFlowControlPolicy) String() string