package ll import ( "bytes" "encoding/binary" "errors" "io" ) // A Decoder reads and decodes Low-level messages from an input stream type Decoder struct { buf []byte rd io.Reader r, w int s decoderState dataSize int } const ( decoderBufChunkSize = 4096 decoderBufMaxSize = 64 * 1024 ) type decoderState uint const ( decoderStateStart decoderState = iota decoderStateHeader decoderState = iota decoderStateData decoderState = iota ) // NewDecoder returns a new decoder that reads from r // The decoder introduces its own buffering and may read data from r beyond single Messages requested func NewDecoder(r io.Reader) *Decoder { buf := make([]byte, decoderBufChunkSize) return &Decoder{buf: buf, rd: r, s: decoderStateStart} } func (dec *Decoder) readHeader(msg *Msg) error { if len(dec.buf[dec.r:]) < remaningHeaderSize { return nil } r := bytes.NewReader(dec.buf[dec.r:]) if err := dec.readType(r, &msg.Type); err != nil { return err } if err := dec.readSize(r); err != nil { return err } dec.r += remaningHeaderSize dec.s = decoderStateData return nil } func (dec *Decoder) readType(r io.Reader, t *MsgType) error { if err := binary.Read(r, binary.BigEndian, t); err != nil { return err } if !t.Valid() { return errors.New("invalid message type") } return nil } func (dec *Decoder) readSize(r io.Reader) error { var t uint16 if err := binary.Read(r, binary.BigEndian, &t); err != nil { return err } dec.dataSize = int(t) - hdrSize return nil } // readData reads all data into the message when we have enough, it returns true when // add data is read, false otherwise func (dec *Decoder) readData(m *Msg) bool { if len(dec.buf[dec.r:]) < dec.dataSize { return false } if dec.dataSize > 0 { mData := make([]byte, dec.dataSize) copy(mData, dec.buf[dec.r:dec.r+dec.dataSize]) m.Data = mData } dec.r += dec.dataSize dec.s = decoderStateStart return true } // Decode reads the next Msg from its input and stores it in the value pointed to by m. func (dec *Decoder) Decode(m *Msg) error { for { prevState := dec.s switch dec.s { case decoderStateStart: if r := bytes.Index(dec.buf[dec.r:], []byte(hdrMagic)); r != -1 { dec.r += r + len(hdrMagic) dec.s = decoderStateHeader } case decoderStateHeader: if err := dec.readHeader(m); err != nil { return err } case decoderStateData: if done := dec.readData(m); done { return nil } } // When the state changes, first go into the new state before calling fill again if prevState == dec.s { if err := dec.fill(); err != nil { return err } } } } func (dec *Decoder) grow() error { if cap(dec.buf)+decoderBufChunkSize > decoderBufMaxSize { return errors.New("ll: 2^16-1 packet size exceeded") } newBuf := make([]byte, len(dec.buf), cap(dec.buf)+decoderBufChunkSize) copy(newBuf, dec.buf) dec.buf = newBuf return nil } func (dec *Decoder) fill() error { // Slide existing data to beginning. if dec.r > 0 { copy(dec.buf, dec.buf[dec.r:dec.w]) dec.w -= dec.r dec.r = 0 } // Check if we need to grow if dec.w >= cap(dec.buf) { if err := dec.grow(); err != nil { return err } } // Read new data n, err := dec.rd.Read(dec.buf[dec.w:cap(dec.buf)]) if err != nil { return err } dec.w += n dec.buf = dec.buf[0:dec.w] return nil }