src.dualinventive.com/go/influxdb-logger/internal/influxdb/writer.go

165 lines
4.0 KiB
Go

package influxdb
import (
"context"
"sync"
"time"
client "github.com/influxdata/influxdb1-client/v2"
"github.com/sirupsen/logrus"
)
// Writer is an interface that accepts measurement points and dispatches them
// to InfluxDB.
type Writer interface {
// Add adds the point to the buffer, where it waits for transmission.
Add(i *client.Point)
// Dispatch writes the buffered points to the InfluxDB database at the given
// interval until the context is done.
Dispatch(ctx context.Context, interval time.Duration)
}
// writer writes the measurement points at a fixed interval or when the buffer
// is full. It keeps two buffers: one is filled with new points while the other
// is drained and sent to InfluxDB, and the two are swapped before every flush.
type writer struct {
// mu is held while a point is written into writebuffer and while the two
// buffers are swapped, so a swap cannot happen in the middle of a write.
mu sync.Mutex
// writer is the channel on which Add hands points to bufferWriter.
writer chan *client.Point
// writebuffer is the buffer new measurements are written into.
writebuffer chan *client.Point
// readbuffer is the buffer flush reads the measurements from.
readbuffer chan *client.Point
// requestSwap is signalled by bufferWrite to let the dispatcher know that
// the write buffer is full and a swap is needed.
requestSwap chan bool
// bufferSwapped is signalled by the dispatcher to let bufferWrite know that
// the requested swap has been executed.
bufferSwapped chan bool
// db is the name of the database to write the measurements in.
db string
// client is the influx client used to write the batches.
client client.Client
}
// NewWriter creates a new influx writer that sends its points to database db
// through client c.
//
// size is the capacity of each of the two internal buffers. Values below 1 are
// raised to 1: a zero-capacity buffer can never accept a point, which would
// make the buffer writer request swaps forever without storing anything, and a
// negative capacity would make the channel allocation panic.
func NewWriter(c client.Client, db string, size int) Writer {
	if size < 1 {
		size = 1
	}
	return &writer{
		writer:        make(chan *client.Point),
		writebuffer:   make(chan *client.Point, size),
		readbuffer:    make(chan *client.Point, size),
		requestSwap:   make(chan bool),
		bufferSwapped: make(chan bool),
		db:            db,
		client:        c,
	}
}
// Add adds an item to the influx buffer.
//
// The point is sent on an unbuffered channel, so Add blocks until the
// bufferWriter goroutine (started by Dispatch) receives it. When no such
// goroutine is running, Add blocks indefinitely.
func (t *writer) Add(i *client.Point) {
t.writer <- i
}
// Dispatch starts the goroutine that moves points handed to Add into the write
// buffer and then flushes the buffered points to InfluxDB, either every
// interval or as soon as the buffer writer reports that the write buffer is
// full. Write errors are logged as warnings and do not stop the dispatcher.
//
// Dispatch blocks until ctx is done and the buffer-writer goroutine has
// exited. During shutdown it keeps answering swap requests, so a buffer writer
// that is blocked on a full buffer cannot stall the final wait, and it flushes
// the points that are still buffered before returning.
//
// interval must be positive; time.NewTicker panics otherwise.
func (t *writer) Dispatch(ctx context.Context, interval time.Duration) {
	// writerDone is closed once the buffer-writer goroutine has returned.
	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		t.bufferWriter(ctx)
	}()

	// swapAndFlush exchanges the buffers and writes the previously filled one
	// to InfluxDB. When the swap was requested by the buffer writer it is
	// acknowledged before flushing, so writing can continue during the flush.
	swapAndFlush := func(requested bool) {
		t.swapBuffers()
		if requested {
			t.bufferSwapped <- true
		}
		if err := t.flush(); err != nil {
			logrus.Warnf("Cannot write to influx: %v", err)
		}
	}

	// This ticker ticks on the right interval
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// The buffer writer may still be blocked sending on requestSwap.
			// Keep serving those requests until it has exited; returning
			// right away would leave it (and the wait for it) stuck forever.
			for {
				select {
				case <-t.requestSwap:
					swapAndFlush(true)
				case <-writerDone:
					// Nobody writes to the buffers anymore: send what is left.
					swapAndFlush(false)
					return
				}
			}
		case <-t.requestSwap:
			// Writer wants a swap. Swap the buffers.
			swapAndFlush(true)
		case <-ticker.C:
			// Interval is exceeded. Swap the buffers.
			swapAndFlush(false)
		}
	}
}
// bufferWriter receives the points handed to Add and stores each of them in
// the current write buffer. It returns when ctx is done.
func (t *writer) bufferWriter(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case point := <-t.writer:
			// Retry until the point is stored; give up on it once the
			// context has been cancelled.
			stored := false
			for !stored && ctx.Err() == nil {
				stored = t.bufferWrite(point)
			}
		}
	}
}
// bufferWrite tries to store the measurement in the write buffer without
// blocking and reports whether that succeeded. When the buffer is full it asks
// the dispatcher for a buffer swap, waits until the swap is confirmed and
// returns false so the caller can retry.
func (t *writer) bufferWrite(val *client.Point) bool {
	// The attempt runs under the mutex so the buffers cannot be swapped while
	// the point is being written.
	stored := func() bool {
		t.mu.Lock()
		defer t.mu.Unlock()
		select {
		case t.writebuffer <- val:
			return true
		default:
			return false
		}
	}()
	if stored {
		return true
	}

	// The buffer is full: request a swap and wait until it has been executed.
	t.requestSwap <- true
	<-t.bufferSwapped
	return false
}
// flush drains the read buffer without blocking and writes everything it
// collected to InfluxDB as a single batch with "ns" precision. It returns nil
// without calling the client when the buffer was empty; otherwise it returns
// the error from creating the batch or from the client's Write call.
func (t *writer) flush() error {
	batch, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  t.db,
		Precision: "ns",
	})
	if err != nil {
		return err
	}

	// Collect points until a receive would block, i.e. the buffer is empty.
	collected := 0
	for empty := false; !empty; {
		select {
		case point := <-t.readbuffer:
			batch.AddPoint(point)
			collected++
		default:
			empty = true
		}
	}
	if collected == 0 {
		return nil
	}

	logrus.Debugf("Write %d points", len(batch.Points()))
	return t.client.Write(batch)
}
// swapBuffers exchanges the read and write buffers while holding the mutex, so
// the exchange cannot interleave with a write into the write buffer.
func (t *writer) swapBuffers() {
	t.mu.Lock()
	defer t.mu.Unlock()

	filled := t.writebuffer
	t.writebuffer = t.readbuffer
	t.readbuffer = filled
}