fix: Show Replication Queue size and Replication TCP Errors (#23960)
* feat: Show remaining replication queue size * fix: Show non-http related error messages * fix: Show non-http related error messages with backoff * fix: Updates for replication tests * chore: formatting * chore: formatting * chore: formatting * chore: formatting * chore: lowercase json field --------- Co-authored-by: Geoffrey <suitableZebraCaller@users.noreply.github.com> Co-authored-by: Jeffrey Smith II <jeffreyssmith2nd@gmail.com>pull/24077/head
parent
e2f835bb0f
commit
ec7fdd3a58
|
@ -20,20 +20,21 @@ var ErrMaxQueueSizeTooSmall = errors.Error{
|
||||||
|
|
||||||
// Replication contains all info about a replication that should be returned to users.
|
// Replication contains all info about a replication that should be returned to users.
|
||||||
type Replication struct {
|
type Replication struct {
|
||||||
ID platform.ID `json:"id" db:"id"`
|
ID platform.ID `json:"id" db:"id"`
|
||||||
OrgID platform.ID `json:"orgID" db:"org_id"`
|
OrgID platform.ID `json:"orgID" db:"org_id"`
|
||||||
Name string `json:"name" db:"name"`
|
Name string `json:"name" db:"name"`
|
||||||
Description *string `json:"description,omitempty" db:"description"`
|
Description *string `json:"description,omitempty" db:"description"`
|
||||||
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
|
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
|
||||||
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
|
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
|
||||||
RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
|
RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
|
||||||
RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"`
|
RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"`
|
||||||
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
|
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
|
||||||
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
|
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
|
||||||
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
|
RemainingQueueSizeBytes int64 `json:"remainingQueueSizeBytes" db:"remaining_queue_size_bytes"`
|
||||||
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
|
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
|
||||||
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
|
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
|
||||||
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
|
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
|
||||||
|
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReplicationListFilter is a selection filter for listing replications.
|
// ReplicationListFilter is a selection filter for listing replications.
|
||||||
|
|
|
@ -315,6 +315,23 @@ func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platfor
|
||||||
return sizes, nil
|
return sizes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the remaining number of bytes in Queue to be read:
|
||||||
|
func (qm *durableQueueManager) RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
|
||||||
|
qm.mutex.RLock()
|
||||||
|
defer qm.mutex.RUnlock()
|
||||||
|
|
||||||
|
sizes := make(map[platform.ID]int64, len(ids))
|
||||||
|
|
||||||
|
for _, id := range ids {
|
||||||
|
if _, exist := qm.replicationQueues[id]; !exist {
|
||||||
|
return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
|
||||||
|
}
|
||||||
|
sizes[id] = qm.replicationQueues[id].queue.TotalBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
return sizes, nil
|
||||||
|
}
|
||||||
|
|
||||||
// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
|
// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
|
||||||
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
|
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
|
||||||
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
|
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
|
||||||
|
|
|
@ -417,6 +417,11 @@ func TestEnqueueData(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// Empty queues are 8 bytes for the footer.
|
// Empty queues are 8 bytes for the footer.
|
||||||
require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
|
require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
|
||||||
|
// Remaining queue should initially be empty:
|
||||||
|
rsizes, err := qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
// Empty queue = 0 bytes:
|
||||||
|
require.Equal(t, map[platform.ID]int64{id1: 0}, rsizes)
|
||||||
|
|
||||||
data := "some fake data"
|
data := "some fake data"
|
||||||
|
|
||||||
|
@ -430,6 +435,11 @@ func TestEnqueueData(t *testing.T) {
|
||||||
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
|
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Greater(t, sizes[id1], int64(8))
|
require.Greater(t, sizes[id1], int64(8))
|
||||||
|
rsizes, err = qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Greater(t, rsizes[id1], int64(0))
|
||||||
|
// Difference between disk size and queue should only be footer size
|
||||||
|
require.Equal(t, sizes[id1]-rsizes[id1], int64(8))
|
||||||
|
|
||||||
written, err := qm.replicationQueues[id1].queue.Current()
|
written, err := qm.replicationQueues[id1].queue.Current()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -481,8 +491,17 @@ func TestSendWrite(t *testing.T) {
|
||||||
require.True(t, scan.Next())
|
require.True(t, scan.Next())
|
||||||
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
|
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
|
||||||
require.NoError(t, scan.Err())
|
require.NoError(t, scan.Err())
|
||||||
|
// Initial Queue size should be size of data + footer
|
||||||
|
rsizesI, err := qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, rsizesI[id1], int64(8+len(points[pointIndex])))
|
||||||
// Send the write to the "remote" with a success
|
// Send the write to the "remote" with a success
|
||||||
rq.SendWrite()
|
rq.SendWrite()
|
||||||
|
// Queue becomes empty after write:
|
||||||
|
rsizesJ, err := qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, rsizesJ[id1], int64(0))
|
||||||
|
|
||||||
// Make sure the data is no longer in the queue
|
// Make sure the data is no longer in the queue
|
||||||
_, err = rq.queue.NewScanner()
|
_, err = rq.queue.NewScanner()
|
||||||
require.Equal(t, io.EOF, err)
|
require.Equal(t, io.EOF, err)
|
||||||
|
@ -496,9 +515,15 @@ func TestSendWrite(t *testing.T) {
|
||||||
require.True(t, scan.Next())
|
require.True(t, scan.Next())
|
||||||
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
|
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
|
||||||
require.NoError(t, scan.Err())
|
require.NoError(t, scan.Err())
|
||||||
|
rsizesI, err = qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
// Send the write to the "remote" with a FAILURE
|
// Send the write to the "remote" with a FAILURE
|
||||||
shouldFailThisWrite = true
|
shouldFailThisWrite = true
|
||||||
rq.SendWrite()
|
rq.SendWrite()
|
||||||
|
// Queue size should not have decreased if write has failed:
|
||||||
|
rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, rsizesJ[id1], rsizesI[id1])
|
||||||
// Make sure the data is still in the queue
|
// Make sure the data is still in the queue
|
||||||
scan, err = rq.queue.NewScanner()
|
scan, err = rq.queue.NewScanner()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -508,6 +533,11 @@ func TestSendWrite(t *testing.T) {
|
||||||
// Send the write to the "remote" again, with a SUCCESS
|
// Send the write to the "remote" again, with a SUCCESS
|
||||||
shouldFailThisWrite = false
|
shouldFailThisWrite = false
|
||||||
rq.SendWrite()
|
rq.SendWrite()
|
||||||
|
// Queue Becomes empty after a successful write
|
||||||
|
rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, rsizesJ[id1], int64(0))
|
||||||
|
|
||||||
// Make sure the data is no longer in the queue
|
// Make sure the data is no longer in the queue
|
||||||
_, err = rq.queue.NewScanner()
|
_, err = rq.queue.NewScanner()
|
||||||
require.Equal(t, io.EOF, err)
|
require.Equal(t, io.EOF, err)
|
||||||
|
|
|
@ -120,6 +120,21 @@ func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2,
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3, arg4)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3, arg4)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemainingQueueSizes mocks base method.
|
||||||
|
func (m *MockDurableQueueManager) RemainingQueueSizes(arg0 []platform.ID) (map[platform.ID]int64, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "RemainingQueueSizes", arg0)
|
||||||
|
ret0, _ := ret[0].(map[platform.ID]int64)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemainingQueueSizes indicates an expected call of RemainingQueueSizes.
|
||||||
|
func (mr *MockDurableQueueManagerMockRecorder) RemainingQueueSizes(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemainingQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).RemainingQueueSizes), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
// StartReplicationQueues mocks base method.
|
// StartReplicationQueues mocks base method.
|
||||||
func (m *MockDurableQueueManager) StartReplicationQueues(arg0 map[platform.ID]*influxdb.TrackedReplication) error {
|
func (m *MockDurableQueueManager) StartReplicationQueues(arg0 map[platform.ID]*influxdb.TrackedReplication) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
@ -113,6 +113,11 @@ func (w *writer) Write(data []byte, attempts int) (backoff time.Duration, err er
|
||||||
res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout)
|
res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout)
|
||||||
res, msg, ok := normalizeResponse(res, postWriteErr)
|
res, msg, ok := normalizeResponse(res, postWriteErr)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// Update Response info:
|
||||||
|
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
|
||||||
|
w.logger.Debug("failed to update config store with latest remote write response info", zap.Error(err))
|
||||||
|
return w.backoff(attempts), err
|
||||||
|
}
|
||||||
// bail out
|
// bail out
|
||||||
return w.backoff(attempts), postWriteErr
|
return w.backoff(attempts), postWriteErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,10 +80,9 @@ func TestWrite(t *testing.T) {
|
||||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||||
RemoteURL: "not a good URL",
|
RemoteURL: "not a good URL",
|
||||||
}
|
}
|
||||||
|
|
||||||
w, configStore, _ := testWriter(t)
|
w, configStore, _ := testWriter(t)
|
||||||
|
|
||||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
||||||
|
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, int(0), gomock.Any())
|
||||||
_, actualErr := w.Write([]byte{}, 1)
|
_, actualErr := w.Write([]byte{}, 1)
|
||||||
require.Error(t, actualErr)
|
require.Error(t, actualErr)
|
||||||
})
|
})
|
||||||
|
|
|
@ -76,6 +76,7 @@ type DurableQueueManager interface {
|
||||||
DeleteQueue(replicationID platform.ID) error
|
DeleteQueue(replicationID platform.ID) error
|
||||||
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
|
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
|
||||||
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
|
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
|
||||||
|
RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
|
||||||
StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error
|
StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error
|
||||||
CloseAll() error
|
CloseAll() error
|
||||||
EnqueueData(replicationID platform.ID, data []byte, numPoints int) error
|
EnqueueData(replicationID platform.ID, data []byte, numPoints int) error
|
||||||
|
@ -129,6 +130,13 @@ func (s *service) ListReplications(ctx context.Context, filter influxdb.Replicat
|
||||||
for i := range rs.Replications {
|
for i := range rs.Replications {
|
||||||
rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID]
|
rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID]
|
||||||
}
|
}
|
||||||
|
rsizes, err := s.durableQueueManager.RemainingQueueSizes(ids)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for i := range rs.Replications {
|
||||||
|
rs.Replications[i].RemainingQueueSizeBytes = rsizes[rs.Replications[i].ID]
|
||||||
|
}
|
||||||
|
|
||||||
return rs, nil
|
return rs, nil
|
||||||
}
|
}
|
||||||
|
@ -196,6 +204,11 @@ func (s *service) GetReplication(ctx context.Context, id platform.ID) (*influxdb
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r.CurrentQueueSizeBytes = sizes[r.ID]
|
r.CurrentQueueSizeBytes = sizes[r.ID]
|
||||||
|
rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.RemainingQueueSizeBytes = rsizes[r.ID]
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
@ -221,6 +234,11 @@ func (s *service) UpdateReplication(ctx context.Context, id platform.ID, request
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r.CurrentQueueSizeBytes = sizes[r.ID]
|
r.CurrentQueueSizeBytes = sizes[r.ID]
|
||||||
|
rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.RemainingQueueSizeBytes = rsizes[r.ID]
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,12 +110,14 @@ func TestListReplications(t *testing.T) {
|
||||||
filter := influxdb.ReplicationListFilter{}
|
filter := influxdb.ReplicationListFilter{}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
list influxdb.Replications
|
list influxdb.Replications
|
||||||
ids []platform.ID
|
ids []platform.ID
|
||||||
sizes map[platform.ID]int64
|
sizes map[platform.ID]int64
|
||||||
storeErr error
|
rsizes map[platform.ID]int64
|
||||||
queueManagerErr error
|
storeErr error
|
||||||
|
queueManagerErr error
|
||||||
|
queueManagerRemainingSizesErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "matches multiple",
|
name: "matches multiple",
|
||||||
|
@ -149,6 +151,14 @@ func TestListReplications(t *testing.T) {
|
||||||
ids: []platform.ID{replication1.ID},
|
ids: []platform.ID{replication1.ID},
|
||||||
queueManagerErr: errors.New("error from queue manager"),
|
queueManagerErr: errors.New("error from queue manager"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "queue manager error - remaining queue size",
|
||||||
|
list: influxdb.Replications{
|
||||||
|
Replications: []influxdb.Replication{replication1},
|
||||||
|
},
|
||||||
|
ids: []platform.ID{replication1.ID},
|
||||||
|
queueManagerRemainingSizesErr: errors.New("Remaining Queue Size erro"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
@ -161,6 +171,9 @@ func TestListReplications(t *testing.T) {
|
||||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes(tt.ids).Return(tt.sizes, tt.queueManagerErr)
|
mocks.durableQueueManager.EXPECT().CurrentQueueSizes(tt.ids).Return(tt.sizes, tt.queueManagerErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tt.storeErr == nil && tt.queueManagerErr == nil && len(tt.list.Replications) > 0 {
|
||||||
|
mocks.durableQueueManager.EXPECT().RemainingQueueSizes(tt.ids).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
|
||||||
|
}
|
||||||
got, err := svc.ListReplications(ctx, filter)
|
got, err := svc.ListReplications(ctx, filter)
|
||||||
|
|
||||||
var wantErr error
|
var wantErr error
|
||||||
|
@ -168,6 +181,8 @@ func TestListReplications(t *testing.T) {
|
||||||
wantErr = tt.storeErr
|
wantErr = tt.storeErr
|
||||||
} else if tt.queueManagerErr != nil {
|
} else if tt.queueManagerErr != nil {
|
||||||
wantErr = tt.queueManagerErr
|
wantErr = tt.queueManagerErr
|
||||||
|
} else if tt.queueManagerRemainingSizesErr != nil {
|
||||||
|
wantErr = tt.queueManagerRemainingSizesErr
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, wantErr, err)
|
require.Equal(t, wantErr, err)
|
||||||
|
@ -179,6 +194,7 @@ func TestListReplications(t *testing.T) {
|
||||||
|
|
||||||
for _, r := range got.Replications {
|
for _, r := range got.Replications {
|
||||||
require.Equal(t, tt.sizes[r.ID], r.CurrentQueueSizeBytes)
|
require.Equal(t, tt.sizes[r.ID], r.CurrentQueueSizeBytes)
|
||||||
|
require.Equal(t, tt.rsizes[r.ID], r.RemainingQueueSizeBytes)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -315,12 +331,14 @@ func TestGetReplication(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
sizes map[platform.ID]int64
|
sizes map[platform.ID]int64
|
||||||
storeErr error
|
rsizes map[platform.ID]int64
|
||||||
queueManagerErr error
|
storeErr error
|
||||||
storeWant influxdb.Replication
|
queueManagerErr error
|
||||||
want influxdb.Replication
|
queueManagerRemainingSizesErr error
|
||||||
|
storeWant influxdb.Replication
|
||||||
|
want influxdb.Replication
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "success",
|
name: "success",
|
||||||
|
@ -337,6 +355,11 @@ func TestGetReplication(t *testing.T) {
|
||||||
storeWant: replication1,
|
storeWant: replication1,
|
||||||
queueManagerErr: errors.New("queue manager error"),
|
queueManagerErr: errors.New("queue manager error"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "queue manager error - remaining queue size",
|
||||||
|
storeWant: replication1,
|
||||||
|
queueManagerRemainingSizesErr: errors.New("queue manager error"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
@ -348,6 +371,9 @@ func TestGetReplication(t *testing.T) {
|
||||||
if tt.storeErr == nil {
|
if tt.storeErr == nil {
|
||||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerErr)
|
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerErr)
|
||||||
}
|
}
|
||||||
|
if tt.storeErr == nil && tt.queueManagerErr == nil {
|
||||||
|
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
|
||||||
|
}
|
||||||
|
|
||||||
got, err := svc.GetReplication(ctx, id1)
|
got, err := svc.GetReplication(ctx, id1)
|
||||||
|
|
||||||
|
@ -356,6 +382,8 @@ func TestGetReplication(t *testing.T) {
|
||||||
wantErr = tt.storeErr
|
wantErr = tt.storeErr
|
||||||
} else if tt.queueManagerErr != nil {
|
} else if tt.queueManagerErr != nil {
|
||||||
wantErr = tt.queueManagerErr
|
wantErr = tt.queueManagerErr
|
||||||
|
} else if tt.queueManagerRemainingSizesErr != nil {
|
||||||
|
wantErr = tt.queueManagerRemainingSizesErr
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, wantErr, err)
|
require.Equal(t, wantErr, err)
|
||||||
|
@ -366,6 +394,8 @@ func TestGetReplication(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, tt.sizes[got.ID], got.CurrentQueueSizeBytes)
|
require.Equal(t, tt.sizes[got.ID], got.CurrentQueueSizeBytes)
|
||||||
|
require.Equal(t, tt.rsizes[got.ID], got.RemainingQueueSizeBytes)
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -374,15 +404,17 @@ func TestUpdateReplication(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
request influxdb.UpdateReplicationRequest
|
request influxdb.UpdateReplicationRequest
|
||||||
sizes map[platform.ID]int64
|
sizes map[platform.ID]int64
|
||||||
storeErr error
|
rsizes map[platform.ID]int64
|
||||||
queueManagerUpdateSizeErr error
|
storeErr error
|
||||||
queueManagerCurrentSizesErr error
|
queueManagerUpdateSizeErr error
|
||||||
storeUpdate *influxdb.Replication
|
queueManagerCurrentSizesErr error
|
||||||
want *influxdb.Replication
|
queueManagerRemainingSizesErr error
|
||||||
wantErr error
|
storeUpdate *influxdb.Replication
|
||||||
|
want *influxdb.Replication
|
||||||
|
wantErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "success with new max queue size",
|
name: "success with new max queue size",
|
||||||
|
@ -417,6 +449,13 @@ func TestUpdateReplication(t *testing.T) {
|
||||||
storeUpdate: &updatedReplicationWithNoNewSize,
|
storeUpdate: &updatedReplicationWithNoNewSize,
|
||||||
wantErr: errors.New("current size err"),
|
wantErr: errors.New("current size err"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "queue manager error - remaining queue size",
|
||||||
|
request: updateReqWithNoNewSize,
|
||||||
|
queueManagerRemainingSizesErr: errors.New("remaining queue size err"),
|
||||||
|
storeUpdate: &updatedReplicationWithNoNewSize,
|
||||||
|
wantErr: errors.New("remaining queue size err"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
@ -436,6 +475,10 @@ func TestUpdateReplication(t *testing.T) {
|
||||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerCurrentSizesErr)
|
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerCurrentSizesErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tt.storeErr == nil && tt.queueManagerUpdateSizeErr == nil && tt.queueManagerCurrentSizesErr == nil {
|
||||||
|
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
|
||||||
|
}
|
||||||
|
|
||||||
got, err := svc.UpdateReplication(ctx, id1, tt.request)
|
got, err := svc.UpdateReplication(ctx, id1, tt.request)
|
||||||
require.Equal(t, tt.want, got)
|
require.Equal(t, tt.want, got)
|
||||||
require.Equal(t, tt.wantErr, err)
|
require.Equal(t, tt.wantErr, err)
|
||||||
|
|
Loading…
Reference in New Issue