From ec7fdd3a58d3a0b2386af045cb41057c47113a4c Mon Sep 17 00:00:00 2001
From: suitableZebraCaller <15273771+suitableZebraCaller@users.noreply.github.com>
Date: Fri, 3 Feb 2023 01:47:45 +1100
Subject: [PATCH] fix: Show Replication Queue size and Replication TCP Errors
 (#23960)

* feat: Show remaining replication queue size

* fix: Show non-http related error messages

* fix: Show non-http related error messages with backoff

* fix: Updates for replication tests

* chore: formatting

* chore: formatting

* chore: formatting

* chore: formatting

* chore: lowercase json field

---------

Co-authored-by: Geoffrey
Co-authored-by: Jeffrey Smith II
---
 replication.go                             | 29 ++++---
 replications/internal/queue_management.go  | 17 ++++
 .../internal/queue_management_test.go      | 30 +++++++
 replications/mock/queue_management.go      | 15 ++++
 replications/remotewrite/writer.go         |  5 ++
 replications/remotewrite/writer_test.go    |  3 +-
 replications/service.go                    | 18 ++++
 replications/service_test.go               | 85 ++++++++++++++-----
 8 files changed, 165 insertions(+), 37 deletions(-)

diff --git a/replication.go b/replication.go
index a006e0c6cb..d6f7ee09d7 100644
--- a/replication.go
+++ b/replication.go
@@ -20,20 +20,21 @@ var ErrMaxQueueSizeTooSmall = errors.Error{
 
 // Replication contains all info about a replication that should be returned to users.
 type Replication struct {
-	ID                    platform.ID  `json:"id" db:"id"`
-	OrgID                 platform.ID  `json:"orgID" db:"org_id"`
-	Name                  string       `json:"name" db:"name"`
-	Description           *string      `json:"description,omitempty" db:"description"`
-	RemoteID              platform.ID  `json:"remoteID" db:"remote_id"`
-	LocalBucketID         platform.ID  `json:"localBucketID" db:"local_bucket_id"`
-	RemoteBucketID        *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
-	RemoteBucketName      string       `json:"RemoteBucketName" db:"remote_bucket_name"`
-	MaxQueueSizeBytes     int64        `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
-	CurrentQueueSizeBytes int64        `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
-	LatestResponseCode    *int32       `json:"latestResponseCode,omitempty" db:"latest_response_code"`
-	LatestErrorMessage    *string      `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
-	DropNonRetryableData  bool         `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
-	MaxAgeSeconds         int64        `json:"maxAgeSeconds" db:"max_age_seconds"`
+	ID                      platform.ID  `json:"id" db:"id"`
+	OrgID                   platform.ID  `json:"orgID" db:"org_id"`
+	Name                    string       `json:"name" db:"name"`
+	Description             *string      `json:"description,omitempty" db:"description"`
+	RemoteID                platform.ID  `json:"remoteID" db:"remote_id"`
+	LocalBucketID           platform.ID  `json:"localBucketID" db:"local_bucket_id"`
+	RemoteBucketID          *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
+	RemoteBucketName        string       `json:"RemoteBucketName" db:"remote_bucket_name"`
+	MaxQueueSizeBytes       int64        `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
+	CurrentQueueSizeBytes   int64        `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
+	RemainingQueueSizeBytes int64        `json:"remainingQueueSizeBytes" db:"remaining_queue_size_bytes"`
+	LatestResponseCode      *int32       `json:"latestResponseCode,omitempty" db:"latest_response_code"`
+	LatestErrorMessage      *string      `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
+	DropNonRetryableData    bool         `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
+	MaxAgeSeconds           int64        `json:"maxAgeSeconds" db:"max_age_seconds"`
 }
 
 // ReplicationListFilter is a selection filter for listing replications.
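For context, the struct change above adds a remainingQueueSizeBytes field alongside currentQueueSizeBytes in the replication API payload. A minimal standalone sketch of the resulting JSON shape (illustrative only, not part of the patch; the struct is trimmed to the queue-size fields and the values are invented):

package main

import (
	"encoding/json"
	"fmt"
)

// replicationSizes is a trimmed stand-in for influxdb.Replication, keeping only the queue-size fields.
type replicationSizes struct {
	MaxQueueSizeBytes       int64 `json:"maxQueueSizeBytes"`
	CurrentQueueSizeBytes   int64 `json:"currentQueueSizeBytes"`
	RemainingQueueSizeBytes int64 `json:"remainingQueueSizeBytes"`
}

func main() {
	out, _ := json.Marshal(replicationSizes{
		MaxQueueSizeBytes:       67108860, // example maximum; value invented
		CurrentQueueSizeBytes:   4096,     // bytes the durable queue occupies on disk
		RemainingQueueSizeBytes: 4088,     // bytes enqueued but not yet replicated
	})
	fmt.Println(string(out))
	// Output: {"maxQueueSizeBytes":67108860,"currentQueueSizeBytes":4096,"remainingQueueSizeBytes":4088}
}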
diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go
index c0e40ee778..0bf8a3c18b 100644
--- a/replications/internal/queue_management.go
+++ b/replications/internal/queue_management.go
@@ -315,6 +315,23 @@ func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platfor
 	return sizes, nil
 }
 
+// RemainingQueueSizes returns the remaining number of bytes in each queue that have yet to be read.
+func (qm *durableQueueManager) RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
+	qm.mutex.RLock()
+	defer qm.mutex.RUnlock()
+
+	sizes := make(map[platform.ID]int64, len(ids))
+
+	for _, id := range ids {
+		if _, exist := qm.replicationQueues[id]; !exist {
+			return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
+		}
+		sizes[id] = qm.replicationQueues[id].queue.TotalBytes()
+	}
+
+	return sizes, nil
+}
+
 // StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
 // queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
 func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go
index 88de275e47..278cd9fb38 100644
--- a/replications/internal/queue_management_test.go
+++ b/replications/internal/queue_management_test.go
@@ -417,6 +417,11 @@ func TestEnqueueData(t *testing.T) {
 	require.NoError(t, err)
 	// Empty queues are 8 bytes for the footer.
 	require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
+	// Remaining queue should initially be empty:
+	rsizes, err := qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	// Empty queue = 0 bytes:
+	require.Equal(t, map[platform.ID]int64{id1: 0}, rsizes)
 
 	data := "some fake data"
 
@@ -430,6 +435,11 @@ func TestEnqueueData(t *testing.T) {
 	sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
 	require.NoError(t, err)
 	require.Greater(t, sizes[id1], int64(8))
+	rsizes, err = qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Greater(t, rsizes[id1], int64(0))
+	// Difference between disk size and remaining queue size should only be the footer size.
+	require.Equal(t, sizes[id1]-rsizes[id1], int64(8))
 
 	written, err := qm.replicationQueues[id1].queue.Current()
 	require.NoError(t, err)
@@ -481,8 +491,17 @@ func TestSendWrite(t *testing.T) {
 	require.True(t, scan.Next())
 	require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
 	require.NoError(t, scan.Err())
+	// Initial queue size should be size of data + footer
+	rsizesI, err := qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Equal(t, rsizesI[id1], int64(8+len(points[pointIndex])))
 	// Send the write to the "remote" with a success
 	rq.SendWrite()
+	// Queue becomes empty after write:
+	rsizesJ, err := qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Equal(t, rsizesJ[id1], int64(0))
+
 	// Make sure the data is no longer in the queue
 	_, err = rq.queue.NewScanner()
 	require.Equal(t, io.EOF, err)
@@ -496,9 +515,15 @@ func TestSendWrite(t *testing.T) {
 	require.True(t, scan.Next())
 	require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
 	require.NoError(t, scan.Err())
+	rsizesI, err = qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
 	// Send the write to the "remote" with a FAILURE
 	shouldFailThisWrite = true
 	rq.SendWrite()
+	// Queue size should not have decreased if the write has failed:
+	rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Equal(t, rsizesJ[id1], rsizesI[id1])
 	// Make sure the data is still in the queue
 	scan, err = rq.queue.NewScanner()
 	require.NoError(t, err)
@@ -508,6 +533,11 @@ func TestSendWrite(t *testing.T) {
 	// Send the write to the "remote" again, with a SUCCESS
 	shouldFailThisWrite = false
 	rq.SendWrite()
+	// Queue becomes empty after a successful write
+	rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Equal(t, rsizesJ[id1], int64(0))
+
 	// Make sure the data is no longer in the queue
 	_, err = rq.queue.NewScanner()
 	require.Equal(t, io.EOF, err)
diff --git a/replications/mock/queue_management.go b/replications/mock/queue_management.go
index 583aa1f4fd..2e8c01d30e 100644
--- a/replications/mock/queue_management.go
+++ b/replications/mock/queue_management.go
@@ -120,6 +120,21 @@ func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2,
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3, arg4)
 }
 
+// RemainingQueueSizes mocks base method.
+func (m *MockDurableQueueManager) RemainingQueueSizes(arg0 []platform.ID) (map[platform.ID]int64, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "RemainingQueueSizes", arg0)
+	ret0, _ := ret[0].(map[platform.ID]int64)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// RemainingQueueSizes indicates an expected call of RemainingQueueSizes.
+func (mr *MockDurableQueueManagerMockRecorder) RemainingQueueSizes(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemainingQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).RemainingQueueSizes), arg0)
+}
+
 // StartReplicationQueues mocks base method.
 func (m *MockDurableQueueManager) StartReplicationQueues(arg0 map[platform.ID]*influxdb.TrackedReplication) error {
 	m.ctrl.T.Helper()
diff --git a/replications/remotewrite/writer.go b/replications/remotewrite/writer.go
index 854d870b7a..39a29b21be 100644
--- a/replications/remotewrite/writer.go
+++ b/replications/remotewrite/writer.go
@@ -113,6 +113,11 @@ func (w *writer) Write(data []byte, attempts int) (backoff time.Duration, err er
 	res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout)
 	res, msg, ok := normalizeResponse(res, postWriteErr)
 	if !ok {
+		// Update response info:
+		if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
+			w.logger.Debug("failed to update config store with latest remote write response info", zap.Error(err))
+			return w.backoff(attempts), err
+		}
 		// bail out
 		return w.backoff(attempts), postWriteErr
 	}
diff --git a/replications/remotewrite/writer_test.go b/replications/remotewrite/writer_test.go
index 363a9977c7..e5362dc79f 100644
--- a/replications/remotewrite/writer_test.go
+++ b/replications/remotewrite/writer_test.go
@@ -80,10 +80,9 @@ func TestWrite(t *testing.T) {
 		testConfig := &influxdb.ReplicationHTTPConfig{
 			RemoteURL: "not a good URL",
 		}
-
 		w, configStore, _ := testWriter(t)
-
 		configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
+		configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, int(0), gomock.Any())
 		_, actualErr := w.Write([]byte{}, 1)
 		require.Error(t, actualErr)
 	})
diff --git a/replications/service.go b/replications/service.go
index 439d8d66b6..f9caf5ad14 100644
--- a/replications/service.go
+++ b/replications/service.go
@@ -76,6 +76,7 @@ type DurableQueueManager interface {
 	DeleteQueue(replicationID platform.ID) error
 	UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
 	CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
+	RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
 	StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error
 	CloseAll() error
 	EnqueueData(replicationID platform.ID, data []byte, numPoints int) error
@@ -129,6 +130,13 @@ func (s *service) ListReplications(ctx context.Context, filter influxdb.Replicat
 	for i := range rs.Replications {
 		rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID]
 	}
+	rsizes, err := s.durableQueueManager.RemainingQueueSizes(ids)
+	if err != nil {
+		return nil, err
+	}
+	for i := range rs.Replications {
+		rs.Replications[i].RemainingQueueSizeBytes = rsizes[rs.Replications[i].ID]
+	}
 
 	return rs, nil
 }
@@ -196,6 +204,11 @@ func (s *service) GetReplication(ctx context.Context, id platform.ID) (*influxdb
 		return nil, err
 	}
 	r.CurrentQueueSizeBytes = sizes[r.ID]
+	rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
+	if err != nil {
+		return nil, err
+	}
+	r.RemainingQueueSizeBytes = rsizes[r.ID]
 
 	return r, nil
 }
@@ -221,6 +234,11 @@ func (s *service) UpdateReplication(ctx context.Context, id platform.ID, request
 		return nil, err
 	}
 	r.CurrentQueueSizeBytes = sizes[r.ID]
+	rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
+	if err != nil {
+		return nil, err
+	}
+	r.RemainingQueueSizeBytes = rsizes[r.ID]
 
 	return r, nil
 }
diff --git a/replications/service_test.go b/replications/service_test.go
index 52b991dcad..53c18fc056 100644
--- a/replications/service_test.go
+++ b/replications/service_test.go
@@ -110,12 +110,14 @@ func TestListReplications(t *testing.T) {
 	filter := influxdb.ReplicationListFilter{}
 
 	tests := []struct {
-		name            string
-		list            influxdb.Replications
-		ids             []platform.ID
-		sizes           map[platform.ID]int64
-		storeErr        error
-		queueManagerErr error
+		name                          string
+		list                          influxdb.Replications
+		ids                           []platform.ID
+		sizes                         map[platform.ID]int64
+		rsizes                        map[platform.ID]int64
+		storeErr                      error
+		queueManagerErr               error
+		queueManagerRemainingSizesErr error
 	}{
 		{
 			name: "matches multiple",
@@ -149,6 +151,14 @@ func TestListReplications(t *testing.T) {
 			ids:             []platform.ID{replication1.ID},
 			queueManagerErr: errors.New("error from queue manager"),
 		},
+		{
+			name: "queue manager error - remaining queue size",
+			list: influxdb.Replications{
+				Replications: []influxdb.Replication{replication1},
+			},
+			ids:                           []platform.ID{replication1.ID},
+			queueManagerRemainingSizesErr: errors.New("Remaining Queue Size error"),
+		},
 	}
 
 	for _, tt := range tests {
@@ -161,6 +171,9 @@ func TestListReplications(t *testing.T) {
 				mocks.durableQueueManager.EXPECT().CurrentQueueSizes(tt.ids).Return(tt.sizes, tt.queueManagerErr)
 			}
+			if tt.storeErr == nil && tt.queueManagerErr == nil && len(tt.list.Replications) > 0 {
+				mocks.durableQueueManager.EXPECT().RemainingQueueSizes(tt.ids).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
+			}
 
 			got, err := svc.ListReplications(ctx, filter)
 
 			var wantErr error
@@ -168,6 +181,8 @@ func TestListReplications(t *testing.T) {
 				wantErr = tt.storeErr
 			} else if tt.queueManagerErr != nil {
 				wantErr = tt.queueManagerErr
+			} else if tt.queueManagerRemainingSizesErr != nil {
+				wantErr = tt.queueManagerRemainingSizesErr
 			}
 
 			require.Equal(t, wantErr, err)
@@ -179,6 +194,7 @@ func TestListReplications(t *testing.T) {
 			for _, r := range got.Replications {
 				require.Equal(t, tt.sizes[r.ID], r.CurrentQueueSizeBytes)
+				require.Equal(t, tt.rsizes[r.ID], r.RemainingQueueSizeBytes)
 			}
 		})
 	}
 }
@@ -315,12 +331,14 @@ func TestGetReplication(t *testing.T) {
 	t.Parallel()
 
 	tests := []struct {
-		name            string
-		sizes           map[platform.ID]int64
-		storeErr        error
-		queueManagerErr error
-		storeWant       influxdb.Replication
-		want            influxdb.Replication
+		name                          string
+		sizes                         map[platform.ID]int64
+		rsizes                        map[platform.ID]int64
+		storeErr                      error
+		queueManagerErr               error
+		queueManagerRemainingSizesErr error
+		storeWant                     influxdb.Replication
+		want                          influxdb.Replication
 	}{
 		{
 			name: "success",
@@ -337,6 +355,11 @@ func TestGetReplication(t *testing.T) {
 			storeWant:       replication1,
 			queueManagerErr: errors.New("queue manager error"),
 		},
+		{
+			name:                          "queue manager error - remaining queue size",
+			storeWant:                     replication1,
+			queueManagerRemainingSizesErr: errors.New("queue manager error"),
+		},
 	}
 
 	for _, tt := range tests {
@@ -348,6 +371,9 @@ func TestGetReplication(t *testing.T) {
 			if tt.storeErr == nil {
 				mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerErr)
 			}
+			if tt.storeErr == nil && tt.queueManagerErr == nil {
+				mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
+			}
 
 			got, err := svc.GetReplication(ctx, id1)
 
@@ -356,6 +382,8 @@ func TestGetReplication(t *testing.T) {
 				wantErr = tt.storeErr
 			} else if tt.queueManagerErr != nil {
 				wantErr = tt.queueManagerErr
+			} else if tt.queueManagerRemainingSizesErr != nil {
+				wantErr = tt.queueManagerRemainingSizesErr
 			}
 
 			require.Equal(t, wantErr, err)
@@ -366,6 +394,8 @@ func TestGetReplication(t *testing.T) {
 			}
 
 			require.Equal(t, tt.sizes[got.ID], got.CurrentQueueSizeBytes)
+			require.Equal(t, tt.rsizes[got.ID], got.RemainingQueueSizeBytes)
+
 		})
 	}
 }
@@ -374,15 +404,17 @@ func TestUpdateReplication(t *testing.T) {
 	t.Parallel()
 
 	tests := []struct {
-		name                        string
-		request                     influxdb.UpdateReplicationRequest
-		sizes                       map[platform.ID]int64
-		storeErr                    error
-		queueManagerUpdateSizeErr   error
-		queueManagerCurrentSizesErr error
-		storeUpdate                 *influxdb.Replication
-		want                        *influxdb.Replication
-		wantErr                     error
+		name                          string
+		request                       influxdb.UpdateReplicationRequest
+		sizes                         map[platform.ID]int64
+		rsizes                        map[platform.ID]int64
+		storeErr                      error
+		queueManagerUpdateSizeErr     error
+		queueManagerCurrentSizesErr   error
+		queueManagerRemainingSizesErr error
+		storeUpdate                   *influxdb.Replication
+		want                          *influxdb.Replication
+		wantErr                       error
 	}{
 		{
 			name: "success with new max queue size",
@@ -417,6 +449,13 @@ func TestUpdateReplication(t *testing.T) {
 			storeUpdate: &updatedReplicationWithNoNewSize,
 			wantErr:     errors.New("current size err"),
 		},
+		{
+			name:                          "queue manager error - remaining queue size",
+			request:                       updateReqWithNoNewSize,
+			queueManagerRemainingSizesErr: errors.New("remaining queue size err"),
+			storeUpdate:                   &updatedReplicationWithNoNewSize,
+			wantErr:                       errors.New("remaining queue size err"),
+		},
 	}
 
 	for _, tt := range tests {
@@ -436,6 +475,10 @@ func TestUpdateReplication(t *testing.T) {
 				mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerCurrentSizesErr)
 			}
+			if tt.storeErr == nil && tt.queueManagerUpdateSizeErr == nil && tt.queueManagerCurrentSizesErr == nil {
+				mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
+			}
+
 			got, err := svc.UpdateReplication(ctx, id1, tt.request)
 			require.Equal(t, tt.want, got)
 			require.Equal(t, tt.wantErr, err)
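As the tests above suggest, CurrentQueueSizes reports the durable queue's on-disk size (8 bytes of footer even when empty), while the new RemainingQueueSizes reports only the bytes still waiting to be replicated. A rough standalone sketch of how a caller might combine the two queries (not part of the patch; the interface is trimmed to these two methods, plain uint64 stands in for platform.ID, and the canned values mirror the empty-queue expectations in TestEnqueueData):

package main

import "fmt"

// queueSizer is a trimmed view of the DurableQueueManager interface from replications/service.go.
type queueSizer interface {
	CurrentQueueSizes(ids []uint64) (map[uint64]int64, error)
	RemainingQueueSizes(ids []uint64) (map[uint64]int64, error)
}

// fakeSizer returns canned values for an empty durable queue:
// 8 bytes on disk (the footer) but 0 bytes left to read.
type fakeSizer struct{}

func (fakeSizer) CurrentQueueSizes(ids []uint64) (map[uint64]int64, error) {
	return map[uint64]int64{ids[0]: 8}, nil
}

func (fakeSizer) RemainingQueueSizes(ids []uint64) (map[uint64]int64, error) {
	return map[uint64]int64{ids[0]: 0}, nil
}

func main() {
	var qm queueSizer = fakeSizer{}
	ids := []uint64{1}

	disk, _ := qm.CurrentQueueSizes(ids)
	remaining, _ := qm.RemainingQueueSizes(ids)

	// For an empty queue: 8 bytes on disk, 0 bytes waiting to replicate.
	fmt.Printf("on disk: %d B, waiting to replicate: %d B\n", disk[1], remaining[1])
}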