mirror of https://github.com/milvus-io/milvus.git
Close event and binlog reader (#12173)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/12180/head
parent
e9585625e6
commit
9d5bcd3e3a
|
@ -81,7 +81,6 @@ func TestIndexNode(t *testing.T) {
|
|||
|
||||
t.Run("CreateIndex FloatVector", func(t *testing.T) {
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
|
||||
insertCodec.Schema = &etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
|
@ -197,7 +196,6 @@ func TestIndexNode(t *testing.T) {
|
|||
})
|
||||
t.Run("CreateIndex BinaryVector", func(t *testing.T) {
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
|
||||
insertCodec.Schema = &etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
|
@ -310,7 +308,6 @@ func TestIndexNode(t *testing.T) {
|
|||
|
||||
t.Run("Create Deleted_Index", func(t *testing.T) {
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
|
||||
insertCodec.Schema = &etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
|
@ -489,7 +486,6 @@ func TestCreateIndexFailed(t *testing.T) {
|
|||
|
||||
t.Run("CreateIndex error", func(t *testing.T) {
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
|
||||
insertCodec.Schema = &etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
|
@ -608,7 +604,6 @@ func TestCreateIndexFailed(t *testing.T) {
|
|||
|
||||
t.Run("Invalid Param", func(t *testing.T) {
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
|
||||
insertCodec.Schema = &etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
|
|
|
@ -348,7 +348,6 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
|
||||
storageBlobs := getStorageBlobs(blobs)
|
||||
var insertCodec storage.InsertCodec
|
||||
defer insertCodec.Close()
|
||||
collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(storageBlobs)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
|
@ -407,7 +406,6 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = codec.Close()
|
||||
tr.Record("serialize index codec done")
|
||||
|
||||
getSavePathByKey := func(key string) string {
|
||||
|
|
|
@ -108,7 +108,6 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa
|
|||
var indexParams indexParam
|
||||
var indexName string
|
||||
indexCodec := storage.NewIndexFileBinlogCodec()
|
||||
defer indexCodec.Close()
|
||||
for _, p := range indexPath {
|
||||
log.Debug("", zap.String("load path", fmt.Sprintln(indexPath)))
|
||||
indexPiece, err := loader.kv.Load(p)
|
||||
|
|
|
@ -225,12 +225,6 @@ func (loader *segmentLoader) filterFieldBinlogs(fieldBinlogs []*datapb.FieldBinl
|
|||
|
||||
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlogs []*datapb.FieldBinlog, segmentType segmentType) error {
|
||||
iCodec := storage.InsertCodec{}
|
||||
defer func() {
|
||||
err := iCodec.Close()
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
}
|
||||
}()
|
||||
blobs := make([]*storage.Blob, 0)
|
||||
for _, fb := range fieldBinlogs {
|
||||
log.Debug("load segment fields data",
|
||||
|
|
|
@ -58,7 +58,6 @@ func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID) (*InsertBinlogIt
|
|||
reader := NewInsertCodec(nil)
|
||||
|
||||
_, _, serData, err := reader.Deserialize(blobs)
|
||||
defer reader.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -28,7 +28,6 @@ func generateTestData(t *testing.T, num int) []*Blob {
|
|||
{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
|
||||
}}
|
||||
insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{ID: 1, Schema: schema})
|
||||
defer insertCodec.Close()
|
||||
|
||||
data := &InsertData{Data: map[FieldID]FieldData{rootcoord.TimeStampField: &Int64FieldData{Data: []int64{}}, rootcoord.RowIDField: &Int64FieldData{Data: []int64{}}, 101: &Int32FieldData{Data: []int32{}}}}
|
||||
for i := 1; i <= num; i++ {
|
||||
|
|
|
@ -268,8 +268,7 @@ type InsertData struct {
|
|||
// Blob key example:
|
||||
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
|
||||
type InsertCodec struct {
|
||||
Schema *etcdpb.CollectionMeta
|
||||
readerCloseFunc []func() error
|
||||
Schema *etcdpb.CollectionMeta
|
||||
}
|
||||
|
||||
func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
|
||||
|
@ -397,9 +396,6 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
|
|||
if len(blobs) == 0 {
|
||||
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
|
||||
}
|
||||
readerClose := func(reader *BinlogReader) func() error {
|
||||
return func() error { return reader.Close() }
|
||||
}
|
||||
|
||||
var blobList BlobList = blobs
|
||||
sort.Sort(blobList)
|
||||
|
@ -607,6 +603,10 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
|
|||
default:
|
||||
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType)
|
||||
}
|
||||
err = eventReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if fieldID == rootcoord.TimeStampField {
|
||||
blobInfo := BlobInfo{
|
||||
|
@ -614,7 +614,10 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
|
|||
}
|
||||
resultData.Infos = append(resultData.Infos, blobInfo)
|
||||
}
|
||||
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
|
||||
err = binlogReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return cID, pID, sID, resultData, nil
|
||||
|
@ -629,16 +632,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
return partitionID, segmentID, data, err
|
||||
}
|
||||
|
||||
func (insertCodec *InsertCodec) Close() error {
|
||||
for _, closeFunc := range insertCodec.readerCloseFunc {
|
||||
err := closeFunc()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteData saves each entity delete message represented as <primarykey,timestamp> map.
|
||||
// timestamp represents the time when this instance was deleted
|
||||
type DeleteData struct {
|
||||
|
@ -656,7 +649,6 @@ func (data *DeleteData) Append(pk UniqueID, ts Timestamp) {
|
|||
|
||||
// DeleteCodec serializes and deserializes the delete data
|
||||
type DeleteCodec struct {
|
||||
readerCloseFunc []func() error
|
||||
}
|
||||
|
||||
// NewDeleteCodec returns a DeleteCodec
|
||||
|
@ -724,9 +716,6 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
if len(blobs) == 0 {
|
||||
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
|
||||
}
|
||||
readerClose := func(reader *BinlogReader) func() error {
|
||||
return func() error { return reader.Close() }
|
||||
}
|
||||
|
||||
var pid, sid UniqueID
|
||||
result := &DeleteData{}
|
||||
|
@ -771,8 +760,14 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
result.Pks = append(result.Pks, pk)
|
||||
result.Tss = append(result.Tss, ts)
|
||||
}
|
||||
|
||||
deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
|
||||
err = eventReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
err = binlogReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
|
||||
}
|
||||
result.RowCount = int64(len(result.Pks))
|
||||
|
@ -784,8 +779,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
|
||||
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
|
||||
type DataDefinitionCodec struct {
|
||||
collectionID int64
|
||||
readerCloseFunc []func() error
|
||||
collectionID int64
|
||||
}
|
||||
|
||||
func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
|
||||
|
@ -912,9 +906,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [
|
|||
if len(blobs) == 0 {
|
||||
return nil, nil, fmt.Errorf("blobs is empty")
|
||||
}
|
||||
readerClose := func(reader *BinlogReader) func() error {
|
||||
return func() error { return reader.Close() }
|
||||
}
|
||||
var requestsStrings []string
|
||||
var resultTs []Timestamp
|
||||
|
||||
|
@ -958,32 +949,26 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [
|
|||
requestsStrings = append(requestsStrings, singleString)
|
||||
}
|
||||
}
|
||||
err = eventReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
err = binlogReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
|
||||
dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
|
||||
}
|
||||
|
||||
return resultTs, requestsStrings, nil
|
||||
}
|
||||
|
||||
func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
|
||||
for _, closeFunc := range dataDefinitionCodec.readerCloseFunc {
|
||||
err := closeFunc()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type IndexFileBinlogCodec struct {
|
||||
readerCloseFuncs []func() error
|
||||
}
|
||||
|
||||
func NewIndexFileBinlogCodec() *IndexFileBinlogCodec {
|
||||
return &IndexFileBinlogCodec{
|
||||
readerCloseFuncs: make([]func() error, 0),
|
||||
}
|
||||
return &IndexFileBinlogCodec{}
|
||||
}
|
||||
|
||||
func (codec *IndexFileBinlogCodec) Serialize(
|
||||
|
@ -1118,10 +1103,6 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) (
|
|||
if len(blobs) == 0 {
|
||||
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, errors.New("blobs is empty")
|
||||
}
|
||||
readerClose := func(reader *BinlogReader) func() error {
|
||||
return func() error { return reader.Close() }
|
||||
}
|
||||
|
||||
indexParams = make(map[string]string)
|
||||
datas = make([]*Blob, 0)
|
||||
|
||||
|
@ -1205,9 +1186,16 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) (
|
|||
})
|
||||
}
|
||||
}
|
||||
err = eventReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
err = binlogReader.Close()
|
||||
if err != nil {
|
||||
log.Warn("event reader close failed", zap.Error(err))
|
||||
}
|
||||
|
||||
codec.readerCloseFuncs = append(codec.readerCloseFuncs, readerClose(binlogReader))
|
||||
}
|
||||
|
||||
return indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas, nil
|
||||
|
@ -1224,16 +1212,6 @@ func (codec *IndexFileBinlogCodec) Deserialize(blobs []*Blob) (
|
|||
return datas, indexParams, indexName, indexID, err
|
||||
}
|
||||
|
||||
func (codec *IndexFileBinlogCodec) Close() error {
|
||||
for _, closeFunc := range codec.readerCloseFuncs {
|
||||
err := closeFunc()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type IndexCodec struct {
|
||||
}
|
||||
|
||||
|
|
|
@ -293,7 +293,6 @@ func TestInsertCodec(t *testing.T) {
|
|||
assert.Equal(t, []string{"1", "2", "3", "4"}, resultData.Data[StringField].(*StringFieldData).Data)
|
||||
assert.Equal(t, []byte{0, 255, 0, 255}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).Data)
|
||||
assert.Equal(t, []float32{0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 4, 5, 6, 7}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).Data)
|
||||
assert.Nil(t, insertCodec.Close())
|
||||
log.Debug("Data", zap.Any("Data", resultData.Data))
|
||||
log.Debug("Infos", zap.Any("Infos", resultData.Infos))
|
||||
|
||||
|
@ -353,7 +352,6 @@ func TestDDCodec(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, resultTs, ts)
|
||||
assert.Equal(t, resultRequests, ddRequests)
|
||||
assert.Nil(t, dataDefinitionCodec.Close())
|
||||
|
||||
blobs = []*Blob{}
|
||||
_, _, err = dataDefinitionCodec.Deserialize(blobs)
|
||||
|
@ -416,9 +414,6 @@ func TestIndexFileBinlogCodec(t *testing.T) {
|
|||
assert.Equal(t, indexName, idxName)
|
||||
assert.Equal(t, indexID, idxID)
|
||||
|
||||
err = codec.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
// empty
|
||||
_, _, _, _, _, _, _, _, _, _, err = codec.DeserializeImpl(nil)
|
||||
assert.NotNil(t, err)
|
||||
|
|
|
@ -425,7 +425,6 @@ func TestPrintDDFiles(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, resultTs, ts)
|
||||
assert.Equal(t, resultRequests, ddRequests)
|
||||
assert.Nil(t, dataDefinitionCodec.Close())
|
||||
|
||||
PrintBinlogFiles(binlogFiles)
|
||||
}
|
||||
|
|
|
@ -119,7 +119,6 @@ func TestGetBinlogSize(t *testing.T) {
|
|||
}
|
||||
|
||||
codec := NewIndexFileBinlogCodec()
|
||||
defer codec.Close()
|
||||
|
||||
serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
|
||||
assert.Nil(t, err)
|
||||
|
@ -194,7 +193,6 @@ func TestEstimateMemorySize(t *testing.T) {
|
|||
}
|
||||
|
||||
codec := NewIndexFileBinlogCodec()
|
||||
defer codec.Close()
|
||||
|
||||
serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -63,7 +63,6 @@ func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer insertCodec.Close()
|
||||
|
||||
var results []byte
|
||||
for _, singleData := range data.Data {
|
||||
|
|
Loading…
Reference in New Issue