influxdb/replications/service_test.go

1041 lines
32 KiB
Go

package replications
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"testing"
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/models"
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/validator.go github.com/influxdata/influxdb/v2/replications ReplicationValidator
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/bucket_service.go github.com/influxdata/influxdb/v2/replications BucketService
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/queue_management.go github.com/influxdata/influxdb/v2/replications DurableQueueManager
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/points_writer.go github.com/influxdata/influxdb/v2/storage PointsWriter
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/service_store.go github.com/influxdata/influxdb/v2/replications ServiceStore
var (
ctx = context.Background()
orgID = platform.ID(10)
id1 = platform.ID(1)
id2 = platform.ID(2)
desc = "testing testing"
replication1 = influxdb.Replication{
ID: id1,
OrgID: orgID,
Name: "test",
Description: &desc,
RemoteID: platform.ID(100),
LocalBucketID: platform.ID(1000),
RemoteBucketID: idPointer(99999),
MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes,
}
replication2 = influxdb.Replication{
ID: id2,
OrgID: orgID,
Name: "test",
Description: &desc,
RemoteID: platform.ID(100),
LocalBucketID: platform.ID(1000),
RemoteBucketID: idPointer(99999),
MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes,
}
createReq = influxdb.CreateReplicationRequest{
OrgID: replication1.OrgID,
Name: replication1.Name,
Description: replication1.Description,
RemoteID: replication1.RemoteID,
LocalBucketID: replication1.LocalBucketID,
RemoteBucketID: *replication1.RemoteBucketID,
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
}
newRemoteID = platform.ID(200)
newQueueSize = influxdb.MinReplicationMaxQueueSizeBytes
updateReqWithNewSize = influxdb.UpdateReplicationRequest{
RemoteID: &newRemoteID,
MaxQueueSizeBytes: &newQueueSize,
}
updatedReplicationWithNewSize = influxdb.Replication{
ID: replication1.ID,
OrgID: replication1.OrgID,
Name: replication1.Name,
Description: replication1.Description,
RemoteID: *updateReqWithNewSize.RemoteID,
LocalBucketID: replication1.LocalBucketID,
RemoteBucketID: replication1.RemoteBucketID,
MaxQueueSizeBytes: *updateReqWithNewSize.MaxQueueSizeBytes,
}
updateReqWithNoNewSize = influxdb.UpdateReplicationRequest{
RemoteID: &newRemoteID,
}
updatedReplicationWithNoNewSize = influxdb.Replication{
ID: replication1.ID,
OrgID: replication1.OrgID,
Name: replication1.Name,
Description: replication1.Description,
RemoteID: *updateReqWithNewSize.RemoteID,
LocalBucketID: replication1.LocalBucketID,
RemoteBucketID: replication1.RemoteBucketID,
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
}
remoteID = platform.ID(888888)
httpConfig = influxdb.ReplicationHTTPConfig{
RemoteURL: fmt.Sprintf("http://%s.cloud", replication1.RemoteID),
RemoteToken: replication1.RemoteID.String(),
RemoteOrgID: &remoteID,
AllowInsecureTLS: true,
RemoteBucketID: replication1.RemoteBucketID,
}
)
func idPointer(id int) *platform.ID {
p := platform.ID(id)
return &p
}
func TestListReplications(t *testing.T) {
t.Parallel()
filter := influxdb.ReplicationListFilter{}
tests := []struct {
name string
list influxdb.Replications
ids []platform.ID
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerErr error
queueManagerRemainingSizesErr error
}{
{
name: "matches multiple",
list: influxdb.Replications{
Replications: []influxdb.Replication{replication1, replication2},
},
ids: []platform.ID{replication1.ID, replication2.ID},
sizes: map[platform.ID]int64{replication1.ID: 1000, replication2.ID: 2000},
},
{
name: "matches one",
list: influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
ids: []platform.ID{replication1.ID},
sizes: map[platform.ID]int64{replication1.ID: 1000},
},
{
name: "matches none",
list: influxdb.Replications{},
},
{
name: "store error",
storeErr: errors.New("error from store"),
},
{
name: "queue manager error",
list: influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
ids: []platform.ID{replication1.ID},
queueManagerErr: errors.New("error from queue manager"),
},
{
name: "queue manager error - remaining queue size",
list: influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
ids: []platform.ID{replication1.ID},
queueManagerRemainingSizesErr: errors.New("Remaining Queue Size erro"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), filter).Return(&tt.list, tt.storeErr)
if tt.storeErr == nil && len(tt.list.Replications) > 0 {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes(tt.ids).Return(tt.sizes, tt.queueManagerErr)
}
if tt.storeErr == nil && tt.queueManagerErr == nil && len(tt.list.Replications) > 0 {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes(tt.ids).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}
got, err := svc.ListReplications(ctx, filter)
var wantErr error
if tt.storeErr != nil {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
} else if tt.queueManagerRemainingSizesErr != nil {
wantErr = tt.queueManagerRemainingSizesErr
}
require.Equal(t, wantErr, err)
if wantErr != nil {
require.Nil(t, got)
return
}
for _, r := range got.Replications {
require.Equal(t, tt.sizes[r.ID], r.CurrentQueueSizeBytes)
require.Equal(t, tt.rsizes[r.ID], r.RemainingBytesToBeSynced)
}
})
}
}
func TestCreateReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
create influxdb.CreateReplicationRequest
storeErr error
bucketErr error
queueManagerErr error
want *influxdb.Replication
wantErr error
}{
{
name: "success",
create: createReq,
want: &replication1,
},
{
name: "bucket service error",
create: createReq,
bucketErr: errors.New("bucket service error"),
wantErr: errLocalBucketNotFound(createReq.LocalBucketID, errors.New("bucket service error")),
},
{
name: "initialize queue error",
create: createReq,
queueManagerErr: errors.New("queue manager error"),
wantErr: errors.New("queue manager error"),
},
{
name: "store create error",
create: createReq,
storeErr: errors.New("store create error"),
wantErr: errors.New("store create error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.bucketSvc.EXPECT().RLock()
mocks.bucketSvc.EXPECT().RUnlock()
mocks.serviceStore.EXPECT().Lock()
mocks.serviceStore.EXPECT().Unlock()
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.create.LocalBucketID).Return(nil, tt.bucketErr)
if tt.bucketErr == nil {
mocks.durableQueueManager.EXPECT().InitializeQueue(id1, tt.create.MaxQueueSizeBytes, tt.create.OrgID, tt.create.LocalBucketID, tt.create.MaxAgeSeconds).Return(tt.queueManagerErr)
}
if tt.queueManagerErr == nil && tt.bucketErr == nil {
mocks.serviceStore.EXPECT().CreateReplication(gomock.Any(), id1, tt.create).Return(tt.want, tt.storeErr)
}
if tt.storeErr != nil {
mocks.durableQueueManager.EXPECT().DeleteQueue(id1).Return(nil)
}
got, err := svc.CreateReplication(ctx, tt.create)
require.Equal(t, tt.want, got)
require.Equal(t, tt.wantErr, err)
})
}
}
func TestValidateNewReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
req influxdb.CreateReplicationRequest
storeErr error
bucketErr error
validatorErr error
wantErr error
}{
{
name: "valid",
req: createReq,
},
{
name: "bucket service error",
req: createReq,
bucketErr: errors.New("bucket service error"),
wantErr: errLocalBucketNotFound(createReq.LocalBucketID, errors.New("bucket service error")),
},
{
name: "store populate error",
req: createReq,
storeErr: errors.New("store populate error"),
wantErr: errors.New("store populate error"),
},
{
name: "validation error - invalid replication",
req: createReq,
validatorErr: errors.New("validation error"),
wantErr: &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "replication parameters fail validation",
Err: errors.New("validation error"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.req.LocalBucketID).Return(nil, tt.bucketErr)
testConfig := &influxdb.ReplicationHTTPConfig{RemoteBucketID: &tt.req.RemoteBucketID}
if tt.bucketErr == nil {
mocks.serviceStore.EXPECT().PopulateRemoteHTTPConfig(gomock.Any(), tt.req.RemoteID, testConfig).Return(tt.storeErr)
}
if tt.bucketErr == nil && tt.storeErr == nil {
mocks.validator.EXPECT().ValidateReplication(gomock.Any(), testConfig).Return(tt.validatorErr)
}
err := svc.ValidateNewReplication(ctx, tt.req)
require.Equal(t, tt.wantErr, err)
})
}
}
func TestGetReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerErr error
queueManagerRemainingSizesErr error
storeWant influxdb.Replication
want influxdb.Replication
}{
{
name: "success",
sizes: map[platform.ID]int64{replication1.ID: 1000},
storeWant: replication1,
want: replication1,
},
{
name: "store error",
storeErr: errors.New("store error"),
},
{
name: "queue manager error",
storeWant: replication1,
queueManagerErr: errors.New("queue manager error"),
},
{
name: "queue manager error - remaining queue size",
storeWant: replication1,
queueManagerRemainingSizesErr: errors.New("queue manager error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().GetReplication(gomock.Any(), id1).Return(&tt.storeWant, tt.storeErr)
if tt.storeErr == nil {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerErr)
}
if tt.storeErr == nil && tt.queueManagerErr == nil {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}
got, err := svc.GetReplication(ctx, id1)
var wantErr error
if tt.storeErr != nil {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
} else if tt.queueManagerRemainingSizesErr != nil {
wantErr = tt.queueManagerRemainingSizesErr
}
require.Equal(t, wantErr, err)
if wantErr != nil {
require.Nil(t, got)
return
}
require.Equal(t, tt.sizes[got.ID], got.CurrentQueueSizeBytes)
require.Equal(t, tt.rsizes[got.ID], got.RemainingBytesToBeSynced)
})
}
}
func TestUpdateReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
request influxdb.UpdateReplicationRequest
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerUpdateSizeErr error
queueManagerCurrentSizesErr error
queueManagerRemainingSizesErr error
storeUpdate *influxdb.Replication
want *influxdb.Replication
wantErr error
}{
{
name: "success with new max queue size",
request: updateReqWithNewSize,
sizes: map[platform.ID]int64{replication1.ID: *updateReqWithNewSize.MaxQueueSizeBytes},
storeUpdate: &updatedReplicationWithNewSize,
want: &updatedReplicationWithNewSize,
},
{
name: "success with no new max queue size",
request: updateReqWithNoNewSize,
sizes: map[platform.ID]int64{replication1.ID: updatedReplicationWithNoNewSize.MaxQueueSizeBytes},
storeUpdate: &updatedReplicationWithNoNewSize,
want: &updatedReplicationWithNoNewSize,
},
{
name: "store error",
request: updateReqWithNoNewSize,
storeErr: errors.New("store error"),
wantErr: errors.New("store error"),
},
{
name: "queue manager error - update max queue size",
request: updateReqWithNewSize,
queueManagerUpdateSizeErr: errors.New("update max size err"),
wantErr: errors.New("update max size err"),
},
{
name: "queue manager error - current queue size",
request: updateReqWithNoNewSize,
queueManagerCurrentSizesErr: errors.New("current size err"),
storeUpdate: &updatedReplicationWithNoNewSize,
wantErr: errors.New("current size err"),
},
{
name: "queue manager error - remaining queue size",
request: updateReqWithNoNewSize,
queueManagerRemainingSizesErr: errors.New("remaining queue size err"),
storeUpdate: &updatedReplicationWithNoNewSize,
wantErr: errors.New("remaining queue size err"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().Lock()
mocks.serviceStore.EXPECT().Unlock()
mocks.serviceStore.EXPECT().UpdateReplication(gomock.Any(), id1, tt.request).Return(tt.storeUpdate, tt.storeErr)
if tt.storeErr == nil && tt.request.MaxQueueSizeBytes != nil {
mocks.durableQueueManager.EXPECT().UpdateMaxQueueSize(id1, *tt.request.MaxQueueSizeBytes).Return(tt.queueManagerUpdateSizeErr)
}
if tt.storeErr == nil && tt.queueManagerUpdateSizeErr == nil {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerCurrentSizesErr)
}
if tt.storeErr == nil && tt.queueManagerUpdateSizeErr == nil && tt.queueManagerCurrentSizesErr == nil {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}
got, err := svc.UpdateReplication(ctx, id1, tt.request)
require.Equal(t, tt.want, got)
require.Equal(t, tt.wantErr, err)
})
}
}
func TestValidateUpdatedReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
request influxdb.UpdateReplicationRequest
baseConfig *influxdb.ReplicationHTTPConfig
storeGetConfigErr error
storePopulateConfigErr error
validatorErr error
want error
}{
{
name: "success",
request: updateReqWithNoNewSize,
baseConfig: &httpConfig,
},
{
name: "store get full http config error",
storeGetConfigErr: errors.New("store get full http config error"),
want: errors.New("store get full http config error"),
},
{
name: "store get populate remote config error",
request: updateReqWithNoNewSize,
storePopulateConfigErr: errors.New("store populate http config error"),
want: errors.New("store populate http config error"),
},
{
name: "invalid update",
request: updateReqWithNoNewSize,
validatorErr: errors.New("invalid"),
want: &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "validation fails after applying update",
Err: errors.New("invalid"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().GetFullHTTPConfig(gomock.Any(), id1).Return(tt.baseConfig, tt.storeGetConfigErr)
if tt.storeGetConfigErr == nil {
mocks.serviceStore.EXPECT().PopulateRemoteHTTPConfig(gomock.Any(), *tt.request.RemoteID, tt.baseConfig).Return(tt.storePopulateConfigErr)
}
if tt.storeGetConfigErr == nil && tt.storePopulateConfigErr == nil {
mocks.validator.EXPECT().ValidateReplication(gomock.Any(), tt.baseConfig).Return(tt.validatorErr)
}
err := svc.ValidateUpdatedReplication(ctx, id1, tt.request)
require.Equal(t, tt.want, err)
})
}
}
func TestDeleteReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
storeErr error
queueManagerErr error
}{
{
name: "success",
},
{
name: "store error",
storeErr: errors.New("store error"),
},
{
name: "queue manager error",
queueManagerErr: errors.New("queue manager error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().Lock()
mocks.serviceStore.EXPECT().Unlock()
mocks.serviceStore.EXPECT().DeleteReplication(gomock.Any(), id1).Return(tt.storeErr)
if tt.storeErr == nil {
mocks.durableQueueManager.EXPECT().DeleteQueue(id1).Return(tt.queueManagerErr)
}
err := svc.DeleteReplication(ctx, id1)
var wantErr error
if tt.storeErr != nil {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
}
require.Equal(t, wantErr, err)
})
}
}
func TestDeleteBucketReplications(t *testing.T) {
t.Parallel()
tests := []struct {
name string
storeErr error
storeIDs []platform.ID
queueManagerErr error
wantErr error
}{
{
name: "success - single replication IDs match bucket ID",
storeIDs: []platform.ID{id1},
},
{
name: "success - multiple replication IDs match bucket ID",
storeIDs: []platform.ID{id1, id2},
},
{
name: "zero replication IDs match bucket ID",
storeIDs: []platform.ID{},
},
{
name: "store error",
storeErr: errors.New("store error"),
wantErr: errors.New("store error"),
},
{
name: "queue manager delete queue error",
storeIDs: []platform.ID{id1},
queueManagerErr: errors.New("queue manager error"),
wantErr: fmt.Errorf("deleting replications for bucket %q failed, see server logs for details", id1),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().Lock()
mocks.serviceStore.EXPECT().Unlock()
mocks.serviceStore.EXPECT().DeleteBucketReplications(gomock.Any(), id1).Return(tt.storeIDs, tt.storeErr)
if tt.storeErr == nil {
for _, id := range tt.storeIDs {
mocks.durableQueueManager.EXPECT().DeleteQueue(id).Return(tt.queueManagerErr)
}
}
err := svc.DeleteBucketReplications(ctx, id1)
require.Equal(t, tt.wantErr, err)
})
}
}
func TestValidateReplication(t *testing.T) {
t.Parallel()
tests := []struct {
name string
storeErr error
validatorErr error
wantErr error
}{
{
name: "valid",
},
{
name: "store error",
storeErr: errors.New("store error"),
wantErr: errors.New("store error"),
},
{
name: "validation error - invalid replication",
validatorErr: errors.New("validation error"),
wantErr: &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "replication failed validation",
Err: errors.New("validation error"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().GetFullHTTPConfig(gomock.Any(), id1).Return(&httpConfig, tt.storeErr)
if tt.storeErr == nil {
mocks.validator.EXPECT().ValidateReplication(gomock.Any(), &httpConfig).Return(tt.validatorErr)
}
err := svc.ValidateReplication(ctx, id1)
require.Equal(t, tt.wantErr, err)
})
}
}
func TestWritePoints(t *testing.T) {
t.Parallel()
svc, mocks := newTestService(t)
replications := make([]platform.ID, 2)
replications[0] = replication1.ID
replications[1] = replication2.ID
mocks.durableQueueManager.EXPECT().GetReplications(orgID, id1).Return(replications)
points, err := models.ParsePointsString(`
cpu,host=0 value=1.1 6000000000
cpu,host=A value=1.2 2000000000
cpu,host=A value=1.3 3000000000
cpu,host=B value=1.3 4000000000
cpu,host=B value=1.3 5000000000
cpu,host=C value=1.3 1000000000
mem,host=C value=1.3 1000000000
disk,host=C value=1.3 1000000000`)
require.NoError(t, err)
// Points should successfully write to local TSM.
mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil)
// Points should successfully be enqueued in the 2 replications associated with the local bucket.
for _, id := range replications {
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), len(points)).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, len(points), numPoints)
checkCompressedData(t, data, points)
return nil
})
}
require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
}
func TestWritePointsBatches(t *testing.T) {
t.Parallel()
tests := []struct {
name string
setupFn func(*testing.T, *service)
}{
{
name: "batch bytes size",
setupFn: func(t *testing.T, svc *service) {
t.Helper()
// Set batch size to smaller size for testing (should result in 3 batches sized 93, 93, and 63 - total size 249)
svc.maxRemoteWriteBatchSize = 100
},
},
{
name: "batch point size",
setupFn: func(t *testing.T, svc *service) {
t.Helper()
// Set point size to smaller size for testing (should result in 3 batches with 3 points, 3 points, and 2 points)
svc.maxRemoteWritePointSize = 3
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
tt.setupFn(t, svc)
// Define metadata for two replications
replications := make([]platform.ID, 2)
replications[0] = replication1.ID
replications[1] = replication2.ID
mocks.durableQueueManager.EXPECT().GetReplications(orgID, id1).Return(replications)
// Define some points of line protocol, parse string --> []Point
points, err := models.ParsePointsString(`
cpu,host=0 value=1.1 6000000000
cpu,host=A value=1.2 2000000000
cpu,host=A value=1.3 3000000000
cpu,host=B value=1.3 4000000000
cpu,host=B value=1.3 5000000000
cpu,host=C value=1.3 1000000000
mem,host=C value=1.3 1000000000
disk,host=C value=1.3 1000000000`)
require.NoError(t, err)
// Points should successfully write to local TSM.
mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil)
// Points should successfully be enqueued in the 2 replications associated with the local bucket.
for _, id := range replications {
// Check batch 1
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 3).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 3, numPoints)
checkCompressedData(t, data, points[:3])
return nil
})
// Check batch 2
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 3).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 3, numPoints)
checkCompressedData(t, data, points[3:6])
return nil
})
// Check batch 3
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 2).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 2, numPoints)
checkCompressedData(t, data, points[6:])
return nil
})
}
require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
})
}
}
func TestWritePointsInstanceID(t *testing.T) {
t.Parallel()
svc, mocks := newTestService(t)
svc.instanceID = "hello-edge"
replications := make([]platform.ID, 2)
replications[0] = replication1.ID
replications[1] = replication2.ID
mocks.durableQueueManager.EXPECT().GetReplications(orgID, id1).Return(replications)
writePoints, err := models.ParsePointsString(`
cpu,host=0 value=1.1 6000000000
cpu,host=A value=1.2 2000000000
cpu,host=A value=1.3 3000000000
cpu,host=B value=1.3 4000000000
cpu,host=B value=1.3 5000000000
cpu,host=C value=1.3 1000000000
mem,host=C value=1.3 1000000000
disk,host=C value=1.3 1000000000`)
require.NoError(t, err)
expectedPoints, err := models.ParsePointsString(`
cpu,host=0,_instance_id=hello-edge value=1.1 6000000000
cpu,host=A,_instance_id=hello-edge value=1.2 2000000000
cpu,host=A,_instance_id=hello-edge value=1.3 3000000000
cpu,host=B,_instance_id=hello-edge value=1.3 4000000000
cpu,host=B,_instance_id=hello-edge value=1.3 5000000000
cpu,host=C,_instance_id=hello-edge value=1.3 1000000000
mem,host=C,_instance_id=hello-edge value=1.3 1000000000
disk,host=C,_instance_id=hello-edge value=1.3 1000000000`)
require.NoError(t, err)
// Points should successfully write to local TSM.
mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, writePoints).Return(nil)
// Points should successfully be enqueued in the 2 replications associated with the local bucket.
for _, id := range replications {
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), len(writePoints)).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, len(writePoints), numPoints)
checkCompressedData(t, data, expectedPoints)
return nil
})
}
require.NoError(t, svc.WritePoints(ctx, orgID, id1, writePoints))
}
func TestWritePoints_LocalFailure(t *testing.T) {
t.Parallel()
svc, mocks := newTestService(t)
replications := make([]platform.ID, 2)
replications[0] = replication1.ID
replications[1] = replication2.ID
mocks.durableQueueManager.EXPECT().GetReplications(orgID, id1).Return(replications)
points, err := models.ParsePointsString(`
cpu,host=0 value=1.1 6000000000
cpu,host=A value=1.2 2000000000
cpu,host=A value=1.3 3000000000
cpu,host=B value=1.3 4000000000
cpu,host=B value=1.3 5000000000
cpu,host=C value=1.3 1000000000
mem,host=C value=1.3 1000000000
disk,host=C value=1.3 1000000000`)
require.NoError(t, err)
// Points should fail to write to local TSM.
writeErr := errors.New("O NO")
mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(writeErr)
// Don't expect any calls to enqueue points.
require.Equal(t, writeErr, svc.WritePoints(ctx, orgID, id1, points))
}
func TestOpen(t *testing.T) {
t.Parallel()
filter := influxdb.ReplicationListFilter{}
tests := []struct {
name string
storeErr error
queueManagerErr error
replicationsMap map[platform.ID]*influxdb.TrackedReplication
list *influxdb.Replications
}{
{
name: "no error, multiple replications from storage",
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},
replication2.ID: {
MaxQueueSizeBytes: replication2.MaxQueueSizeBytes,
MaxAgeSeconds: replication2.MaxAgeSeconds,
OrgID: replication2.OrgID,
LocalBucketID: replication2.LocalBucketID,
},
},
list: &influxdb.Replications{
Replications: []influxdb.Replication{replication1, replication2},
},
},
{
name: "no error, one stored replication",
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},
},
list: &influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
},
{
name: "store error",
storeErr: errors.New("store error"),
},
{
name: "queue manager error",
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},
},
list: &influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
queueManagerErr: errors.New("queue manager error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, mocks := newTestService(t)
mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), filter).Return(tt.list, tt.storeErr)
if tt.storeErr == nil {
mocks.durableQueueManager.EXPECT().StartReplicationQueues(tt.replicationsMap).Return(tt.queueManagerErr)
}
var wantErr error
if tt.storeErr != nil {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
}
err := svc.Open(ctx)
require.Equal(t, wantErr, err)
})
}
}
type mocks struct {
bucketSvc *replicationsMock.MockBucketService
validator *replicationsMock.MockReplicationValidator
durableQueueManager *replicationsMock.MockDurableQueueManager
pointWriter *replicationsMock.MockPointsWriter
serviceStore *replicationsMock.MockServiceStore
}
func checkCompressedData(t *testing.T, data []byte, expectedPoints []models.Point) {
gzBuf := bytes.NewBuffer(data)
gzr, err := gzip.NewReader(gzBuf)
require.NoError(t, err)
defer gzr.Close()
var buf bytes.Buffer
_, err = buf.ReadFrom(gzr)
require.NoError(t, err)
require.NoError(t, gzr.Close())
writtenPoints, err := models.ParsePoints(buf.Bytes())
require.NoError(t, err)
require.ElementsMatch(t, writtenPoints, expectedPoints)
}
func newTestService(t *testing.T) (*service, mocks) {
logger := zaptest.NewLogger(t)
ctrl := gomock.NewController(t)
mocks := mocks{
bucketSvc: replicationsMock.NewMockBucketService(ctrl),
validator: replicationsMock.NewMockReplicationValidator(ctrl),
durableQueueManager: replicationsMock.NewMockDurableQueueManager(ctrl),
pointWriter: replicationsMock.NewMockPointsWriter(ctrl),
serviceStore: replicationsMock.NewMockServiceStore(ctrl),
}
svc := service{
store: mocks.serviceStore,
idGenerator: mock.NewIncrementingIDGenerator(id1),
bucketService: mocks.bucketSvc,
validator: mocks.validator,
log: logger,
durableQueueManager: mocks.durableQueueManager,
localWriter: mocks.pointWriter,
maxRemoteWriteBatchSize: maxRemoteWriteBatchSize,
maxRemoteWritePointSize: maxRemoteWritePointSize,
}
return &svc, mocks
}