src.dualinventive.com/go/dinet/rpc/msg.go

323 lines
8.3 KiB
Go

package rpc
import (
"fmt"
"math/rand"
"reflect"
"strings"
"time"
"src.dualinventive.com/go/dinet/ditime"
)
// CurrentDinetrpc is the current DI-Net RPC version
const CurrentDinetrpc = uint(1)
// DeferredDecoder is a type which is used for deferred decoding
type DeferredDecoder interface {
Unmarshal(interface{}) error
IsEmpty() bool
}
// Msg represents a single DI-Net RPC message
type Msg struct {
Dinetrpc uint `json:"dinetrpc" msgpack:"dinetrpc"`
Async Async `json:"async,omitempty" msgpack:"async,omitempty"`
Rt bool `json:"rt,omitempty" msgpack:"rt,omitempty"`
RtSeqnr uint16 `json:"rt:seqnr,omitempty" msgpack:"rt:seqnr,omitempty"`
ID uint32 `json:"id,omitempty" msgpack:"id,omitempty"`
DeviceUID string `json:"device:uid,omitempty" msgpack:"device:uid,omitempty"`
ProjectID uint `json:"project:id,omitempty" msgpack:"project:id,omitempty"`
UserID uint `json:"user:id,omitempty" msgpack:"user:id,omitempty"`
Time ditime.Time `json:"time" msgpack:"time"`
TimeRecv ditime.Time `json:"time:recv,omitempty" msgpack:"-"`
Type MsgType `json:"-" msgpack:"-"`
ClassMethod ClassMethod `json:"-" msgpack:"-"`
Error *Error `json:"error,omitempty" msgpack:"error,omitempty"`
Result DeferredDecoder `json:"result,omitempty" msgpack:"result,omitempty"`
Params DeferredDecoder `json:"params,omitempty" msgpack:"params,omitempty"`
rawBytes []byte
class string
method string
}
// NewParams allows for any datatype (not conforming the DeferredDecoder interface)
// to be applied into Params
func NewParams(d interface{}) DeferredDecoder {
return &deferredDecoder{out: d}
}
// NewResult allows for any datatype (not conforming the DeferredDecoder interface)
// to be applied into Result
func NewResult(d interface{}) DeferredDecoder {
return &deferredDecoder{out: d}
}
// CreateReply creates a copy of a message for replies
func (m *Msg) CreateReply() *Msg {
msg := *m
msg.Type = MsgTypeReply
msg.Params = nil
msg.Error = nil
msg.Time = 0
return &msg
}
// msgAlias is a helper struct for serialisation and deserialisation
type msgAlias Msg
// msgMeta is a helper struct for serialisation and deserialisation
type msgMeta struct {
Req string `json:"req,omitempty" msgpack:"req,omitempty"`
Rep string `json:"rep,omitempty" msgpack:"rep,omitempty"`
Pub string `json:"pub,omitempty" msgpack:"pub,omitempty"`
*msgAlias
}
// UnmarshalFinish is a helper function which must be called after deserialization
func (m *msgMeta) UnmarshalFinish() {
switch {
case m.Req != "":
m.msgAlias.Type = MsgTypeRequest
m.msgAlias.ClassMethod = ClassMethod(m.Req)
case m.Rep != "":
m.msgAlias.Type = MsgTypeReply
m.msgAlias.ClassMethod = ClassMethod(m.Rep)
case m.Pub != "":
m.msgAlias.Type = MsgTypePublish
m.msgAlias.ClassMethod = ClassMethod(m.Pub)
}
}
func (m *msgMeta) detachDeferredDecoders() {
if dec, ok := m.msgAlias.Result.(*deferredDecoder); ok {
if len(dec.data) == 0 && dec.out == nil {
m.msgAlias.Result = nil
}
}
if dec, ok := m.msgAlias.Params.(*deferredDecoder); ok {
if len(dec.data) == 0 && dec.out == nil {
m.msgAlias.Params = nil
}
}
}
// MarshalPrepare is a helper function which must be called before serialization
func (m *msgMeta) MarshalPrepare() {
m.detachDeferredDecoders()
if m.msgAlias.Dinetrpc == 0 {
m.msgAlias.Dinetrpc = CurrentDinetrpc
}
if m.msgAlias.Time == 0 {
m.msgAlias.Time = ditime.Now()
}
switch m.msgAlias.Type {
case MsgTypeRequest:
m.Req = string(m.msgAlias.ClassMethod)
if m.msgAlias.ID == 0 {
m.msgAlias.ID = rand.Uint32()
}
case MsgTypeReply:
m.Rep = string(m.msgAlias.ClassMethod)
case MsgTypePublish:
m.Pub = string(m.msgAlias.ClassMethod)
m.msgAlias.ID = 0
}
}
func (m *Msg) unmarshalGeneric(data []byte, f genericUnmarshaller) error {
if m.Params == nil {
m.Params = &deferredDecoder{}
}
if m.Result == nil {
m.Result = &deferredDecoder{}
}
msg := &msgMeta{msgAlias: (*msgAlias)(m)}
if err := f(data, &msg); err != nil {
return err
}
msg.UnmarshalFinish()
msg.rawBytes = data
// When params is null, the Params is set to nil by the decoder
// we explicitly don't want that and assure that params is never nil
if m.Params == nil {
m.Params = &deferredDecoder{}
}
if m.Result == nil {
m.Result = &deferredDecoder{}
}
return nil
}
// Bytes returns the raw message bytes after successful Marshal or Unmarshal
// it returns an empty slice when the message is incorrect
func (m *Msg) Bytes() []byte {
return m.rawBytes
}
// IsError checks if the message is an Error reply message
func (m *Msg) IsError() bool {
if m.Error == nil {
return false
}
return m.Error.Code != Ok && m.Type == MsgTypeReply
}
// SetError sets the ErrorCode in the message
func (m *Msg) SetError(err ErrorCode) *Msg {
if m.Error == nil {
m.Error = GetError(err)
return m
}
m.Error = GetError(err, m.Error.Data)
return m
}
// Valid checks if the RPC message is valid
func (m *Msg) Valid() error {
// check dinet rpc
if m.Dinetrpc != CurrentDinetrpc {
return &InvalidMsgError{ErrMsgInvalidDinetrpc, m}
}
if err := m.validClassMethod(); err != nil {
return err
}
// when the error message is set, it must contain an error
if m.Type == MsgTypeReply && m.Error != nil && m.Error.Code == Ok {
return &InvalidMsgError{ErrMsgErrorWithOk, m}
}
// check time
if m.Time == 0 {
return &InvalidMsgError{ErrMsgEmptyTime, m}
}
return nil
}
func (m *Msg) validClassMethod() error {
// device must be empty or valid
if len(m.DeviceUID) > 0 && !ValidDeviceUID(m.DeviceUID) {
return &InvalidMsgError{ErrMsgInvalidDeviceUID, m}
}
// message must contain either device UID, project ID or user ID
if !ValidDeviceUID(m.DeviceUID) && !ValidProjectID(m.ProjectID) && !ValidUserID(m.UserID) {
return &InvalidMsgError{ErrMsgEmptyDeviceProjectUser, m}
}
// no class method is invalid
if len(m.ClassMethod) == 0 {
return &InvalidMsgError{ErrMsgInvalidClassMethod, m}
}
return nil
}
// Class calculates the RPC message class from the ClassMethod property
func (m *Msg) Class() Class {
if m.class == "" {
m.splitClassMethod()
}
return Class(m.class)
}
// Method calculates the RPC message method from the ClassMethod property
func (m *Msg) Method() string {
if m.method == "" {
m.splitClassMethod()
}
return m.method
}
// splitClassMethod splits the class:method and caches it in m.method, and m.class
func (m *Msg) splitClassMethod() {
s := strings.SplitN(string(m.ClassMethod), ":", 2)
if len(s) > 1 {
m.class = s[0]
m.method = s[1]
}
}
// String from the DI-Net RPC message
func (m *Msg) String() string {
prefix := m.Time.Format(time.StampMilli)
if m.DeviceUID != "" {
prefix += " device:" + m.DeviceUID
} else if m.ProjectID != 0 {
prefix += fmt.Sprintf(" project:%d", m.ProjectID)
}
if m.UserID != 0 {
prefix += fmt.Sprintf(" user:%d", m.UserID)
}
prefix += fmt.Sprintf(" %s %s", m.Type.String(), m.ClassMethod)
if m.IsError() {
return prefix + fmt.Sprintf(" ERROR %v", m.Error.Code)
}
if m.Type == MsgTypeRequest {
return prefix + fmt.Sprintf(" %v", m.Params)
}
return prefix + fmt.Sprintf(" %v", m.Result)
}
// Compare checks if the given message is the same. It ignores non relevant data e.g. time, ID
func (m *Msg) Compare(msg2 *Msg) bool {
return CompareMsg(m, msg2)
}
// CompareMsg checks if the two messages are the same. It ignores non relevant data e.g. time, ID
func CompareMsg(msg1 *Msg, msg2 *Msg) bool {
if !CompareMsgHeader(msg1, msg2) {
return false
}
if !CompareError(msg1.Error, msg2.Error) {
return false
}
if !reflect.DeepEqual(msg1.Result, msg2.Result) {
return false
}
if !reflect.DeepEqual(msg1.Params, msg2.Params) {
return false
}
return true
}
// CompareMsgHeader checks if the two message headers are the same. It ignores non relevant data e.g. time, ID
func CompareMsgHeader(msg1 *Msg, msg2 *Msg) bool {
if msg1.Dinetrpc != msg2.Dinetrpc {
return false
}
if msg1.Async != msg2.Async {
return false
}
if msg1.DeviceUID != msg2.DeviceUID {
return false
}
if msg1.ProjectID != msg2.ProjectID {
return false
}
if msg1.UserID != msg2.UserID {
return false
}
if msg1.Type != msg2.Type {
return false
}
if msg1.ClassMethod != msg2.ClassMethod {
return false
}
return true
}