package rpc

import (
	"fmt"
	"math/rand"
	"reflect"
	"strings"
	"time"

	"src.dualinventive.com/go/dinet/ditime"
)

// CurrentDinetrpc is the current DI-Net RPC version.
const CurrentDinetrpc = uint(1)

// DeferredDecoder is a payload (params or result) whose decoding is deferred
// until the caller asks for it through Unmarshal.
type DeferredDecoder interface {
	Unmarshal(interface{}) error
	IsEmpty() bool
}

// Msg represents a single DI-Net RPC message.
//
// Type and ClassMethod are not serialised under their own name (both are
// tagged "-"); on the wire they are carried by the req/rep/pub key of msgMeta.
// The unexported fields hold the raw encoded bytes and the lazily computed
// class/method split of ClassMethod.
type Msg struct {
	Dinetrpc    uint            `json:"dinetrpc" msgpack:"dinetrpc"`
	Async       Async           `json:"async,omitempty" msgpack:"async,omitempty"`
	Rt          bool            `json:"rt,omitempty" msgpack:"rt,omitempty"`
	RtSeqnr     uint16          `json:"rt:seqnr,omitempty" msgpack:"rt:seqnr,omitempty"`
	ID          uint32          `json:"id,omitempty" msgpack:"id,omitempty"`
	DeviceUID   string          `json:"device:uid,omitempty" msgpack:"device:uid,omitempty"`
	ProjectID   uint            `json:"project:id,omitempty" msgpack:"project:id,omitempty"`
	UserID      uint            `json:"user:id,omitempty" msgpack:"user:id,omitempty"`
	Time        ditime.Time     `json:"time" msgpack:"time"`
	TimeRecv    ditime.Time     `json:"time:recv,omitempty" msgpack:"-"`
	Type        MsgType         `json:"-" msgpack:"-"`
	ClassMethod ClassMethod     `json:"-" msgpack:"-"`
	Error       *Error          `json:"error,omitempty" msgpack:"error,omitempty"`
	Result      DeferredDecoder `json:"result,omitempty" msgpack:"result,omitempty"`
	Params      DeferredDecoder `json:"params,omitempty" msgpack:"params,omitempty"`

	rawBytes []byte
	class    string
	method   string
}

// NewParams wraps any value (one that does not itself implement
// DeferredDecoder) so that it can be assigned to Msg.Params.
func NewParams(d interface{}) DeferredDecoder {
	return &deferredDecoder{out: d}
}

// NewResult wraps any value (one that does not itself implement
// DeferredDecoder) so that it can be assigned to Msg.Result.
func NewResult(d interface{}) DeferredDecoder {
	return &deferredDecoder{out: d}
}

// CreateReply returns a shallow copy of the message turned into a reply:
// the type is set to MsgTypeReply and Params, Error and Time are cleared.
// All other fields are carried over unchanged.
func (m *Msg) CreateReply() *Msg {
	msg := *m
	msg.Type = MsgTypeReply
	msg.Params = nil
	msg.Error = nil
	msg.Time = 0
	return &msg
}

// msgAlias is a helper type for serialisation and deserialisation. It has the
// same fields as Msg but none of its methods.
type msgAlias Msg

// msgMeta is a helper struct for serialisation and deserialisation. It adds
// the req/rep/pub keys, which together encode Msg.Type and Msg.ClassMethod,
// to the fields of the embedded message.
type msgMeta struct {
	Req string `json:"req,omitempty" msgpack:"req,omitempty"`
	Rep string `json:"rep,omitempty" msgpack:"rep,omitempty"`
	Pub string `json:"pub,omitempty" msgpack:"pub,omitempty"`
	*msgAlias
}

// UnmarshalFinish is a helper function which must be called after
// deserialization. It derives Type and ClassMethod from whichever of
// req/rep/pub is set, checked in that order. When none is set both fields are
// left untouched.
func (m *msgMeta) UnmarshalFinish() {
	switch {
	case m.Req != "":
		m.msgAlias.Type = MsgTypeRequest
		m.msgAlias.ClassMethod = ClassMethod(m.Req)
	case m.Rep != "":
		m.msgAlias.Type = MsgTypeReply
		m.msgAlias.ClassMethod = ClassMethod(m.Rep)
	case m.Pub != "":
		m.msgAlias.Type = MsgTypePublish
		m.msgAlias.ClassMethod = ClassMethod(m.Pub)
	}
}

