package dinet

import (
	"errors"
	"sync"
	"sync/atomic"

	"src.dualinventive.com/go/dinet/ditime"
	"src.dualinventive.com/go/dinet/rpc"
	"src.dualinventive.com/go/lib/dilog"
)

// Compile-time interface validation.
var (
	_ Device       = (*ChildDevice)(nil)
	_ ParentDevice = (*ChildDevice)(nil)
)

// ChildDevice is a device that needs a parent to operate, because the parent device manages the connection.
// ChildDevice has some helper functions to make it easier to implement a functional device.
// ChildDevice requires deviceInfo for the missing pieces. Don't forget to add your own subscribes and call Handshake()
// when you are done.
type ChildDevice struct {
	devInfo DeviceInfo
	parent  ParentDevice
	logger  dilog.Logger
	// closed is 0 while the device is open and 1 once Close has run; it is only accessed atomically.
	closed *int32
	// configs is not guarded by a lock.
	// NOTE(review): presumably all AddConfig calls happen before requests are handled - confirm.
	configs map[uint16]deviceConfig

	childrenLock sync.RWMutex
	children     map[string]Device

	errorLock sync.RWMutex
	errors    []rpc.ErrorCode

	stateLock sync.RWMutex
	state     rpc.DeviceState

	gatewaysLock sync.RWMutex
	gateways     []string
}

// NewChildDevice creates a new ChildDevice with the provided device information and parent. The gateway of the parent
// is already set, but the actual register is really made after Handshake().
//
// The returned device starts in rpc.DeviceStateIdle and is already subscribed to the device:data, device:info,
// config:* and connection:info requests.
func NewChildDevice(devInfo DeviceInfo, parent ParentDevice) *ChildDevice {
	l := parent.Logger().WithFields(dilog.Fields{
		"device:uid":  devInfo.UID(),
		"device:type": devInfo.Type(),
	})
	dev := &ChildDevice{
		devInfo:  devInfo,
		parent:   parent,
		logger:   l,
		closed:   new(int32),
		children: make(map[string]Device),
		configs:  make(map[uint16]deviceConfig),
		state:    rpc.DeviceStateIdle,
	}
	dev.AddGateway(parent.UID())

	dev.Subscribe(rpc.ClassMethodDeviceData, rpc.MsgTypeRequest, dev.requestDeviceData)
	dev.Subscribe(rpc.ClassMethodDeviceInfo, rpc.MsgTypeRequest, dev.deviceInfo)
	dev.Subscribe(rpc.ClassMethodConfigInfo, rpc.MsgTypeRequest, dev.configInfo)
	dev.Subscribe(rpc.ClassMethodConfigGet, rpc.MsgTypeRequest, dev.configGet)
	dev.Subscribe(rpc.ClassMethodConfigSet, rpc.MsgTypeRequest, dev.configSet)
	dev.Subscribe(rpc.ClassMethodConfigReset, rpc.MsgTypeRequest, dev.configReset)
	dev.Subscribe(rpc.ClassMethodConnectionInfo, rpc.MsgTypeRequest, dev.connectionInfo)
	return dev
}

// Handshake registers this device at the parent, which connects this device to SMP.
// After this call the device is officially 'online'.
func (d *ChildDevice) Handshake() error {
	return d.parent.RegisterDevice(d.devInfo)
}

// Close closes all children, closes the device info and unregisters the device UID at the parent.
// Only the first call does any work; every later call returns nil. Errors from closing children or the
// device info are logged, the returned error is the result of the unregister.
func (d *ChildDevice) Close() error {
	// Change the value to closed if the value was not closed. If there was no change return nil
	if !atomic.CompareAndSwapInt32(d.closed, 0, 1) {
		return nil
	}

	d.childrenLock.RLock()
	for _, c := range d.children {
		if err := c.Close(); err != nil {
			d.logger.WithError(err).WithField("child", c.UID()).Error("cannot close child")
		}
	}
	d.childrenLock.RUnlock()

	// Let the info device know that we are closing
	if err := d.devInfo.Close(); err != nil {
		d.logger.WithError(err).Error("info device close failed")
	}

	// Unregister this device
	return d.parent.UnregisterUID(d.devInfo.UID())
}

