package cp3000 import ( "bufio" "errors" "io" "sync" "sync/atomic" "time" "src.dualinventive.com/go/lib/dilog" ) // ErrTimeout is raised when the IORouter Receive timeouts var ErrTimeout = errors.New("timeout") // IORouter is a CP3000 router which can route commands to callbacks type IORouter struct { client io.ReadWriteCloser callbacks map[Command]Callback recv chan *Msg closed *int32 replyWriteTimeout time.Duration logger dilog.Logger devicesLock sync.RWMutex devices map[string]io.Closer } // NewIORouter returns a new router instance func NewIORouter(client io.ReadWriteCloser) Router { return &IORouter{ client: client, callbacks: make(map[Command]Callback), recv: make(chan *Msg), closed: new(int32), replyWriteTimeout: time.Second * 2, logger: dilog.NewNilLogger(), devices: make(map[string]io.Closer), } } // SetLogger allows setting a custom logger func (r *IORouter) SetLogger(logger dilog.Logger) { r.logger = logger } // Close closes a CP3000 connection func (r *IORouter) Close() error { // Change the value to closed if the value was not closed. If there was no change return nil if !atomic.CompareAndSwapInt32(r.closed, 0, 1) { return nil } r.devicesLock.RLock() defer r.devicesLock.RUnlock() // Close all the devices that are registrated here for _, dev := range r.devices { if err := dev.Close(); err != nil { r.logger.WithError(err).Error("cp3000 router: cannot close device") } } return r.client.Close() } // AddDevice adds a device to the router. This device will be closed when the router is closed func (r *IORouter) AddDevice(uid string, d io.Closer) { r.devicesLock.Lock() r.devices[uid] = d r.devicesLock.Unlock() } // RemoveDevice removes the device from the router func (r *IORouter) RemoveDevice(uid string) { // If we are closed, we don't need to remove the device from the list if atomic.LoadInt32(r.closed) == 1 { return } r.devicesLock.Lock() delete(r.devices, uid) r.devicesLock.Unlock() } // Register registers a new callback func (r *IORouter) Register(command Command, cb Callback) { r.callbacks[command] = cb } // SetReplyWriteTimeout is the timeout the receive-loop waits for delivering replies // to the caller, when the timeout expires, the reply is dropped. Default timeout // is 2 seconds func (r *IORouter) SetReplyWriteTimeout(timeout time.Duration) { r.replyWriteTimeout = timeout } // ReceiveReply waits for a reply func (r *IORouter) ReceiveReply(timeout time.Duration) (*Msg, error) { select { case msg := <-r.recv: return msg, nil case <-time.After(timeout): return nil, ErrTimeout } } // Send will create a CP3000 command with the given parameters and writes to w. // W can be a TCP connection (for direct communication) or any other communication. func (r *IORouter) Send(command Command, params ...string) error { return Send(r.client, command, params...) } // SendReply sends a reply via CP3000 func (r *IORouter) SendReply(reply uint8) error { return SendReply(r.client, reply) } // Run handles incoming requests func (r *IORouter) Run() { // Make a buffer to hold incoming data. scanner := bufio.NewScanner(r.client) defer r.Close() runloop: for scanner.Scan() { msg, err := Decode(scanner.Text()) if err != nil { r.logger.WithError(err).Warning("decode of cp3000 message failed") continue } if msg.Type == TypeReply { select { case r.recv <- msg: continue case <-time.After(r.replyWriteTimeout): r.logger.Warning("reply received, timeout while delivering") } continue } if cb, ok := r.callbacks[msg.Command]; ok { if err := cb(r, msg.Params); err != nil { r.logger.WithError(err).Warning("callback error") break runloop } } else { r.logger.WithField("command", msg.Command).Warning("no callback") } } if err := scanner.Err(); err != nil { r.logger.WithError(err).Error("scanner error") } }