src.dualinventive.com/go/websocketserver/internal/wss/server.go

97 lines
2.1 KiB
Go

package wss
import (
"context"
"net/http"
"time"
"github.com/sirupsen/logrus"
"src.dualinventive.com/go/websocketserver/internal/mtiwss"
"src.dualinventive.com/go/websocketserver/internal/redis"
"src.dualinventive.com/go/websocketserver/internal/wsconn"
)
// Server wraps an http.Server that serves Websocket clients.
type Server struct {
// srv is the underlying HTTP server; NewServer configures it and
// ListenAndServe/Close delegate to it.
srv *http.Server
// cancel cancels the context that NewServer hands to the websocket handler
// and to the redis events goroutine. It is invoked by Close.
cancel func()
}
// redisEventHandler processes a single redis event.
//
// It reads the complete hash stored under "<ev.Chan>:<ev.Msg>" through r and
// publishes the event together with those values via wsconn.Publish. When the
// hash cannot be read, the error is returned and nothing is published.
func redisEventHandler(ev redis.Event, r *redis.Redis) error {
	hashKey := ev.Chan + ":" + ev.Msg

	fields, err := r.HGetAll(hashKey)
	if err != nil {
		return err
	}

	// TODO add a hkey:key:value cache in front of all connections so only
	// the changes are published. Discuss with frontend and app developers
	wsconn.Publish(ev, fields)
	return nil
}
// redisEventsHandler drains the redis events returned by r.ReadEvents once
// per second and passes each of them to redisEventHandler.
//
// A failing event is logged as a warning and does not stop the loop. The
// function blocks until ctx is done, so it is meant to run in its own
// goroutine.
func redisEventsHandler(ctx context.Context, r *redis.Redis) {
	tick := time.NewTicker(time.Second)
	// Release the ticker on the only exit path (context cancellation).
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			for _, event := range r.ReadEvents() {
				if err := redisEventHandler(event, r); err != nil {
					logrus.Warnf("redis event error: %v", err)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
// NewServer creates a new Server whose http.Server exposes a Websocket
// endpoint at "/".
//
// RedisURIs is passed to redis.New, MTIServerURI to mtiwss.New and
// HTTPListenAddr becomes the listen address of the HTTP server. A goroutine
// that processes redis events is started before NewServer returns; it stops
// when the server's context is cancelled by Close. No connections are
// accepted until ListenAndServe is called.
//
// The only error returned is the one reported by redis.New.
func NewServer(RedisURIs []string, HTTPListenAddr, MTIServerURI string) (*Server, error) {
	// Upper bound for a client to send its request headers. Without it a
	// client can keep a connection open forever by trickling headers
	// (Slowloris). Only the header phase is bounded, so established
	// websocket connections are not affected.
	const readHeaderTimeout = 10 * time.Second

	// Redis
	r, err := redis.New(RedisURIs)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())

	// MTIWSS endpoint. The local is not named "mtiwss" so that it does not
	// shadow the imported package of the same name.
	mti := mtiwss.New(MTIServerURI)

	// HTTP websocket endpoint
	wsconn.SetRedis(r)
	mux := http.NewServeMux()
	mux.HandleFunc("/", wsconn.NewWsHandlerContext(ctx, mti))

	s := &Server{
		srv: &http.Server{
			Addr:              HTTPListenAddr,
			Handler:           mux,
			ReadHeaderTimeout: readHeaderTimeout,
		},
		cancel: cancel,
	}

	// Read and process the redis publish events
	go redisEventsHandler(ctx, r)
	return s, nil
}
// ListenAndServe starts the underlying HTTP server so that it listens on the
// configured address and accepts connections. It returns whatever error the
// wrapped http.Server's ListenAndServe reports.
func (s *Server) ListenAndServe() error {
	err := s.srv.ListenAndServe()
	return err
}
// Close shuts the server down: it first cancels the context shared with the
// websocket handler and the redis events loop, then closes the underlying
// HTTP server, which stops ListenAndServe. The error from the HTTP server's
// Close is returned.
func (s *Server) Close() error {
	// Cancel before closing so context-aware work is told to stop first.
	s.cancel()

	closeErr := s.srv.Close()
	return closeErr
}