diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 1835d85935..0763dbf153 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -315,7 +315,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e clonedSegment.State = targetState oldState := curSegInfo.GetState() if clonedSegment != nil && isSegmentHealthy(clonedSegment) { - if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { + if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil { log.Error("meta update: setting segment state - failed to alter segments", zap.Int64("segment ID", segmentID), zap.String("target state", targetState.String()), @@ -353,7 +353,7 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error { clonedSegment := curSegInfo.Clone() clonedSegment.IsImporting = false if isSegmentHealthy(clonedSegment) { - if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { + if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil { log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state", zap.Int64("segment ID", segmentID), zap.Error(err)) @@ -823,7 +823,7 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { // Persist segment updates first. clonedSegment := curSegInfo.Clone(AddAllocation(allocation)) if clonedSegment != nil && isSegmentHealthy(clonedSegment) { - if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { + if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil { log.Error("meta update: add allocation failed", zap.Int64("segment ID", segmentID), zap.Error(err)) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 0a7d7e46ec..aca6390644 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -607,7 +607,7 @@ func TestSaveHandoffMeta(t *testing.T) { info := &datapb.SegmentInfo{ ID: 100, - State: commonpb.SegmentState_Flushed, + State: commonpb.SegmentState_Flushing, } segmentInfo := &SegmentInfo{ SegmentInfo: info, @@ -618,7 +618,20 @@ func TestSaveHandoffMeta(t *testing.T) { keys, _, err := kvClient.LoadWithPrefix(util.FlushedSegmentPrefix) assert.Nil(t, err) + assert.Equal(t, 0, len(keys)) + + newInfo := &datapb.SegmentInfo{ + ID: 100, + State: commonpb.SegmentState_Flushed, + } + + err = meta.catalog.AlterSegment(context.TODO(), newInfo, segmentInfo.SegmentInfo) + assert.Nil(t, err) + + keys, _, err = kvClient.LoadWithPrefix(util.FlushedSegmentPrefix) + assert.Nil(t, err) assert.Equal(t, 1, len(keys)) + segmentID, err := strconv.ParseInt(filepath.Base(keys[0]), 10, 64) assert.Nil(t, err) assert.Equal(t, 100, int(segmentID)) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ebf1712973..843e4e5cd4 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -788,7 +788,11 @@ func (s *Server) startFlushLoop(ctx context.Context) { return case segmentID := <-s.flushCh: //Ignore return error - _ = s.postFlush(ctx, segmentID) + log.Info("flush successfully", zap.Any("segmentID", segmentID)) + err := s.postFlush(ctx, segmentID) + if err != nil { + log.Warn("failed to do post flush", zap.Any("segmentID", segmentID), zap.Error(err)) + } } } }() diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index f9931331e2..f0fc7c2b37 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -72,10 +72,13 @@ func (t AlterType) String() string { type DataCoordCatalog interface { ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error - AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error - AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo) error + // TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction + AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error // AlterSegmentsAndAddNewSegment for transaction AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error + // TODO Remove this later, only a hack + AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error + AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error MarkChannelDeleted(ctx context.Context, channel string) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 5140843d93..23c5c0e875 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -113,44 +113,41 @@ func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) if err != nil { return err } + return kc.Txn.MultiSave(kvs) +} - // save handoff req if segment is flushed - if segment.State == commonpb.SegmentState_Flushed { - flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - newSeg := &datapb.SegmentInfo{ID: segment.GetID()} - segBytes, err := marshalSegmentInfo(newSeg) +func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error { + if len(newSegments) == 0 { + return nil + } + + kvs := make(map[string]string) + for _, segment := range newSegments { + segmentKvs, err := buildSegmentAndBinlogsKvs(segment) if err != nil { return err } - kvs[flushSegKey] = segBytes + maps.Copy(kvs, segmentKvs) } return kc.Txn.MultiSave(kvs) } -func (kc *Catalog) AlterSegments(ctx context.Context, modSegments []*datapb.SegmentInfo) error { - if len(modSegments) == 0 { - return nil - } - +func (kc *Catalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error { kvs := make(map[string]string) - for _, segment := range modSegments { - segmentKvs, err := buildSegmentAndBinlogsKvs(segment) + segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment) + if err != nil { + return err + } + maps.Copy(kvs, segmentKvs) + if newSegment.State == commonpb.SegmentState_Flushed && oldSegment.State != commonpb.SegmentState_Flushed { + flushSegKey := buildFlushedSegmentPath(newSegment.GetCollectionID(), newSegment.GetPartitionID(), newSegment.GetID()) + newSeg := &datapb.SegmentInfo{ID: newSegment.GetID()} + segBytes, err := marshalSegmentInfo(newSeg) if err != nil { return err } - - maps.Copy(kvs, segmentKvs) - // save handoff req if segment is flushed - if segment.State == commonpb.SegmentState_Flushed { - flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - newSeg := &datapb.SegmentInfo{ID: segment.GetID()} - segBytes, err := marshalSegmentInfo(newSeg) - if err != nil { - return err - } - kvs[flushSegKey] = segBytes - } + kvs[flushSegKey] = segBytes } return kc.Txn.MultiSave(kvs) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 72bd047c3d..58ea0f524a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -284,7 +284,7 @@ func Test_AddSegments(t *testing.T) { _, ok := savedKvs[k4] assert.False(t, ok) - assert.Equal(t, 5, len(savedKvs)) + assert.Equal(t, 4, len(savedKvs)) verifySavedKvsForSegment(t, savedKvs) }) } @@ -330,7 +330,7 @@ func Test_AlterSegments(t *testing.T) { _, ok := savedKvs[k4] assert.False(t, ok) - assert.Equal(t, 5, len(savedKvs)) + assert.Equal(t, 4, len(savedKvs)) verifySavedKvsForSegment(t, savedKvs) }) }