src.dualinventive.com/go/devsim/transport/test_transport.go

101 lines
1.9 KiB
Go

package transport
import (
"context"
"fmt"
msgpack "gopkg.in/vmihailenco/msgpack.v2"
"src.dualinventive.com/go/dinet"
"src.dualinventive.com/go/dinet/rpc"
)
// TestTransport is an in-memory transport for testing purposes. Messages are
// exchanged through channels held in the struct.
type TestTransport struct {
	// closed is closed by Close; Recv watches it to report dinet.ErrClosed.
	closed chan bool
	// sQ carries messages added with Queue until Recv hands them out.
	// rQ carries messages added with Send until Dequeue takes them.
	sQ, rQ chan *rpc.Msg
}
// NewTestTransport creates a TestTransport with buffered queues.
//
// sendSize is the capacity of the queue filled by Queue and drained by Recv;
// recvSize is the capacity of the queue filled by Send and drained by
// Dequeue. The channel used to signal Close is unbuffered.
func NewTestTransport(recvSize, sendSize int) *TestTransport {
	return &TestTransport{
		sQ:     make(chan *rpc.Msg, sendSize),
		rQ:     make(chan *rpc.Msg, recvSize),
		closed: make(chan bool),
	}
}
// Queue makes msg available to a single Recv call (the dinet.Reader side of
// the transport).
//
// The message is first marshalled and then unmarshalled again from its own
// bytes; any error from either step is returned unchanged. Queue never
// blocks: when the queue has no free slot it returns an error instead.
func (tt *TestTransport) Queue(msg *rpc.Msg) error {
	// Only the error of the encode step is of interest; the encoded result
	// itself is discarded.
	// NOTE(review): presumably marshalling fills the buffer returned by
	// msg.Bytes() — confirm against rpc.Msg.
	if _, err := msgpack.Marshal(msg); err != nil {
		return err
	}
	if err := msg.UnmarshalMsgpack(msg.Bytes()); err != nil {
		return err
	}

	// Non-blocking enqueue: fall through to default when the buffer is full.
	select {
	case tt.sQ <- msg:
	default:
		return fmt.Errorf("send queue full")
	}
	return nil
}
// Dequeue takes a msg that was handed to Send (the dinet.Writer side of the
// transport). It blocks until a message is available or ctx is done; in the
// latter case it returns a nil message together with ctx.Err().
func (tt *TestTransport) Dequeue(ctx context.Context) (*rpc.Msg, error) {
	var (
		received *rpc.Msg
		err      error
	)
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case received = <-tt.rQ:
	}
	return received, err
}
// Connect is a no-op for the test transport and always returns nil.
func (tt *TestTransport) Connect() error { return nil }
// Disconnect is a no-op for the test transport and always returns nil.
func (tt *TestTransport) Disconnect() error { return nil }
// Close marks the transport as closed by closing the internal closed channel,
// which Recv observes to report dinet.ErrClosed. It always returns nil.
//
// Calling Close again after the transport has been closed is a no-op rather
// than a panic from closing an already-closed channel. The guard is not
// synchronized, so concurrent first calls to Close are still unsafe.
func (tt *TestTransport) Close() error {
	select {
	case <-tt.closed:
		// Already closed; closing the channel a second time would panic.
	default:
		close(tt.closed)
	}
	return nil
}
// Recv returns a message that was buffered with Queue, blocking until one is
// available or the transport is closed.
//
// When the transport is closed, Recv returns dinet.ErrClosed. The
// closed state is checked before the queue is consulted, so a transport that
// is already closed reports dinet.ErrClosed even if messages are still
// queued; a single select over both channels would pick one of the ready
// cases at random. The received message is unmarshalled from its own bytes
// before it is returned, and an unmarshal error is returned with a nil
// message.
func (tt *TestTransport) Recv() (*rpc.Msg, error) {
	// Give closure priority over pending messages.
	select {
	case <-tt.closed:
		return nil, dinet.ErrClosed
	default:
	}

	select {
	case <-tt.closed:
		return nil, dinet.ErrClosed
	case msg := <-tt.sQ:
		if err := msg.UnmarshalMsgpack(msg.Bytes()); err != nil {
			return nil, err
		}
		return msg, nil
	}
}
// Send buffers msg so that it can be retrieved with Dequeue.
//
// The message is marshalled first and the marshal error, if any, is returned;
// the encoded result is discarded. Send never blocks: when the queue has no
// free slot it returns an error instead.
func (tt *TestTransport) Send(msg *rpc.Msg) error {
	if _, err := msgpack.Marshal(msg); err != nil {
		return err
	}

	// Non-blocking enqueue: fall through to default when the buffer is full.
	select {
	case tt.rQ <- msg:
	default:
		return fmt.Errorf("send queue full")
	}
	return nil
}