Add payload bytes interface. (#13467)

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
pull/13712/head
godchen 2021-12-16 16:35:42 +08:00 committed by GitHub
parent 24b8b6e4c7
commit 7e56f08747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 81 additions and 84 deletions

View File

@ -1029,13 +1029,13 @@ func TestIndexFileBinlog(t *testing.T) {
key := funcutil.GenRandomStr()
timestamp := Timestamp(time.Now().UnixNano())
payload := funcutil.GenRandomStr()
payload := funcutil.GenRandomBytes()
w := NewIndexFileBinlogWriter(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexName, indexID, key)
e, err := w.NextIndexFileEventWriter()
assert.Nil(t, err)
err = e.AddOneStringToPayload(payload)
err = e.AddByteToPayload(payload)
assert.Nil(t, err)
e.SetEventTimestamp(timestamp, timestamp)
@ -1109,7 +1109,7 @@ func TestIndexFileBinlog(t *testing.T) {
//descriptor data fix, payload type
colType := UnsafeReadInt32(buf, pos)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_String)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_Int8)
pos += int(unsafe.Sizeof(colType))
//descriptor data, post header lengths

View File

@ -320,7 +320,7 @@ func NewIndexFileBinlogWriter(
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = fieldID
descriptorEvent.PayloadDataType = schemapb.DataType_String
descriptorEvent.PayloadDataType = schemapb.DataType_Int8
descriptorEvent.AddExtra("indexBuildID", fmt.Sprintf("%d", indexBuildID))
descriptorEvent.AddExtra("version", fmt.Sprintf("%d", version))
descriptorEvent.AddExtra("indexName", indexName)

View File

@ -1038,17 +1038,9 @@ func (codec *IndexFileBinlogCodec) Serialize(
return nil, err
}
length := (len(datas[pos].Value) + maxLengthPerRowOfIndexFile - 1) / maxLengthPerRowOfIndexFile
for i := 0; i < length; i++ {
start := i * maxLengthPerRowOfIndexFile
end := (i + 1) * maxLengthPerRowOfIndexFile
if end > len(datas[pos].Value) {
end = len(datas[pos].Value)
}
err = eventWriter.AddOneStringToPayload(string(datas[pos].Value[start:end]))
if err != nil {
return nil, err
}
err = eventWriter.AddByteToPayload(datas[pos].Value)
if err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts, ts)
@ -1085,17 +1077,9 @@ func (codec *IndexFileBinlogCodec) Serialize(
}
params, _ := json.Marshal(indexParams)
length := (len(params) + maxLengthPerRowOfIndexFile - 1) / maxLengthPerRowOfIndexFile
for i := 0; i < length; i++ {
start := i * maxLengthPerRowOfIndexFile
end := (i + 1) * maxLengthPerRowOfIndexFile
if end > len(params) {
end = len(params)
}
err = eventWriter.AddOneStringToPayload(string(params[start:end]))
if err != nil {
return nil, err
}
err = eventWriter.AddByteToPayload(params)
if err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts, ts)
@ -1195,37 +1179,23 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) (
break
}
switch dataType {
case schemapb.DataType_String:
length, err := eventReader.GetPayloadLengthFromReader()
case schemapb.DataType_Int8:
content, err := eventReader.GetByteFromPayload()
if err != nil {
log.Warn("failed to get payload length",
log.Warn("failed to get string from payload",
zap.Error(err))
eventReader.Close()
binlogReader.Close()
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
var content []byte
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
log.Warn("failed to get string from payload",
zap.Error(err))
eventReader.Close()
binlogReader.Close()
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
content = append(content, []byte(singleString)...)
}
if key == IndexParamsKey {
_ = json.Unmarshal(content, &indexParams)
} else {
datas = append(datas, &Blob{
Key: key,
Value: content,
})
blob := &Blob{Key: key}
blob.Value = make([]byte, len(content))
copy(blob.Value, content)
datas = append(datas, blob)
}
}
eventReader.Close()

View File

@ -402,7 +402,7 @@ func TestIndexFileBinlogCodec(t *testing.T) {
},
{
Key: "large",
Value: []byte(funcutil.RandomString(maxLengthPerRowOfIndexFile + 1)),
Value: funcutil.RandomBytes(maxLengthPerRowOfIndexFile + 1),
},
}

View File

@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
)
@ -1086,8 +1087,8 @@ func TestIndexFileEvent(t *testing.T) {
assert.Nil(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
payload := "payload"
err = w.AddOneStringToPayload(payload)
payload := funcutil.GenRandomBytes()
err = w.AddByteToPayload(payload)
assert.Nil(t, err)
err = w.Finish()
@ -1106,9 +1107,9 @@ func TestIndexFileEvent(t *testing.T) {
payloadOffset := binary.Size(eventHeader{}) + binary.Size(indexFileEventData{})
pBuf := wBuf[payloadOffset:]
pR, err := NewPayloadReader(schemapb.DataType_String, pBuf)
pR, err := NewPayloadReader(schemapb.DataType_Int8, pBuf)
assert.Nil(t, err)
value, err := pR.GetOneStringFromPayload(0)
value, err := pR.GetByteFromPayload()
assert.Nil(t, err)
assert.Equal(t, payload, value)
pR.Close()

View File

@ -379,7 +379,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
}
func newIndexFileEventWriter() (*indexFileEventWriter, error) {
payloadWriter, err := NewPayloadWriter(schemapb.DataType_String)
payloadWriter, err := NewPayloadWriter(schemapb.DataType_Int8)
if err != nil {
return nil, err
}

View File

@ -34,6 +34,7 @@ import (
type PayloadWriterInterface interface {
AddDataToPayload(msgs interface{}, dim ...int) error
AddBoolToPayload(msgs []bool) error
AddByteToPayload(msgs []byte) error
AddInt8ToPayload(msgs []int8) error
AddInt16ToPayload(msgs []int16) error
AddInt32ToPayload(msgs []int32) error
@ -54,6 +55,7 @@ type PayloadWriterInterface interface {
type PayloadReaderInterface interface {
GetDataFromPayload(idx ...int) (interface{}, int, error)
GetBoolFromPayload() ([]bool, error)
GetByteFromPayload() ([]byte, error)
GetInt8FromPayload() ([]int8, error)
GetInt16FromPayload() ([]int16, error)
GetInt32FromPayload() ([]int32, error)
@ -176,6 +178,18 @@ func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
return HandleCStatus(&status, "AddBoolToPayload failed")
}
func (w *PayloadWriter) AddByteToPayload(msgs []byte) error {
length := len(msgs)
if length <= 0 {
return errors.New("can't add empty msgs into payload")
}
cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
cLength := C.int(length)
status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
return HandleCStatus(&status, "AddInt8ToPayload failed")
}
func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
length := len(msgs)
if length <= 0 {
@ -424,6 +438,24 @@ func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
return slice, nil
}
// GetByteFromPayload returns byte slice from payload
func (r *PayloadReader) GetByteFromPayload() ([]byte, error) {
if r.colType != schemapb.DataType_Int8 {
return nil, errors.New("incorrect data type")
}
var cMsg *C.int8_t
var cSize C.int
status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
if err := HandleCStatus(&status, "GetInt8FromPayload failed"); err != nil {
return nil, err
}
slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:cSize:cSize]
return slice, nil
}
// GetInt8FromPayload returns int8 slice from payload
func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
if r.colType != schemapb.DataType_Int8 {

View File

@ -378,42 +378,26 @@ func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, r
// only print slice meta and index params
func printIndexFilePayloadValues(reader PayloadReaderInterface, key string) error {
if key == IndexParamsKey {
rows, err := reader.GetPayloadLengthFromReader()
content, err := reader.GetByteFromPayload()
if err != nil {
return err
}
var content []byte
for i := 0; i < rows; i++ {
val, err := reader.GetOneStringFromPayload(i)
if err != nil {
return err
}
content = append(content, []byte(val)...)
}
fmt.Print("index params: \n")
fmt.Println(string(content))
fmt.Println(content)
return nil
}
if key == "SLICE_META" {
rows, err := reader.GetPayloadLengthFromReader()
content, err := reader.GetByteFromPayload()
if err != nil {
return err
}
var content []byte
for i := 0; i < rows; i++ {
val, err := reader.GetOneStringFromPayload(i)
if err != nil {
return err
}
content = append(content, []byte(val)...)
}
// content is a json string serialized by milvus::json,
// it's better to use milvus::json to parse the content also,
// fortunately, the json string is readable enough.
fmt.Print("index slice meta: \n")
fmt.Println(string(content))
fmt.Println(content)
return nil
}

View File

@ -23,23 +23,33 @@ func init() {
r = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var letterRunes = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
// RandomString returns a batch of random string
func RandomString(n int) string {
b := make([]rune, n)
// RandomBytes returns a batch of random string
func RandomBytes(n int) []byte {
b := make([]byte, n)
for i := range b {
b[i] = letterRunes[r.Intn(len(letterRunes))]
}
return string(b)
return b
}
// RandomString returns a batch of random string
func RandomString(n int) string {
return string(RandomBytes(n))
}
// GenRandomBytes generates a random bytes.
func GenRandomBytes() []byte {
l := rand.Uint64()%10 + 1
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {
return nil
}
return b
}
// GenRandomStr generates a random string.
func GenRandomStr() string {
l := rand.Uint64()%10 + 1
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {
return ""
}
return fmt.Sprintf("%X", b)
return fmt.Sprintf("%X", GenRandomBytes())
}