src.dualinventive.com/go/dinet/transport_lowlevel.go

176 lines
3.7 KiB
Go

package dinet
import (
"net"
"sync"
"time"
"github.com/sirupsen/logrus"
"gopkg.in/vmihailenco/msgpack.v4"
"src.dualinventive.com/go/dinet/ll"
"src.dualinventive.com/go/dinet/rpc"
)
// LowLevelTransport is an encoder to encode/decode rpc messages over a low level tcp socket
type LowLevelTransport struct {
mu sync.RWMutex
host string
con net.Conn
encoder *ll.Encoder
decoder *ll.Decoder
deviceUID string
}
// Connect connects to the specified host
func (lle *LowLevelTransport) Connect(host string) error {
var err error
if host != "" {
lle.host = host
}
lle.con, err = net.Dial("tcp", lle.host)
if err != nil {
return err
}
lle.encoder = ll.NewEncoder(lle.con)
lle.decoder = ll.NewDecoder(lle.con)
return nil
}
// Send encodes a rpc message and sends it over the tcp socket
func (lle *LowLevelTransport) Send(m *rpc.Msg) error {
lle.mu.RLock()
defer lle.mu.RUnlock()
if lle.encoder == nil {
return ErrClosed
}
data, err := msgpack.Marshal(m)
if err != nil {
return err
}
return lle.encoder.Encode(&ll.Msg{Type: ll.MsgTypePlain, Data: data})
}
func (lle *LowLevelTransport) recv() (*rpc.Msg, error) {
msg := &ll.Msg{}
lle.mu.RLock()
if lle.decoder == nil {
lle.mu.RUnlock()
return nil, ErrClosed
}
err := lle.decoder.Decode(msg)
lle.mu.RUnlock()
if err != nil {
return nil, err
}
if msg.Type == ll.MsgTypePlain {
rpcmsg := &rpc.Msg{}
if err := msgpack.Unmarshal(msg.Data, rpcmsg); err != nil {
logrus.Errorf("Error unmarshalling package: %v", err)
return nil, err
}
if err := rpcmsg.Valid(); err != nil {
return nil, err
}
return rpcmsg, nil
}
return nil, ErrNotSupported
}
// Recv waits until it gets a rpc message from the tcp socket and decodes the message
// currently it only supports ll.MsgTypePlain and other messages are silent dropped
func (lle *LowLevelTransport) Recv() (*rpc.Msg, error) {
for {
msg, err := lle.recv()
if err != nil {
if err == ErrNotSupported {
continue
}
return nil, err
}
return msg, nil
}
}
func (lle *LowLevelTransport) close() error {
lle.encoder = nil
lle.decoder = nil
return lle.con.Close()
}
// Close disconnects the socket
func (lle *LowLevelTransport) Close() error {
lle.mu.Lock()
defer lle.mu.Unlock()
return lle.close()
}
// Reconnect closes the socket and reconnects again
func (lle *LowLevelTransport) Reconnect() error {
lle.mu.Lock()
defer lle.mu.Unlock()
if err := lle.close(); err != nil {
return err
}
if err := lle.Connect(""); err != nil {
return err
}
return lle.encoder.Encode(&ll.Msg{Type: ll.MsgTypeHsRequest, Data: []byte(lle.deviceUID)})
}
// SetTimeout is not supported
func (lle *LowLevelTransport) SetTimeout(timeout time.Duration) error {
return ErrNotSupported
}
// DeviceHandshake sends a handshake for the given device
func (lle *LowLevelTransport) DeviceHandshake(deviceUID string) error {
lle.mu.RLock()
defer lle.mu.RUnlock()
if lle.encoder == nil {
return ErrClosed
}
err := lle.encoder.Encode(&ll.Msg{Type: ll.MsgTypeHsRequest, Data: []byte(deviceUID)})
if err != nil {
return err
}
msg := ll.Msg{}
if err := lle.decoder.Decode(&msg); err != nil {
return err
}
if msg.IsReplySuccess() {
lle.deviceUID = deviceUID
return nil
}
return ErrHandshakeDenied
}
// DeviceRegister registers the device
func (lle *LowLevelTransport) DeviceRegister(deviceUID string) error {
lle.mu.RLock()
err := lle.encoder.Encode(&ll.Msg{Type: ll.MsgTypeRegister, Data: []byte(deviceUID)})
lle.mu.RUnlock()
return err
}
// DeviceUnregister unregisters the device
func (lle *LowLevelTransport) DeviceUnregister(devUID string) error {
lle.mu.RLock()
defer lle.mu.RUnlock()
return lle.encoder.Encode(&ll.Msg{Type: ll.MsgTypeUnregister, Data: []byte(devUID)})
}