// UID returns the device UID of this device.
func (d *ChildDevice) UID() string {
	return d.devInfo.UID()
}

// Search looks up the device with the given UID. It matches this device itself, then its direct children,
// and finally descends into every child that is a ParentDevice. An error is returned when the UID is not
// found in this device or the tree below it.
func (d *ChildDevice) Search(uid string) (Device, error) {
	if uid == d.UID() {
		return d, nil
	}

	d.childrenLock.RLock()
	defer d.childrenLock.RUnlock()

	// First find via the direct children (fastest solution)
	if dev, ok := d.children[uid]; ok {
		return dev, nil
	}

	// Not found as direct child. Now find in the sub children
	for _, dev := range d.children {
		pDev, ok := dev.(ParentDevice)
		if !ok {
			continue
		}
		if fDev, err := pDev.Search(uid); err == nil {
			return fDev, nil
		}
	}
	return nil, errors.New("not found")
}

// Parent returns the parent of this child device.
func (d *ChildDevice) Parent() ParentDevice {
	return d.parent
}

// GetState returns the current device state.
func (d *ChildDevice) GetState() rpc.DeviceState {
	d.stateLock.RLock()
	defer d.stateLock.RUnlock()
	return d.state
}

// SetState sets the current device state.
func (d *ChildDevice) SetState(st rpc.DeviceState) {
	d.stateLock.Lock()
	d.state = st
	d.stateLock.Unlock()
}

// AddGateway adds a gateway to the list of gateways. A UID that is already in the list is not added again.
func (d *ChildDevice) AddGateway(deviceUID string) {
	d.gatewaysLock.Lock()
	defer d.gatewaysLock.Unlock()

	for _, gwUID := range d.gateways {
		if gwUID == deviceUID {
			return
		}
	}
	d.gateways = append(d.gateways, deviceUID)
}

// Logger returns a logger with the device:uid set.
func (d *ChildDevice) Logger() dilog.Logger {
	return d.logger
}

// getErrors returns a snapshot of the errors of this device, taken under the error lock.
// The result is a copy: ClearError removes entries in place, so handing out the internal slice would let
// a caller observe it changing after the lock is released.
func (d *ChildDevice) getErrors() []rpc.ErrorCode {
	d.errorLock.RLock()
	defer d.errorLock.RUnlock()

	// Keep nil as nil so the result is indistinguishable from the internal slice for encoders.
	if d.errors == nil {
		return nil
	}
	errs := make([]rpc.ErrorCode, len(d.errors))
	copy(errs, d.errors)
	return errs
}

// ClearError removes an error code from a device. It returns true when the code was present and has been
// removed, false when the device did not have this error.
func (d *ChildDevice) ClearError(err rpc.ErrorCode) bool {
	d.errorLock.Lock()
	defer d.errorLock.Unlock()

	for i, code := range d.errors {
		if code == err {
			d.errors = append(d.errors[:i], d.errors[i+1:]...)
			return true
		}
	}
	return false
}

// SetError sets an error into the device. It returns true when the code was added, false when the device
// already had this error.
func (d *ChildDevice) SetError(err rpc.ErrorCode) bool {
	d.errorLock.Lock()
	defer d.errorLock.Unlock()

	for _, code := range d.errors {
		if code == err {
			return false
		}
	}
	d.errors = append(d.errors, err)
	return true
}

// HasError returns whether the device has an active error.
func (d *ChildDevice) HasError() bool {
	d.errorLock.RLock()
	defer d.errorLock.RUnlock()
	return len(d.errors) > 0
}

// AddConfig adds a configuration item, keyed by the UID of c. An existing item with the same UID is replaced.
// get, set and reset may be nil; the matching config request is then answered with rpc.EOpnotsupp.
// NOTE(review): the configs map is not protected by a lock - presumably this must be called before the
// device starts handling requests; confirm.
func (d *ChildDevice) AddConfig(c rpc.Config, get RPCCallback, set ConfigSetCallback, reset ConfigResetCallback) {
	d.configs[c.UID()] = deviceConfig{
		c:     c,
		get:   get,
		set:   set,
		reset: reset,
	}
}

// SendDeviceData publishes the device:data property.
func (d *ChildDevice) SendDeviceData() error {
	return d.Send(d.getDeviceData(rpc.MsgTypePublish, 0))
}

