165 lines
4.0 KiB
Go
165 lines
4.0 KiB
Go
package influxdb
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
client "github.com/influxdata/influxdb1-client/v2"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Writer is an interface that accepts and dispatch measurement point
|
|
type Writer interface {
|
|
// Add adds the point into the buffer and waits for transmission.
|
|
Add(i *client.Point)
|
|
// Dispatch writes at the given interval the points to the InfluxDB database until the context is done.
|
|
Dispatch(ctx context.Context, interval time.Duration)
|
|
}
|
|
|
|
// writer writes the measurement points at a fixed interval or when the buffer is full
|
|
type writer struct {
|
|
// mu is locked when there is a write action or when the buffers are swapped
|
|
mu sync.Mutex
|
|
|
|
// writer is the channel to write items in one of the two buffers
|
|
writer chan *client.Point
|
|
// writebuffer is the buffer to write the measurements in
|
|
writebuffer chan *client.Point
|
|
// readbuffer is the buffer to read the measurements from
|
|
readbuffer chan *client.Point
|
|
|
|
// requestSwap is a channel to let the dispatcher know that a swap is needed
|
|
requestSwap chan bool
|
|
// bufferSwapped is a channel to let the writer know that the swap is executed
|
|
bufferSwapped chan bool
|
|
|
|
// db is the database to write the measurements in
|
|
db string
|
|
// client is the influx client
|
|
client client.Client
|
|
}
|
|
|
|
// NewWriter creates a new influx writer
|
|
func NewWriter(c client.Client, db string, size int) Writer {
|
|
return &writer{
|
|
writer: make(chan *client.Point),
|
|
writebuffer: make(chan *client.Point, size),
|
|
readbuffer: make(chan *client.Point, size),
|
|
requestSwap: make(chan bool),
|
|
bufferSwapped: make(chan bool),
|
|
db: db,
|
|
client: c,
|
|
}
|
|
}
|
|
|
|
// Add adds an item to the influx buffer
|
|
func (t *writer) Add(i *client.Point) {
|
|
t.writer <- i
|
|
}
|
|
|
|
// Dispatch writes the measurements on a fixed interval or when the buffer is full
|
|
func (t *writer) Dispatch(ctx context.Context, interval time.Duration) {
|
|
var wg sync.WaitGroup
|
|
defer wg.Wait()
|
|
|
|
wg.Add(1)
|
|
go func(ctx context.Context) {
|
|
t.bufferWriter(ctx)
|
|
wg.Done()
|
|
}(ctx)
|
|
|
|
// This ticker ticks on the right interval
|
|
ticker := time.NewTicker(interval)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
ticker.Stop()
|
|
return
|
|
case <-t.requestSwap:
|
|
// Writer wants a swap. Swap the buffers.
|
|
t.swapBuffers()
|
|
t.bufferSwapped <- true
|
|
if err := t.flush(); err != nil {
|
|
logrus.Warnf("Cannot write to influx: %v", err)
|
|
}
|
|
case <-ticker.C:
|
|
// Interval is exceeded. Swap the buffers.
|
|
t.swapBuffers()
|
|
if err := t.flush(); err != nil {
|
|
logrus.Warnf("Cannot write to influx: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// bufferWriter writes the measurements to the right buffer
|
|
func (t *writer) bufferWriter(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case val := <-t.writer:
|
|
// Keep trying to write the value
|
|
for ok := false; !ok && ctx.Err() == nil; {
|
|
ok = t.bufferWrite(val)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// bufferWrite writes the measurement in the buffer or returns false when the buffer is full
|
|
func (t *writer) bufferWrite(val *client.Point) bool {
|
|
var success bool
|
|
|
|
// Lock the mutex because we don't want that the buffers are swapped during the write action
|
|
t.mu.Lock()
|
|
select {
|
|
case t.writebuffer <- val:
|
|
success = true
|
|
default:
|
|
}
|
|
t.mu.Unlock()
|
|
|
|
// When the buffer is full, success is false
|
|
if !success {
|
|
// Request that we want a swap
|
|
t.requestSwap <- true
|
|
// Wait until it is swapped
|
|
<-t.bufferSwapped
|
|
}
|
|
return success
|
|
}
|
|
|
|
// flush flushes the reader buffer
|
|
func (t *writer) flush() error {
|
|
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
|
|
Database: t.db,
|
|
Precision: "ns",
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var containsData bool
|
|
for {
|
|
select {
|
|
case i := <-t.readbuffer:
|
|
bp.AddPoint(i)
|
|
containsData = true
|
|
default:
|
|
if !containsData {
|
|
return nil
|
|
}
|
|
logrus.Debugf("Write %d points", len(bp.Points()))
|
|
return t.client.Write(bp)
|
|
}
|
|
}
|
|
}
|
|
|
|
// swapBuffers swaps the reading and writing buffer
|
|
func (t *writer) swapBuffers() {
|
|
t.mu.Lock()
|
|
t.writebuffer, t.readbuffer = t.readbuffer, t.writebuffer
|
|
t.mu.Unlock()
|
|
}
|