Binlog parser tool supports index file (#8914)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/8948/head
dragondriver 2021-09-30 15:47:32 +08:00 committed by GitHub
parent 24dc03aad6
commit ca5e9ebe55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 127 additions and 0 deletions

View File

@ -12,6 +12,7 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"os"
@ -175,6 +176,30 @@ func printBinlogFile(filename string) error {
if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
return err
}
case IndexFileEventType:
desc := r.descriptorEvent
extraBytes := desc.ExtraBytes
extra := make(map[string]interface{})
err = json.Unmarshal(extraBytes, &extra)
if err != nil {
return fmt.Errorf("failed to unmarshal extra: %s", err.Error())
}
fmt.Printf("indexBuildID: %v\n", extra["indexBuildID"])
fmt.Printf("indexName: %v\n", extra["indexName"])
fmt.Printf("indexID: %v\n", extra["indexID"])
evd, ok := event.eventData.(*indexFileEventData)
if !ok {
return errors.New("incorrect event data type")
}
fmt.Printf("index file event num: %d\n", eventNum)
physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
fmt.Printf("\tStartTimestamp: %v\n", physical)
physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
fmt.Printf("\tEndTimestamp: %v\n", physical)
key := fmt.Sprintf("%v", extra["key"])
if err := printIndexFilePayloadValues(event.PayloadReaderInterface, key); err != nil {
return err
}
default:
return fmt.Errorf("undefined event typd %d", event.eventHeader.TypeCode)
}
@ -347,3 +372,49 @@ func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, r
}
return nil
}
// printIndexFilePayloadValues prints the payload of an index binlog file.
// Only the index-params file and the slice-meta file carry human-readable
// content, so only those two keys are printed; every other key is silently
// skipped and nil is returned.
func printIndexFilePayloadValues(reader PayloadReaderInterface, key string) error {
	// Select the label for the recognized keys; anything else is not printable.
	var label string
	switch key {
	case IndexParamsFile:
		label = "index params"
	case "SLICE_META":
		// content is a json string serialized by milvus::json,
		// it's better to use milvus::json to parse the content also,
		// fortunately, the json string is readable enough.
		label = "index slice meta"
	default:
		return nil
	}

	rows, err := reader.GetPayloadLengthFromReader()
	if err != nil {
		return err
	}

	// The payload is stored as a sequence of string rows; concatenate them
	// back into the original byte stream before printing.
	var content []byte
	for i := 0; i < rows; i++ {
		val, err := reader.GetOneStringFromPayload(i)
		if err != nil {
			return err
		}
		content = append(content, []byte(val)...)
	}

	fmt.Printf("%s: \n", label)
	fmt.Println(string(content))
	return nil
}

View File

@ -18,6 +18,9 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -424,3 +427,56 @@ func TestPrintDDFiles(t *testing.T) {
PrintBinlogFiles(binlogFiles)
}
// TestPrintIndexFile serializes a set of index blobs with
// IndexFileBinlogCodec, writes them to temporary files, and verifies that
// PrintBinlogFiles can parse and print them without error.
func TestPrintIndexFile(t *testing.T) {
	indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	indexName := funcutil.GenRandomStr()
	indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
	indexParams := make(map[string]string)
	indexParams["index_type"] = "IVF_FLAT"
	datas := []*Blob{
		{
			Key:   "ivf1",
			Value: []byte{1, 2, 3},
		},
		{
			Key:   "ivf2",
			Value: []byte{4, 5, 6},
		},
		{
			Key:   "SLICE_META",
			Value: []byte(`"{"meta":[{"name":"IVF","slice_num":5,"total_len":20047555},{"name":"RAW_DATA","slice_num":20,"total_len":80025824}]}"`),
		},
	}
	codec := NewIndexFileBinlogCodec()
	serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
	assert.Nil(t, err)
	var binlogFiles []string
	for index, blob := range serializedBlobs {
		// Use the platform temp directory instead of a hard-coded "/tmp"
		// so the test also works on non-Unix systems.
		fileName := fmt.Sprintf("%s/index_blob_%d.binlog", os.TempDir(), index)
		binlogFiles = append(binlogFiles, fileName)
		fd, err := os.Create(fileName)
		assert.Nil(t, err)
		num, err := fd.Write(blob.GetValue())
		assert.Nil(t, err)
		assert.Equal(t, num, len(blob.GetValue()))
		err = fd.Close()
		assert.Nil(t, err)
	}
	err = PrintBinlogFiles(binlogFiles)
	assert.Nil(t, err)
	// remove tmp files; these are plain files, so os.Remove suffices.
	// Best-effort cleanup: errors are deliberately ignored.
	for _, file := range binlogFiles {
		_ = os.Remove(file)
	}
}