// isBlankDeferredDecoder reports whether d is a *deferredDecoder that carries
// neither encoded data nor an output value.
func isBlankDeferredDecoder(d DeferredDecoder) bool {
	dec, ok := d.(*deferredDecoder)
	return ok && len(dec.data) == 0 && dec.out == nil
}

// detachDeferredDecoders replaces blank Result/Params placeholders by nil so
// that the omitempty tags can drop them during serialisation.
func (m *msgMeta) detachDeferredDecoders() {
	if isBlankDeferredDecoder(m.msgAlias.Result) {
		m.msgAlias.Result = nil
	}
	if isBlankDeferredDecoder(m.msgAlias.Params) {
		m.msgAlias.Params = nil
	}
}

// MarshalPrepare is a helper function which must be called before
// serialization. It drops blank payload placeholders, fills in the protocol
// version and the time when they are unset, and translates Type/ClassMethod
// into the matching req/rep/pub key. Requests without an ID get a random
// non-zero one; publish messages always have their ID cleared.
func (m *msgMeta) MarshalPrepare() {
	m.detachDeferredDecoders()

	if m.msgAlias.Dinetrpc == 0 {
		m.msgAlias.Dinetrpc = CurrentDinetrpc
	}
	if m.msgAlias.Time == 0 {
		m.msgAlias.Time = ditime.Now()
	}

	switch m.msgAlias.Type {
	case MsgTypeRequest:
		m.Req = string(m.msgAlias.ClassMethod)
		// ID 0 means "unset" and is dropped by omitempty, so draw again in
		// the (unlikely) case that the generator returns 0.
		for m.msgAlias.ID == 0 {
			m.msgAlias.ID = rand.Uint32()
		}
	case MsgTypeReply:
		m.Rep = string(m.msgAlias.ClassMethod)
	case MsgTypePublish:
		m.Pub = string(m.msgAlias.ClassMethod)
		m.msgAlias.ID = 0
	}
}

// unmarshalGeneric decodes data into m using the supplied unmarshaller f.
// On success the message type and class method are derived from the
// req/rep/pub key, data is remembered as the raw bytes, and Params and Result
// are guaranteed to be non-nil. On failure the error of f is returned.
func (m *Msg) unmarshalGeneric(data []byte, f genericUnmarshaller) error {
	// Install placeholders before decoding; presumably this gives the decoder
	// a concrete value to decode the payloads into.
	if m.Params == nil {
		m.Params = &deferredDecoder{}
	}
	if m.Result == nil {
		m.Result = &deferredDecoder{}
	}

	msg := &msgMeta{msgAlias: (*msgAlias)(m)}
	if err := f(data, &msg); err != nil {
		return err
	}
	msg.UnmarshalFinish()
	msg.rawBytes = data

	// The class/method split is cached per message; drop it so that a Msg
	// which is decoded into more than once never reports the class or method
	// of an earlier message.
	m.class, m.method = "", ""

	// When params (or result) is null the decoder sets the field to nil.
	// We explicitly don't want that and ensure that neither is ever nil.
	if m.Params == nil {
		m.Params = &deferredDecoder{}
	}
	if m.Result == nil {
		m.Result = &deferredDecoder{}
	}
	return nil
}

// Bytes returns the raw message bytes after successful Marshal or Unmarshal.
// It returns an empty slice when the message is incorrect.
func (m *Msg) Bytes() []byte {
	return m.rawBytes
}

// IsError reports whether the message is an error reply: a reply that carries
// an Error whose code is not Ok.
func (m *Msg) IsError() bool {
	return m.Error != nil && m.Error.Code != Ok && m.Type == MsgTypeReply
}

// SetError sets the ErrorCode in the message and returns the message itself.
// Data of an error that is already present is passed on to the new error.
func (m *Msg) SetError(err ErrorCode) *Msg {
	if m.Error == nil {
		m.Error = GetError(err)
		return m
	}
	m.Error = GetError(err, m.Error.Data)
	return m
}

// Valid checks if the RPC message is valid. It returns nil for a valid
// message and an *InvalidMsgError describing the first failed check otherwise.
func (m *Msg) Valid() error {
	// check dinet rpc
	if m.Dinetrpc != CurrentDinetrpc {
		return &InvalidMsgError{ErrMsgInvalidDinetrpc, m}
	}
	if err := m.validClassMethod(); err != nil {
		return err
	}
	// when the error message is set, it must contain an error
	if m.Type == MsgTypeReply && m.Error != nil && m.Error.Code == Ok {
		return &InvalidMsgError{ErrMsgErrorWithOk, m}
	}
	// check time
	if m.Time == 0 {
		return &InvalidMsgError{ErrMsgEmptyTime, m}
	}
	return nil
}

