148 lines
4.1 KiB
Go
148 lines
4.1 KiB
Go
package influxdb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
client "github.com/influxdata/influxdb1-client/v2"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type testClient struct {
|
|
pointsWritten uint32
|
|
batchWritten uint32
|
|
}
|
|
|
|
func (t *testClient) TestData() (pointsWritten uint32, batchWritten uint32) {
|
|
return atomic.LoadUint32(&t.pointsWritten), atomic.LoadUint32(&t.batchWritten)
|
|
}
|
|
|
|
// Ping checks that status of cluster, and will always return 0 time and no
|
|
// error for UDP clients.
|
|
func (t *testClient) Ping(timeout time.Duration) (time.Duration, string, error) {
|
|
return 0, "", nil
|
|
}
|
|
|
|
// Write takes a BatchPoints object and writes all Points to InfluxDB and checks if the bp has precision "ns".
|
|
func (t *testClient) Write(bp client.BatchPoints) error {
|
|
if bp.Precision() != "ns" {
|
|
return fmt.Errorf("invalid precision, must be ns")
|
|
}
|
|
atomic.AddUint32(&t.pointsWritten, uint32(len(bp.Points())))
|
|
atomic.AddUint32(&t.batchWritten, 1)
|
|
return nil
|
|
}
|
|
|
|
// Query makes an InfluxDB Query on the database. This will fail if using
|
|
// the UDP client.
|
|
func (t *testClient) Query(q client.Query) (*client.Response, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// QueryAsChunk is a NOOP to satisfy the client.Client interface
|
|
func (t *testClient) QueryAsChunk(q client.Query) (*client.ChunkedResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Close releases any resources a Client may be using.
|
|
func (t *testClient) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func TestWritePointsWhenBufferFull(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
tc := &testClient{}
|
|
w := NewWriter(tc, "db", 10)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func(ctx context.Context) {
|
|
defer wg.Done()
|
|
w.Dispatch(ctx, time.Hour)
|
|
}(ctx)
|
|
|
|
// Write 10 points
|
|
p, err := client.NewPoint("dev", nil, map[string]interface{}{"key": 1}, time.Now())
|
|
require.Nil(t, err)
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(p)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
// 10 points in a 10 size buffer fits exactly so no need to write the points
|
|
pointsWritten, batchWritten := tc.TestData()
|
|
assert.Equal(t, uint32(0), pointsWritten)
|
|
assert.Equal(t, uint32(0), batchWritten)
|
|
|
|
// Buffer is full so this point will result in a buffer swap
|
|
w.Add(p)
|
|
time.Sleep(100 * time.Millisecond)
|
|
pointsWritten, batchWritten = tc.TestData()
|
|
assert.Equal(t, uint32(10), pointsWritten)
|
|
assert.Equal(t, uint32(1), batchWritten)
|
|
|
|
// There is already a point in the buffer, so only write 9 points
|
|
for i := 0; i < 9; i++ {
|
|
w.Add(p)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
// 10 points in a 10 size buffer fits exactly so no need to write the points
|
|
pointsWritten, batchWritten = tc.TestData()
|
|
assert.Equal(t, uint32(10), pointsWritten)
|
|
assert.Equal(t, uint32(1), batchWritten)
|
|
|
|
// Buffer is full so this point will result in a buffer swap
|
|
w.Add(p)
|
|
time.Sleep(100 * time.Millisecond)
|
|
pointsWritten, batchWritten = tc.TestData()
|
|
assert.Equal(t, uint32(20), pointsWritten)
|
|
assert.Equal(t, uint32(2), batchWritten)
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestWritePointsWhenInterval(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
tc := &testClient{}
|
|
w := NewWriter(tc, "db", 10)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func(ctx context.Context) {
|
|
defer wg.Done()
|
|
w.Dispatch(ctx, time.Second)
|
|
}(ctx)
|
|
|
|
// Write 5 points
|
|
p, err := client.NewPoint("dev", nil, map[string]interface{}{"key": 1}, time.Now())
|
|
require.Nil(t, err)
|
|
for i := 0; i < 5; i++ {
|
|
w.Add(p)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
// 5 points in a 10 size buffer fits and the duration is not exceeded no need to write the points
|
|
pointsWritten, batchWritten := tc.TestData()
|
|
assert.Equal(t, uint32(0), pointsWritten)
|
|
assert.Equal(t, uint32(0), batchWritten)
|
|
|
|
time.Sleep(time.Second)
|
|
// Interval exceeded so the buffer is swapped
|
|
pointsWritten, batchWritten = tc.TestData()
|
|
assert.Equal(t, uint32(5), pointsWritten)
|
|
assert.Equal(t, uint32(1), batchWritten)
|
|
|
|
time.Sleep(time.Second)
|
|
// Interval exceeded so the buffer is swapped (nothing in the buffer)
|
|
pointsWritten, batchWritten = tc.TestData()
|
|
assert.Equal(t, uint32(5), pointsWritten)
|
|
assert.Equal(t, uint32(1), batchWritten)
|
|
cancel()
|
|
wg.Wait()
|
|
}
|