package influxdb

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	client "github.com/influxdata/influxdb1-client/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// testClient is a stub client.Client that counts the points and batches
// written to it instead of talking to a real InfluxDB instance.
type testClient struct {
	pointsWritten uint32
	batchWritten  uint32
}

// TestData returns the current write counters. It uses atomic loads because
// the writer goroutine updates the counters concurrently with the test.
func (t *testClient) TestData() (pointsWritten uint32, batchWritten uint32) {
	return atomic.LoadUint32(&t.pointsWritten), atomic.LoadUint32(&t.batchWritten)
}

// Ping checks the status of the cluster; it always returns a zero duration
// and no error, as a UDP client would.
func (t *testClient) Ping(timeout time.Duration) (time.Duration, string, error) {
	return 0, "", nil
}

// Write takes a BatchPoints object, verifies that the batch has precision
// "ns", and records how many points and batches have been written.
func (t *testClient) Write(bp client.BatchPoints) error {
	if bp.Precision() != "ns" {
		return fmt.Errorf("invalid precision, must be ns")
	}
	atomic.AddUint32(&t.pointsWritten, uint32(len(bp.Points())))
	atomic.AddUint32(&t.batchWritten, 1)
	return nil
}

// Query makes an InfluxDB query on the database. This will fail if using
// the UDP client.
func (t *testClient) Query(q client.Query) (*client.Response, error) {
	return nil, nil
}

// QueryAsChunk is a no-op to satisfy the client.Client interface.
func (t *testClient) QueryAsChunk(q client.Query) (*client.ChunkedResponse, error) {
	return nil, nil
}

// Close releases any resources a Client may be using.
func (t *testClient) Close() error {
	return nil
}

// TestWritePointsWhenBufferFull verifies that points are flushed only when an
// Add overflows the buffer, not when the buffer is exactly full.
func TestWritePointsWhenBufferFull(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	tc := &testClient{}
	w := NewWriter(tc, "db", 10)

	var wg sync.WaitGroup
	wg.Add(1)
	go func(ctx context.Context) {
		defer wg.Done()
		w.Dispatch(ctx, time.Hour)
	}(ctx)

	// Write 10 points.
	p, err := client.NewPoint("dev", nil, map[string]interface{}{"key": 1}, time.Now())
	require.NoError(t, err)
	for i := 0; i < 10; i++ {
		w.Add(p)
	}
	time.Sleep(100 * time.Millisecond)

	// Ten points fit exactly in a buffer of size 10, so nothing is written yet.
	pointsWritten, batchWritten := tc.TestData()
	assert.Equal(t, uint32(0), pointsWritten)
	assert.Equal(t, uint32(0), batchWritten)

	// The buffer is full, so this point triggers a buffer swap.
	w.Add(p)
	time.Sleep(100 * time.Millisecond)

	pointsWritten, batchWritten = tc.TestData()
	assert.Equal(t, uint32(10), pointsWritten)
	assert.Equal(t, uint32(1), batchWritten)

	// There is already one point in the buffer, so write only 9 more.
	for i := 0; i < 9; i++ {
		w.Add(p)
	}
	time.Sleep(100 * time.Millisecond)

	// Ten points fit exactly in a buffer of size 10, so nothing new is written.
	pointsWritten, batchWritten = tc.TestData()
	assert.Equal(t, uint32(10), pointsWritten)
	assert.Equal(t, uint32(1), batchWritten)

	// The buffer is full again, so this point triggers a second buffer swap.
	w.Add(p)
	time.Sleep(100 * time.Millisecond)

	pointsWritten, batchWritten = tc.TestData()
	assert.Equal(t, uint32(20), pointsWritten)
	assert.Equal(t, uint32(2), batchWritten)

	cancel()
	wg.Wait()
}

// TestWritePointsWhenInterval verifies that a partially filled buffer is
// flushed once the dispatch interval elapses.
func TestWritePointsWhenInterval(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	tc := &testClient{}
	w := NewWriter(tc, "db", 10)

	var wg sync.WaitGroup
	wg.Add(1)
	go func(ctx context.Context) {
		defer wg.Done()
		w.Dispatch(ctx, time.Second)
	}(ctx)

	// Write 5 points.
	p, err := client.NewPoint("dev", nil, map[string]interface{}{"key": 1}, time.Now())
	require.NoError(t, err)
	for i := 0; i < 5; i++ {
		w.Add(p)
	}
	time.Sleep(100 * time.Millisecond)

	// Five points fit in a buffer of size 10 and the interval has not elapsed,
	// so nothing is written yet.
	pointsWritten, batchWritten := tc.TestData()
	assert.Equal(t, uint32(0), pointsWritten)
	assert.Equal(t, uint32(0), batchWritten)

	time.Sleep(time.Second)

	// The interval has elapsed, so the buffer is swapped and its points written.
	pointsWritten, batchWritten = tc.TestData()
	assert.Equal(t, uint32(5), pointsWritten)
	assert.Equal(t, uint32(1), batchWritten)

	time.Sleep(time.Second)

	// The interval elapses again, but the buffer is empty, so nothing changes.
	pointsWritten, batchWritten = tc.TestData()
	assert.Equal(t, uint32(5), pointsWritten)
	assert.Equal(t, uint32(1), batchWritten)

	cancel()
	wg.Wait()
}
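
// For reference, a minimal sketch of the Writer contract these tests exercise
// (hypothetical: the real Writer, NewWriter, Add, and Dispatch live elsewhere
// in this package and may differ in detail). Add appends a point to an
// internal buffer of the capacity given to NewWriter; when an Add would
// overflow the buffer, or when the Dispatch interval elapses, the buffer is
// swapped out and its points are written to the client as a single batch with
// precision "ns", since testClient.Write rejects any other precision. The
// flush step might look roughly like:
//
//	// points is the swapped-out buffer; w.client and w.database are
//	// assumed fields holding the client.Client and database name.
//	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
//		Database:  w.database,
//		Precision: "ns",
//	})
//	if err != nil {
//		return err
//	}
//	bp.AddPoints(points)
//	return w.client.Write(bp)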