323 lines
8.3 KiB
Go
323 lines
8.3 KiB
Go
package rpc
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
)
|
|
|
|
// CurrentDinetrpc is the DI-Net RPC protocol version used by this package.
// It is the value filled in for messages that carry no version when they
// are prepared for serialisation, and the only version Valid accepts.
const CurrentDinetrpc uint = 1
|
|
|
|
// DeferredDecoder describes a payload whose decoding is postponed until the
// caller supplies a destination value.
type DeferredDecoder interface {
	// IsEmpty presumably reports whether there is no payload to decode;
	// the exact definition is up to the implementation.
	IsEmpty() bool

	// Unmarshal decodes the held payload into the supplied destination.
	Unmarshal(interface{}) error
}
|
|
|
|
// Msg represents a single DI-Net RPC message
|
|
type Msg struct {
|
|
Dinetrpc uint `json:"dinetrpc" msgpack:"dinetrpc"`
|
|
Async Async `json:"async,omitempty" msgpack:"async,omitempty"`
|
|
Rt bool `json:"rt,omitempty" msgpack:"rt,omitempty"`
|
|
RtSeqnr uint16 `json:"rt:seqnr,omitempty" msgpack:"rt:seqnr,omitempty"`
|
|
ID uint32 `json:"id,omitempty" msgpack:"id,omitempty"`
|
|
DeviceUID string `json:"device:uid,omitempty" msgpack:"device:uid,omitempty"`
|
|
ProjectID uint `json:"project:id,omitempty" msgpack:"project:id,omitempty"`
|
|
UserID uint `json:"user:id,omitempty" msgpack:"user:id,omitempty"`
|
|
Time ditime.Time `json:"time" msgpack:"time"`
|
|
TimeRecv ditime.Time `json:"time:recv,omitempty" msgpack:"-"`
|
|
Type MsgType `json:"-" msgpack:"-"`
|
|
ClassMethod ClassMethod `json:"-" msgpack:"-"`
|
|
Error *Error `json:"error,omitempty" msgpack:"error,omitempty"`
|
|
Result DeferredDecoder `json:"result,omitempty" msgpack:"result,omitempty"`
|
|
Params DeferredDecoder `json:"params,omitempty" msgpack:"params,omitempty"`
|
|
rawBytes []byte
|
|
class string
|
|
method string
|
|
}
|
|
|
|
// NewParams allows for any datatype (not conforming the DeferredDecoder interface)
|
|
// to be applied into Params
|
|
func NewParams(d interface{}) DeferredDecoder {
|
|
return &deferredDecoder{out: d}
|
|
}
|
|
|
|
// NewResult allows for any datatype (not conforming the DeferredDecoder interface)
|
|
// to be applied into Result
|
|
func NewResult(d interface{}) DeferredDecoder {
|
|
return &deferredDecoder{out: d}
|
|
}
|
|
|
|
// CreateReply creates a copy of a message for replies
|
|
func (m *Msg) CreateReply() *Msg {
|
|
msg := *m
|
|
msg.Type = MsgTypeReply
|
|
msg.Params = nil
|
|
msg.Error = nil
|
|
msg.Time = 0
|
|
return &msg
|
|
}
|
|
|
|
// msgAlias is a helper struct for serialisation and deserialisation
|
|
type msgAlias Msg
|
|
|
|
// msgMeta is a helper struct for serialisation and deserialisation
|
|
type msgMeta struct {
|
|
Req string `json:"req,omitempty" msgpack:"req,omitempty"`
|
|
Rep string `json:"rep,omitempty" msgpack:"rep,omitempty"`
|
|
Pub string `json:"pub,omitempty" msgpack:"pub,omitempty"`
|
|
*msgAlias
|
|
}
|
|
|
|
// UnmarshalFinish is a helper function which must be called after deserialization
|
|
func (m *msgMeta) UnmarshalFinish() {
|
|
switch {
|
|
case m.Req != "":
|
|
m.msgAlias.Type = MsgTypeRequest
|
|
m.msgAlias.ClassMethod = ClassMethod(m.Req)
|
|
case m.Rep != "":
|
|
m.msgAlias.Type = MsgTypeReply
|
|
m.msgAlias.ClassMethod = ClassMethod(m.Rep)
|
|
case m.Pub != "":
|
|
m.msgAlias.Type = MsgTypePublish
|
|
m.msgAlias.ClassMethod = ClassMethod(m.Pub)
|
|
}
|
|
}
|
|
|
|
func (m *msgMeta) detachDeferredDecoders() {
|
|
if dec, ok := m.msgAlias.Result.(*deferredDecoder); ok {
|
|
if len(dec.data) == 0 && dec.out == nil {
|
|
m.msgAlias.Result = nil
|
|
}
|
|
}
|
|
|
|
if dec, ok := m.msgAlias.Params.(*deferredDecoder); ok {
|
|
if len(dec.data) == 0 && dec.out == nil {
|
|
m.msgAlias.Params = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// MarshalPrepare is a helper function which must be called before serialization
|
|
func (m *msgMeta) MarshalPrepare() {
|
|
m.detachDeferredDecoders()
|
|
|
|
if m.msgAlias.Dinetrpc == 0 {
|
|
m.msgAlias.Dinetrpc = CurrentDinetrpc
|
|
}
|
|
if m.msgAlias.Time == 0 {
|
|
m.msgAlias.Time = ditime.Now()
|
|
}
|
|
|
|
switch m.msgAlias.Type {
|
|
case MsgTypeRequest:
|
|
m.Req = string(m.msgAlias.ClassMethod)
|
|
if m.msgAlias.ID == 0 {
|
|
m.msgAlias.ID = rand.Uint32()
|
|
}
|
|
case MsgTypeReply:
|
|
m.Rep = string(m.msgAlias.ClassMethod)
|
|
case MsgTypePublish:
|
|
m.Pub = string(m.msgAlias.ClassMethod)
|
|
m.msgAlias.ID = 0
|
|
}
|
|
}
|
|
|
|
func (m *Msg) unmarshalGeneric(data []byte, f genericUnmarshaller) error {
|
|
if m.Params == nil {
|
|
m.Params = &deferredDecoder{}
|
|
}
|
|
|
|
if m.Result == nil {
|
|
m.Result = &deferredDecoder{}
|
|
}
|
|
|
|
msg := &msgMeta{msgAlias: (*msgAlias)(m)}
|
|
|
|
if err := f(data, &msg); err != nil {
|
|
return err
|
|
}
|
|
|
|
msg.UnmarshalFinish()
|
|
|
|
msg.rawBytes = data
|
|
|
|
// When params is null, the Params is set to nil by the decoder
|
|
// we explicitly don't want that and assure that params is never nil
|
|
if m.Params == nil {
|
|
m.Params = &deferredDecoder{}
|
|
}
|
|
if m.Result == nil {
|
|
m.Result = &deferredDecoder{}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Bytes returns the raw message bytes after successful Marshal or Unmarshal
|
|
// it returns an empty slice when the message is incorrect
|
|
func (m *Msg) Bytes() []byte {
|
|
return m.rawBytes
|
|
}
|
|
|
|
// IsError checks if the message is an Error reply message
|
|
func (m *Msg) IsError() bool {
|
|
if m.Error == nil {
|
|
return false
|
|
}
|
|
return m.Error.Code != Ok && m.Type == MsgTypeReply
|
|
}
|
|
|
|
// SetError sets the ErrorCode in the message
|
|
func (m *Msg) SetError(err ErrorCode) *Msg {
|
|
if m.Error == nil {
|
|
m.Error = GetError(err)
|
|
return m
|
|
}
|
|
m.Error = GetError(err, m.Error.Data)
|
|
return m
|
|
}
|
|
|
|
// Valid checks if the RPC message is valid
|
|
func (m *Msg) Valid() error {
|
|
// check dinet rpc
|
|
if m.Dinetrpc != CurrentDinetrpc {
|
|
return &InvalidMsgError{ErrMsgInvalidDinetrpc, m}
|
|
}
|
|
|
|
if err := m.validClassMethod(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// when the error message is set, it must contain an error
|
|
if m.Type == MsgTypeReply && m.Error != nil && m.Error.Code == Ok {
|
|
return &InvalidMsgError{ErrMsgErrorWithOk, m}
|
|
}
|
|
|
|
// check time
|
|
if m.Time == 0 {
|
|
return &InvalidMsgError{ErrMsgEmptyTime, m}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Msg) validClassMethod() error {
|
|
// device must be empty or valid
|
|
if len(m.DeviceUID) > 0 && !ValidDeviceUID(m.DeviceUID) {
|
|
return &InvalidMsgError{ErrMsgInvalidDeviceUID, m}
|
|
}
|
|
|
|
// message must contain either device UID, project ID or user ID
|
|
if !ValidDeviceUID(m.DeviceUID) && !ValidProjectID(m.ProjectID) && !ValidUserID(m.UserID) {
|
|
return &InvalidMsgError{ErrMsgEmptyDeviceProjectUser, m}
|
|
}
|
|
|
|
// no class method is invalid
|
|
if len(m.ClassMethod) == 0 {
|
|
return &InvalidMsgError{ErrMsgInvalidClassMethod, m}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Class calculates the RPC message class from the ClassMethod property
|
|
func (m *Msg) Class() Class {
|
|
if m.class == "" {
|
|
m.splitClassMethod()
|
|
}
|
|
return Class(m.class)
|
|
}
|
|
|
|
// Method calculates the RPC message method from the ClassMethod property
|
|
func (m *Msg) Method() string {
|
|
if m.method == "" {
|
|
m.splitClassMethod()
|
|
}
|
|
return m.method
|
|
}
|
|
|
|
// splitClassMethod splits the class:method and caches it in m.method, and m.class
|
|
func (m *Msg) splitClassMethod() {
|
|
s := strings.SplitN(string(m.ClassMethod), ":", 2)
|
|
if len(s) > 1 {
|
|
m.class = s[0]
|
|
m.method = s[1]
|
|
}
|
|
}
|
|
|
|
// String from the DI-Net RPC message
|
|
func (m *Msg) String() string {
|
|
prefix := m.Time.Format(time.StampMilli)
|
|
|
|
if m.DeviceUID != "" {
|
|
prefix += " device:" + m.DeviceUID
|
|
} else if m.ProjectID != 0 {
|
|
prefix += fmt.Sprintf(" project:%d", m.ProjectID)
|
|
}
|
|
|
|
if m.UserID != 0 {
|
|
prefix += fmt.Sprintf(" user:%d", m.UserID)
|
|
}
|
|
|
|
prefix += fmt.Sprintf(" %s %s", m.Type.String(), m.ClassMethod)
|
|
|
|
if m.IsError() {
|
|
return prefix + fmt.Sprintf(" ERROR %v", m.Error.Code)
|
|
}
|
|
if m.Type == MsgTypeRequest {
|
|
return prefix + fmt.Sprintf(" %v", m.Params)
|
|
}
|
|
return prefix + fmt.Sprintf(" %v", m.Result)
|
|
}
|
|
|
|
// Compare checks if the given message is the same. It ignores non relevant data e.g. time, ID
|
|
func (m *Msg) Compare(msg2 *Msg) bool {
|
|
return CompareMsg(m, msg2)
|
|
}
|
|
|
|
// CompareMsg checks if the two messages are the same. It ignores non relevant data e.g. time, ID
|
|
func CompareMsg(msg1 *Msg, msg2 *Msg) bool {
|
|
if !CompareMsgHeader(msg1, msg2) {
|
|
return false
|
|
}
|
|
if !CompareError(msg1.Error, msg2.Error) {
|
|
return false
|
|
}
|
|
if !reflect.DeepEqual(msg1.Result, msg2.Result) {
|
|
return false
|
|
}
|
|
if !reflect.DeepEqual(msg1.Params, msg2.Params) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// CompareMsgHeader checks if the two message headers are the same. It ignores non relevant data e.g. time, ID
|
|
func CompareMsgHeader(msg1 *Msg, msg2 *Msg) bool {
|
|
if msg1.Dinetrpc != msg2.Dinetrpc {
|
|
return false
|
|
}
|
|
if msg1.Async != msg2.Async {
|
|
return false
|
|
}
|
|
if msg1.DeviceUID != msg2.DeviceUID {
|
|
return false
|
|
}
|
|
if msg1.ProjectID != msg2.ProjectID {
|
|
return false
|
|
}
|
|
if msg1.UserID != msg2.UserID {
|
|
return false
|
|
}
|
|
if msg1.Type != msg2.Type {
|
|
return false
|
|
}
|
|
if msg1.ClassMethod != msg2.ClassMethod {
|
|
return false
|
|
}
|
|
return true
|
|
}
|