package transport

import (
	"context"
	"fmt"
	"sync"

	msgpack "gopkg.in/vmihailenco/msgpack.v2"
	"src.dualinventive.com/go/dinet"
	"src.dualinventive.com/go/dinet/rpc"
)

// TestTransport is an in-memory transport intended for tests. Messages handed
// to Queue are returned by Recv, and messages passed to Send can be inspected
// with Dequeue.
type TestTransport struct {
	// closed is closed by Close to unblock pending Recv calls.
	closed chan bool
	// closeOnce ensures the closed channel is closed at most once, so that
	// calling Close repeatedly does not panic.
	closeOnce sync.Once
	// sQ holds messages added with Queue until Recv picks them up.
	sQ chan *rpc.Msg
	// rQ holds messages passed to Send until Dequeue picks them up.
	rQ chan *rpc.Msg
}

// NewTestTransport creates a buffered transport. recvSize is the capacity of
// the queue filled by Send and drained by Dequeue; sendSize is the capacity of
// the queue filled by Queue and drained by Recv.
func NewTestTransport(recvSize, sendSize int) *TestTransport {
	return &TestTransport{
		closed: make(chan bool),
		sQ:     make(chan *rpc.Msg, sendSize),
		rQ:     make(chan *rpc.Msg, recvSize),
	}
}

// Queue a msg for a single dinet.Reader interface call (Recv). It never
// blocks: when the queue has no free slot an error is returned instead.
func (tt *TestTransport) Queue(msg *rpc.Msg) error {
	// The encoded bytes are discarded; only the marshalling error is checked
	// here. NOTE(review): presumably Marshal also refreshes what msg.Bytes()
	// returns below - confirm against rpc.Msg.
	if _, err := msgpack.Marshal(msg); err != nil {
		return err
	}
	if err := msg.UnmarshalMsgpack(msg.Bytes()); err != nil {
		return err
	}

	select {
	case tt.sQ <- msg:
		return nil
	default:
		return fmt.Errorf("send queue full")
	}
}

// Dequeue a msg from the dinet.Writer interface (Send). It waits until a
// message is available or until ctx is done, in which case ctx.Err() is
// returned.
func (tt *TestTransport) Dequeue(ctx context.Context) (*rpc.Msg, error) {
	select {
	case msg := <-tt.rQ:
		return msg, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Connect does nothing and always returns nil.
func (tt *TestTransport) Connect() error {
	return nil
}

// Disconnect does nothing and always returns nil.
func (tt *TestTransport) Disconnect() error {
	return nil
}

// Close closes the transport, which makes blocked and future Recv calls able
// to return dinet.ErrClosed. It is safe to call Close more than once; only the
// first call closes the underlying channel. It always returns nil.
func (tt *TestTransport) Close() error {
	// Closing an already closed channel panics, so guard it with a Once.
	tt.closeOnce.Do(func() {
		close(tt.closed)
	})
	return nil
}

// Recv reads a message buffered with Queue. It blocks until a message is
// available or the transport is closed, in which case dinet.ErrClosed is
// returned.
func (tt *TestTransport) Recv() (*rpc.Msg, error) {
	select {
	case <-tt.closed:
		return nil, dinet.ErrClosed
	case msg := <-tt.sQ:
		if err := msg.UnmarshalMsgpack(msg.Bytes()); err != nil {
			return nil, err
		}
		return msg, nil
	}
}

// Send buffers a message which can be retrieved with Dequeue. It never blocks:
// when the queue has no free slot an error is returned instead.
func (tt *TestTransport) Send(msg *rpc.Msg) error {
	// The encoded bytes are discarded; only the marshalling error is checked.
	if _, err := msgpack.Marshal(msg); err != nil {
		return err
	}

	select {
	case tt.rQ <- msg:
		return nil
	default:
		return fmt.Errorf("send queue full")
	}
}