// getDeviceData builds a device:data message of type t with the given message ID, containing the current
// state and errors of the device.
func (d *ChildDevice) getDeviceData(t rpc.MsgType, id uint32) *rpc.Msg {
	// Take a single snapshot so the Error flag and the Errors list can never disagree.
	errs := d.getErrors()
	return &rpc.Msg{
		ClassMethod: rpc.ClassMethodDeviceData,
		ID:          id,
		Type:        t,
		Result: rpc.NewResult([]rpc.DeviceData{{
			State:  d.GetState(),
			Error:  len(errs) > 0,
			Errors: errs,
		}}),
	}
}

// PublishSensorData publishes the given ResultValueItems as a sensor:data message with time t.
func (d *ChildDevice) PublishSensorData(t ditime.Time, data []*rpc.ResultValueItem) error {
	return d.Send(&rpc.Msg{
		Time:        t,
		ClassMethod: rpc.ClassMethodSensorData,
		Type:        rpc.MsgTypePublish,
		Result:      rpc.NewResult(data),
	})
}

// Subscribe subscribes the callback for this device at the router of the parent.
func (d *ChildDevice) Subscribe(classMethod rpc.ClassMethod, t rpc.MsgType, cb RPCCallback) {
	d.parent.Router().Subscribe(d.devInfo.UID(), classMethod, t, cb)
}

// Send transfers a message for this device to the remote host. The DeviceUID of m is overwritten with the
// UID of this device.
func (d *ChildDevice) Send(m *rpc.Msg) error {
	m.DeviceUID = d.devInfo.UID()
	return d.parent.Router().Send(m)
}

// RegisterUID registers the uid on the connection (dinetlowlevel message type Register)
// DONT use this function, this is for internal use only. Use childdevice.Handshake()
func (d *ChildDevice) RegisterUID(uid string) error {
	return d.parent.RegisterUID(uid)
}

// UnregisterUID unregisters the uid on the connection (dinetlowlevel message type Unregister)
// DONT use this function, this is for internal use only. Use childdevice.Close()
func (d *ChildDevice) UnregisterUID(uid string) error {
	return d.parent.UnregisterUID(uid)
}

// RegisterDevice registers a new device on behalf of the current device. The children will be updated with the given
// device
// DONT use this function, this is for internal use only. Use childdevice.Handshake()
func (d *ChildDevice) RegisterDevice(device Device) error {
	d.childrenLock.Lock()
	d.children[device.UID()] = device
	d.childrenLock.Unlock()

	return d.parent.RegisterUID(device.UID())
}

// UnregisterDevice unregisters a device on behalf of the current device. The children will be updated with the given
// device
// DONT use this function, this is for internal use only. Use childdevice.Close()
func (d *ChildDevice) UnregisterDevice(uid string) error {
	d.childrenLock.Lock()
	delete(d.children, uid)
	d.childrenLock.Unlock()

	return d.parent.UnregisterUID(uid)
}

// Router returns the router of the parent.
func (d *ChildDevice) Router() *Router {
	return d.parent.Router()
}

// requestDeviceData is the callback for device:data
func (d *ChildDevice) requestDeviceData(req *rpc.Msg) *rpc.Msg {
	return d.getDeviceData(rpc.MsgTypeReply, req.ID)
}

// configInfo is the callback for config:info. The reply contains the info of every added config item;
// the items come from a map, so their order is not fixed.
func (d *ChildDevice) configInfo(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	res := make([]rpc.ResultConfigInfoItem, 0, len(d.configs))
	for _, item := range d.configs {
		res = append(res, item.c.Info())
	}
	rep.Result = rpc.NewResult(res)
	return rep
}

// configSet is the callback for config:set. It replies with rpc.EParam when the parameters cannot be
// unmarshalled and with rpc.EOpnotsupp when the config item is unknown or has no set callback.
func (d *ChildDevice) configSet(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	p := rpc.ConfigParam{}
	if err := req.Params.Unmarshal(&p); err != nil {
		d.logger.WithError(err).Warning("config set error")
		return rep.SetError(rpc.EParam)
	}

	if item, ok := d.configs[p.UID]; ok && item.set != nil {
		return rep.SetError(item.set(p))
	}
	return rep.SetError(rpc.EOpnotsupp)
}

