package wss

import (
	"context"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
	"src.dualinventive.com/go/websocketserver/internal/mtiwss"
	"src.dualinventive.com/go/websocketserver/internal/redis"
	"src.dualinventive.com/go/websocketserver/internal/wsconn"
)

// Server wraps an http.Server that serves Websocket clients.
type Server struct {
	// srv is the underlying HTTP server exposing the Websocket endpoint.
	srv *http.Server
	// cancel cancels the context shared by the Websocket handler and the
	// redis event poller started in NewServer.
	cancel func()
}

// redisEventHandler handles a single redis event: it reads the hash stored
// under "<ev.Chan>:<ev.Msg>" via HGetAll and publishes the event together
// with the retrieved values to the Websocket connections.
//
// It returns the error from HGetAll, if any; in that case nothing is
// published.
func redisEventHandler(ev redis.Event, r *redis.Redis) error {
	values, err := r.HGetAll(ev.Chan + ":" + ev.Msg)
	if err != nil {
		return err
	}
	// TODO add a hkey:key:value cache in front of all connections so only
	// the changes are published. Discuss with frontend and app developers
	wsconn.Publish(ev, values)
	return nil
}

// redisEventsHandler drains the cached redis events once every second and
// hands each of them to redisEventHandler. A failing event is logged as a
// warning and does not stop the processing of the remaining events.
//
// The function blocks until ctx is done, so it is meant to run in its own
// goroutine.
func redisEventsHandler(ctx context.Context, r *redis.Redis) {
	ticker := time.NewTicker(time.Second)
	// Release the ticker on every return path, not just on cancellation.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, ev := range r.ReadEvents() {
				if err := redisEventHandler(ev, r); err != nil {
					logrus.Warnf("redis event error: %v", err)
				}
			}
		}
	}
}

// NewServer creates a new Server with a Websocket endpoint at "/".
//
// RedisURIs is passed to redis.New, HTTPListenAddr is the address the HTTP
// server listens on, and MTIServerURI is passed to mtiwss.New. An error is
// returned when the redis client cannot be created.
//
// NewServer starts a background goroutine that polls the redis events; it
// runs until Close is called, so callers should always Close the returned
// Server.
func NewServer(RedisURIs []string, HTTPListenAddr, MTIServerURI string) (*Server, error) {
	// Redis
	r, err := redis.New(RedisURIs)
	if err != nil {
		return nil, err
	}

	s := &Server{}
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel

	// MTIWSS endpoint. The local name deliberately differs from the package
	// name so the import is not shadowed for the rest of the function.
	mti := mtiwss.New(MTIServerURI)

	// HTTP websocket endpoint
	wsconn.SetRedis(r)
	mux := http.NewServeMux()
	mux.HandleFunc("/", wsconn.NewWsHandlerContext(ctx, mti))
	s.srv = &http.Server{
		Addr:    HTTPListenAddr,
		Handler: mux,
		// Bound the time a client may take to send its request headers so
		// that slow or stalled clients cannot hold connections open forever.
		// Only the header phase is limited; ReadTimeout/WriteTimeout are
		// intentionally left unset because Websocket connections are
		// long-lived.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Read and process the redis publish events
	go redisEventsHandler(ctx, r)
	return s, nil
}

// ListenAndServe listens on the configured address and serves incoming
// connections. It blocks and returns the error from the underlying
// http.Server's ListenAndServe.
func (s *Server) ListenAndServe() error {
	return s.srv.ListenAndServe()
}

// Close cancels the context shared with the Websocket handler and the redis
// event poller, and then closes the underlying http.Server, which stops
// ListenAndServe.
func (s *Server) Close() error {
	s.cancel()
	return s.srv.Close()
}