// validClassMethod checks the addressing part of the message: the device UID,
// the presence of at least one of device UID, project ID or user ID, and the
// presence of a class method.
func (m *Msg) validClassMethod() error {
	// device must be empty or valid
	if len(m.DeviceUID) > 0 && !ValidDeviceUID(m.DeviceUID) {
		return &InvalidMsgError{ErrMsgInvalidDeviceUID, m}
	}
	// message must contain either device UID, project ID or user ID
	if !ValidDeviceUID(m.DeviceUID) && !ValidProjectID(m.ProjectID) && !ValidUserID(m.UserID) {
		return &InvalidMsgError{ErrMsgEmptyDeviceProjectUser, m}
	}
	// no class method is invalid
	if len(m.ClassMethod) == 0 {
		return &InvalidMsgError{ErrMsgInvalidClassMethod, m}
	}
	return nil
}

// Class calculates the RPC message class from the ClassMethod property.
// The result is cached in the message.
func (m *Msg) Class() Class {
	if m.class == "" {
		m.splitClassMethod()
	}
	return Class(m.class)
}

// Method calculates the RPC message method from the ClassMethod property.
// The result is cached in the message.
func (m *Msg) Method() string {
	if m.method == "" {
		m.splitClassMethod()
	}
	return m.method
}

// splitClassMethod splits ClassMethod at the first ':' and caches the parts in
// m.class and m.method. Both are left untouched when ClassMethod contains no
// ':'.
func (m *Msg) splitClassMethod() {
	s := strings.SplitN(string(m.ClassMethod), ":", 2)
	if len(s) > 1 {
		m.class = s[0]
		m.method = s[1]
	}
}

// String returns a one-line, human readable summary of the DI-Net RPC
// message: the time, the device UID (or else the project ID), the user ID,
// the message type and class method, followed by the error code for error
// replies, the params for requests, or the result for everything else.
func (m *Msg) String() string {
	prefix := m.Time.Format(time.StampMilli)
	if m.DeviceUID != "" {
		prefix += " device:" + m.DeviceUID
	} else if m.ProjectID != 0 {
		prefix += fmt.Sprintf(" project:%d", m.ProjectID)
	}
	if m.UserID != 0 {
		prefix += fmt.Sprintf(" user:%d", m.UserID)
	}
	prefix += fmt.Sprintf(" %s %s", m.Type.String(), m.ClassMethod)

	if m.IsError() {
		return prefix + fmt.Sprintf(" ERROR %v", m.Error.Code)
	}
	if m.Type == MsgTypeRequest {
		return prefix + fmt.Sprintf(" %v", m.Params)
	}
	return prefix + fmt.Sprintf(" %v", m.Result)
}

// Compare checks if the given message is the same. It ignores non relevant
// data e.g. time, ID.
func (m *Msg) Compare(msg2 *Msg) bool {
	return CompareMsg(m, msg2)
}

// CompareMsg checks if the two messages are the same. It ignores non relevant
// data e.g. time, ID. Two nil messages are the same; a nil and a non-nil
// message are not.
func CompareMsg(msg1 *Msg, msg2 *Msg) bool {
	if msg1 == nil || msg2 == nil {
		return msg1 == msg2
	}
	return CompareMsgHeader(msg1, msg2) &&
		CompareError(msg1.Error, msg2.Error) &&
		reflect.DeepEqual(msg1.Result, msg2.Result) &&
		reflect.DeepEqual(msg1.Params, msg2.Params)
}

// CompareMsgHeader checks if the two message headers are the same. It ignores
// non relevant data e.g. time, ID. Two nil messages are the same; a nil and a
// non-nil message are not.
func CompareMsgHeader(msg1 *Msg, msg2 *Msg) bool {
	if msg1 == nil || msg2 == nil {
		return msg1 == msg2
	}
	return msg1.Dinetrpc == msg2.Dinetrpc &&
		msg1.Async == msg2.Async &&
		msg1.DeviceUID == msg2.DeviceUID &&
		msg1.ProjectID == msg2.ProjectID &&
		msg1.UserID == msg2.UserID &&
		msg1.Type == msg2.Type &&
		msg1.ClassMethod == msg2.ClassMethod
}