package testenv import ( "fmt" "io" "sync" "time" "src.dualinventive.com/go/lib/cp3000" "src.dualinventive.com/go/lib/dilog" ) // CP3000IO contains basic assert IO functions type CP3000IO struct { mx sync.RWMutex Logger dilog.Logger requests []reqRep writeQueue [][]byte c chan bool writeError chan error emptyQueue chan bool } // NewCP3000IO creates a new CP3000IO func NewCP3000IO(log dilog.Logger) *CP3000IO { cp := &CP3000IO{ Logger: log, c: make(chan bool), writeError: make(chan error), emptyQueue: make(chan bool, 1), } cp.updateEmptyQueueFlag() return cp } // Send sends a cp3000 message to set the test in the correct state func (r *CP3000IO) Send(req *cp3000.Msg) { r.mx.Lock() r.writeQueue = append(r.writeQueue, req.Bytes()) r.mx.Unlock() for { time.Sleep(time.Millisecond) r.mx.RLock() if len(r.writeQueue) == 0 { r.mx.RUnlock() break } r.mx.RUnlock() } } // WaitForEmptyQueue waits until the queue is empty. func (r *CP3000IO) WaitForEmptyQueue() { <-r.emptyQueue // We have emptied the empty queue flag, so we need to update it again. r.mx.Lock() r.updateEmptyQueueFlag() r.mx.Unlock() } // updateEmptyQueueFlag updates the empty queue flag. Keep in mind that this requires a lock func (r *CP3000IO) updateEmptyQueueFlag() { if len(r.requests) == 0 { // Add the empty queue flag select { case r.emptyQueue <- true: default: } return } // Remove the empty queue flag, because there are items select { case <-r.emptyQueue: default: } } // AssertReqRep asserts that the next call will be 'req'. When it is, it replies with 'rep' func (r *CP3000IO) AssertReqRep(req *cp3000.Msg, rep *cp3000.Msg) { r.mx.Lock() defer r.mx.Unlock() r.requests = append(r.requests, reqRep{false, req, rep}) // Items are added so we need to update the empty queue flag r.updateEmptyQueueFlag() } // Pub puts a message in the queue and is automatically send when the previous message is handled func (r *CP3000IO) Pub(pub *cp3000.Msg) { r.mx.Lock() defer r.mx.Unlock() r.requests = append(r.requests, reqRep{true, nil, pub}) // Items are added so we need to update the empty queue flag r.updateEmptyQueueFlag() } // waitForRead blocks until there is something to read. When the connection is closed it returns with nil, io.EOF func (r *CP3000IO) waitForRead() ([]byte, error) { ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() for { select { case <-r.c: return nil, io.EOF case writeError := <-r.writeError: return nil, writeError case <-ticker.C: r.mx.Lock() if len(r.writeQueue) > 0 { var ret []byte ret, r.writeQueue = r.writeQueue[0], r.writeQueue[1:] r.mx.Unlock() return ret, nil } r.mx.Unlock() } } } // Read implements the reader interface func (r *CP3000IO) Read(p []byte) (n int, err error) { msg, err := r.waitForRead() if err != nil { return 0, err } return copy(p, msg), nil } // Write implements the writer interface func (r *CP3000IO) Write(p []byte) (n int, err error) { r.mx.Lock() defer r.mx.Unlock() var request reqRep if len(r.requests) == 0 { err := fmt.Errorf("message is written, but didn't expect one (got '%s')", string(p)) r.writeError <- err return 0, err } request, r.requests = r.requests[0], r.requests[1:] if request.req.String() != string(p) { err := fmt.Errorf("expected another message (expect '%s' got '%s'", request.req.String(), string(p)) r.writeError <- err return 0, err } if request.rep != nil { r.writeQueue = append(r.writeQueue, request.rep.Bytes()) // add all publish messages for len(r.requests) > 0 && r.requests[0].pub { r.writeQueue = append(r.writeQueue, r.requests[0].rep.Bytes()) r.requests = r.requests[1:] } } r.updateEmptyQueueFlag() return len(p), nil } // Close implements the closer inferface func (r *CP3000IO) Close() error { r.WaitForEmptyQueue() close(r.c) return nil } type reqRep struct { pub bool req *cp3000.Msg rep *cp3000.Msg }