package reader

import (
	"context"
	"sync"
	"time"

	"src.dualinventive.com/go/dinet"
	"src.dualinventive.com/go/lib/dilog"
)

// MessageReader reads incoming messages from a pubsub connection and forwards
// every received message to the configured dinet.Writer (Redis, according to
// the original documentation of this type).
type MessageReader struct {
	// mu serialises closing the pubsub connection (done when the Serve
	// context ends) with reconnect attempts, so a reconnect can never run
	// concurrently with the final Close.
	mu     sync.Mutex
	logger dilog.Logger
	// pubsub is the subscriber connection the messages are received from.
	pubsub dinet.Conn
	// r is the destination every received message is sent to.
	r dinet.Writer
}

// New creates a new message reader that is subscribed to pubSubURI and
// forwards the received messages to r.
//
// An error is returned when the connection cannot be created or when
// connecting to pubSubURI fails; in the latter case the freshly created
// connection is closed again so it does not leak.
func New(logger dilog.Logger, pubSubURI string, r dinet.Writer) (*MessageReader, error) {
	con, err := dinet.NewConn(dinet.TransportZmq)
	if err != nil {
		return nil, err
	}

	if err := con.Connect(pubSubURI + "?type=sub"); err != nil {
		// Release the unused connection. The Connect error is the one the
		// caller needs to see, so a secondary Close error is deliberately
		// dropped.
		_ = con.Close()
		return nil, err
	}

	return &MessageReader{
		logger: logger,
		pubsub: con,
		r:      r,
	}, nil
}

// AddConn connects the reader to an additional ZMQ pubsub endpoint.
//
// NOTE(review): this method does not take r.mu, so it is not serialised with
// the close/reconnect logic of Serve - confirm that callers only use it
// before Serve is started or that dinet.Conn tolerates concurrent use.
func (r *MessageReader) AddConn(pubSubURI string) error {
	return r.pubsub.Connect(pubSubURI + "?type=sub")
}

// Serve reads incoming messages from the pubsub connection and sends them to
// the writer until ctx is done. It blocks until the context has ended and the
// pubsub connection has been closed.
func (r *MessageReader) Serve(ctx context.Context) {
	var wg sync.WaitGroup
	// Wait until the closer goroutine has ended before returning.
	defer wg.Wait()

	wg.Add(1)
	go func() {
		defer wg.Done()

		// Closing the connection when the context ends is what unblocks a
		// pending Recv in the loop below.
		<-ctx.Done()

		r.mu.Lock()
		defer r.mu.Unlock()
		if err := r.pubsub.Close(); err != nil {
			r.logger.WithError(err).Error("unable to close pub sub connection")
		}
	}()

	for {
		if ctx.Err() != nil {
			r.logger.WithError(ctx.Err()).Info("reader context error")
			return
		}

		// Receive messages from the reader connection
		msg, err := r.pubsub.Recv()
		if err != nil {
			// When the context has ended the connection is being closed on
			// purpose; the receive error is expected then and must neither
			// be reported as a failure nor trigger a reconnect. The check at
			// the top of the loop logs the context error and returns.
			if ctx.Err() != nil {
				continue
			}

			switch err {
			case dinet.ErrDisconnected:
				r.logger.WithError(err).Warning("Reconnecting due to error")
			default:
				r.logger.WithError(err).Error("Unable to receive publish message due to")
			}
			r.reconnect(ctx)
			continue
		}

		if err := r.r.Send(msg); err != nil {
			r.logger.WithError(err).Warning("Send error")
		}
	}
}

// reconnect keeps trying to reconnect the pubsub connection until it succeeds
// or ctx is done. Failed attempts are logged and retried after a short delay.
func (r *MessageReader) reconnect(ctx context.Context) {
	// retryDelay is the pause between two failed reconnect attempts.
	const retryDelay = 100 * time.Millisecond

	r.mu.Lock()
	defer r.mu.Unlock()

	// Do not reconnect when the context is closed
	if ctx.Err() != nil {
		return
	}

	for {
		err := r.pubsub.Reconnect()
		if err == nil {
			return
		}
		r.logger.WithError(err).Warning("Reconnect error")

		// Wait before the next attempt, but give up immediately when the
		// context ends: r.mu is held here and the closer goroutine in Serve
		// needs it to close the connection.
		wait := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			wait.Stop()
			return
		case <-wait.C:
		}
	}
}