101 lines
1.9 KiB
Go
101 lines
1.9 KiB
Go
package transport
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
msgpack "gopkg.in/vmihailenco/msgpack.v2"
|
|
"src.dualinventive.com/go/dinet"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
)
|
|
|
|
// TestTransport for testing purposes
|
|
type TestTransport struct {
|
|
closed chan bool
|
|
sQ chan *rpc.Msg
|
|
rQ chan *rpc.Msg
|
|
}
|
|
|
|
// NewTestTransport creates a buffered transport
|
|
func NewTestTransport(recvSize, sendSize int) *TestTransport {
|
|
tt := &TestTransport{}
|
|
tt.sQ = make(chan *rpc.Msg, sendSize)
|
|
tt.rQ = make(chan *rpc.Msg, recvSize)
|
|
tt.closed = make(chan bool)
|
|
return tt
|
|
}
|
|
|
|
// Queue a msg for a single dinet.Reader interface call
|
|
func (tt *TestTransport) Queue(msg *rpc.Msg) error {
|
|
_, err := msgpack.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = msg.UnmarshalMsgpack(msg.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case tt.sQ <- msg:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("send queue full")
|
|
}
|
|
}
|
|
|
|
// Dequeue a msg from the dinet.Writer interface, waits infinite or until context is done
|
|
func (tt *TestTransport) Dequeue(ctx context.Context) (*rpc.Msg, error) {
|
|
select {
|
|
case msg := <-tt.rQ:
|
|
return msg, nil
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Connect does nothing
|
|
func (tt *TestTransport) Connect() error {
|
|
return nil
|
|
}
|
|
|
|
// Disconnect does nothing
|
|
func (tt *TestTransport) Disconnect() error {
|
|
return nil
|
|
}
|
|
|
|
// Close close the shit
|
|
func (tt *TestTransport) Close() error {
|
|
close(tt.closed)
|
|
return nil
|
|
}
|
|
|
|
// Recv reads a message buffered with Queue
|
|
func (tt *TestTransport) Recv() (*rpc.Msg, error) {
|
|
select {
|
|
case <-tt.closed:
|
|
return nil, dinet.ErrClosed
|
|
case msg := <-tt.sQ:
|
|
err := msg.UnmarshalMsgpack(msg.Bytes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return msg, nil
|
|
}
|
|
}
|
|
|
|
// Send buffers a message which can be retreived with Dequeue
|
|
func (tt *TestTransport) Send(msg *rpc.Msg) error {
|
|
_, err := msgpack.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case tt.rQ <- msg:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("send queue full")
|
|
}
|
|
}
|