package wsconn import ( "encoding/json" "fmt" "github.com/sirupsen/logrus" "src.dualinventive.com/go/dinet/rpc" "src.dualinventive.com/go/websocketserver/internal/redis" ) // mtiWssRequestResultItem type mtiWssRequestResultItem struct { DeviceUID string `json:"device:uid"` ProjectID uint64 `json:"project:id"` Fields []string `json:"fields"` } // Fetch all subscribed deviceFields from redis and send to client func (c *client) mtiWssSendDeviceFieldsFromRedis(deviceFields map[string][]string) { // When there are no device subscriptions we are finished if len(deviceFields) == 0 { return } // Fetch all values per device from Redis for deviceUID, fields := range deviceFields { key := redis.KeyPrefixDevice + deviceUID vals, err := redisClient.HMGet(key, fields) if err != nil { continue } // Filter out empty values from redis and attach field names to values values := make(map[string]string) for n, field := range fields { // Skip empty values (keys which don't exist in redis) if len(vals[n]) == 0 { continue } values[field] = vals[n] } c.PublishDeviceUIDUpdate(deviceUID, values) } } // Fetch all subscribed projectFields from redis and send to client func (c *client) mtiWssSendProjectFieldsFromRedis(projectFields map[uint64][]string) { // When there are no project subscriptions we are finished if len(projectFields) == 0 { return } // Fetch all values per project from Redis for projectID, fields := range projectFields { key := fmt.Sprintf("%s%d", redis.KeyPrefixProject, projectID) vals, err := redisClient.HMGet(key, fields) if err != nil { continue } // Filter out empty values from redis and attach field names to values values := make(map[string]string) for n, field := range fields { // Skip empty values (keys which don't exist in redis) if len(vals[n]) == 0 { continue } values[field] = vals[n] } c.PublishProjectIDUpdate(projectID, values) } } // mtiWssRealtimeRequest processes the result field to update the Redis subscriptions func (c *client) mtiWssDecodeRealtimeRequest(result *json.RawMessage) { subscriptions := make([]mtiWssRequestResultItem, 0) if err := json.Unmarshal(*result, &subscriptions); err != nil { return } deviceFields := make(map[string][]string) projectFields := make(map[uint64][]string) // Parse subscriptions for _, sub := range subscriptions { if len(sub.DeviceUID) == 32 { deviceFields[sub.DeviceUID] = sub.Fields } else if sub.ProjectID > 0 { projectFields[sub.ProjectID] = sub.Fields } } // Update connection subscriptions c.mu.Lock() c.deviceFields = deviceFields c.projectFields = projectFields c.mu.Unlock() // Update client with latest state of the redis values after subscription c.mtiWssSendDeviceFieldsFromRedis(deviceFields) c.mtiWssSendProjectFieldsFromRedis(projectFields) } // mtiWssDecodeReply processes a raw JSON response from mtiwss HTTP endpoint func (c *client) mtiWssDecodeReply(rep []byte) { result := &json.RawMessage{} msg := &rpc.Msg{Result: result} if err := json.Unmarshal(rep, &msg); err != nil { logrus.Error("Invalid response from mtiwss", string(rep)) return } if msg.ClassMethod == rpc.ClassMethodRealtimeRequest && msg.Error == nil { c.mtiWssDecodeRealtimeRequest(result) return } // Forward non realtime:request messages back to the browser c.Send(rep) } // mtiWssDecodeRequest process a raw JSON request from the websocket client // returns true if message was handled, false if it need to be send to mtiwss func (c *client) mtiWssDecodeRequest(req []byte) (bool, error) { msg := &rpc.Msg{} if err := json.Unmarshal(req, &msg); err != nil { return true, err } return false, nil }