package dinet import ( "encoding/json" "sync" "time" "src.dualinventive.com/go/dinet/rpc" ) // TestTransport is an transport which routes all messages from and to channels type TestTransport struct { closeLock sync.Mutex mu sync.RWMutex r chan *rpc.Msg w chan *rpc.Msg Read <-chan *rpc.Msg Write chan<- *rpc.Msg } // Connect connects to the specified host func (tt *TestTransport) Connect(host string) error { tt.mu.Lock() tt.r = make(chan *rpc.Msg) tt.Read = tt.r tt.w = make(chan *rpc.Msg) tt.Write = tt.w tt.mu.Unlock() return nil } // Send sends a rpc message and sends it over the read channel func (tt *TestTransport) Send(m *rpc.Msg) error { tt.mu.RLock() defer tt.mu.RUnlock() if tt.r != nil { b, err := json.Marshal(m) if err != nil { return err } msg := &rpc.Msg{} if err = json.Unmarshal(b, msg); err != nil { return err } tt.r <- msg return nil } return ErrDisconnected } // Recv waits until it gets a rpc message via the Write channel func (tt *TestTransport) Recv() (*rpc.Msg, error) { tt.mu.RLock() defer tt.mu.RUnlock() if tt.w == nil { return nil, ErrDisconnected } msg := <-tt.w if msg == nil { return nil, ErrDisconnected } b, err := json.Marshal(msg) if err != nil { return nil, err } msg = &rpc.Msg{} if err = json.Unmarshal(b, msg); err != nil { return nil, err } if err = msg.Valid(); err != nil { return nil, err } return msg, nil } // Close closes the connection func (tt *TestTransport) Close() error { // the mutex handling in this function is a bit odd, but it prevents data-races // we must check if channels are actually there (i.e. we are connected) aprior to // closing everything tt.closeLock.Lock() defer tt.closeLock.Unlock() tt.mu.RLock() hasWriter := tt.w != nil hasReader := tt.r != nil tt.mu.RUnlock() if hasWriter { close(tt.w) tt.mu.Lock() tt.w = nil tt.Write = nil tt.mu.Unlock() } if hasReader { close(tt.r) tt.mu.Lock() tt.r = nil tt.Read = nil tt.mu.Unlock() } return nil } // Reconnect closes the socket and reconnects again func (tt *TestTransport) Reconnect() error { return nil } // SetTimeout is not supported func (tt *TestTransport) SetTimeout(timeout time.Duration) error { return ErrNotSupported } // DeviceHandshake sends a handschake for the given device func (tt *TestTransport) DeviceHandshake(deviceUID string) error { return nil } // DeviceRegister registers the device func (tt *TestTransport) DeviceRegister(deviceUID string) error { return nil } // DeviceUnregister registers the device func (tt *TestTransport) DeviceUnregister(deviceUID string) error { return nil }