// configGet is the callback for config:get. It replies with rpc.EParam when the parameters cannot be
// unmarshalled and with rpc.EOpnotsupp when the config item is unknown or has no get callback.
func (d *ChildDevice) configGet(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	p := rpc.ConfigParam{}
	if err := req.Params.Unmarshal(&p); err != nil {
		d.logger.WithError(err).Warning("config get error")
		return rep.SetError(rpc.EParam)
	}

	if item, ok := d.configs[p.UID]; ok && item.get != nil {
		// The get callback receives the prepared reply, not the request.
		return item.get(rep)
	}
	return rep.SetError(rpc.EOpnotsupp)
}

// configReset is the callback for config:reset. It replies with rpc.EParam when the parameters cannot be
// unmarshalled and with rpc.EOpnotsupp when the config item is unknown or has no reset callback.
func (d *ChildDevice) configReset(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	p := rpc.ConfigParam{}
	if err := req.Params.Unmarshal(&p); err != nil {
		d.logger.WithError(err).Warning("config reset error")
		return rep.SetError(rpc.EParam)
	}

	if item, ok := d.configs[p.UID]; ok && item.reset != nil {
		return rep.SetError(item.reset())
	}
	return rep.SetError(rpc.EOpnotsupp)
}

// deviceInfo is the callback for device:info. The reply contains the type and versions of the device info,
// or the error returned while retrieving the versions.
func (d *ChildDevice) deviceInfo(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	v, rpcerr := d.devInfo.Versions()
	if rpcerr != rpc.Ok {
		return rep.SetError(rpcerr)
	}

	rep.Result = rpc.NewResult([]rpc.DeviceInfo{{
		Type:    d.devInfo.Type(),
		Version: v,
	}})
	return rep
}

// connectionInfo is the callback for connection:info. The reply contains the timeout in seconds, the
// gateways and the transport data of the device info. It replies with the error of the transport lookup
// when that fails and with rpc.EOpnotsupp when the transport data has an unsupported type.
func (d *ChildDevice) connectionInfo(req *rpc.Msg) *rpc.Msg {
	rep := req.CreateReply()

	timeoutDuration := d.devInfo.TimeoutDuration()
	data, rpcerr := d.devInfo.Transport()
	if rpcerr != rpc.Ok {
		return rep.SetError(rpcerr)
	}

	d.gatewaysLock.RLock()
	defer d.gatewaysLock.RUnlock()

	ci := rpc.ConnectionInfo{
		Timeout:  uint32(timeoutDuration.Seconds()),
		Gateways: d.gateways,
	}
	if err := unmarshalTransportData(&ci, data); err != nil {
		return rep.SetError(rpc.EOpnotsupp)
	}

	rep.Result = rpc.NewResult([]rpc.ConnectionInfo{ci})
	return rep
}

// unmarshalTransportData updates the Transport of the ci using the given data. The concrete type of data
// selects both the transport kind and the field of ci that receives it. ErrInvalidTransportType is returned
// for any other type, in which case ci is left untouched.
func unmarshalTransportData(ci *rpc.ConnectionInfo, data interface{}) error {
	switch t := data.(type) {
	case *rpc.ConnectionInfoService:
		ci.Service = t
		ci.Transport = rpc.ConnectionTransportService
	case *rpc.ConnectionInfoCan:
		ci.Can = t
		ci.Transport = rpc.ConnectionTransportCan
	case *rpc.ConnectionInfoCP3000:
		ci.CP3000 = t
		ci.Transport = rpc.ConnectionTransportCP3000
	case *rpc.ConnectionInfoXBee:
		ci.XBee = t
		ci.Transport = rpc.ConnectionTransportXBee
	case *rpc.ConnectionInfoCellular:
		ci.Cellular = t
		ci.Transport = rpc.ConnectionTransportCellular
	case *rpc.ConnectionInfoLoRa:
		ci.LoRa = t
		ci.Transport = rpc.ConnectionTransportLoRa
	case *rpc.ConnectionInfoNBIoT:
		ci.NBIoT = t
		ci.Transport = rpc.ConnectionTransportNBIoT
	default:
		return ErrInvalidTransportType
	}
	return nil
}