Fix writenode bug

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2020-12-29 21:15:00 +08:00 committed by yefu.chen
parent 1abc69277b
commit 3a2e2a04b6
7 changed files with 407 additions and 399 deletions

1
.gitignore vendored
View File

@ -60,3 +60,4 @@ cwrapper_build
**/compile_commands.json
**/.lint
typescript
**/.pytest_cache/

View File

@ -36,5 +36,5 @@ writeNode:
flush:
# max buffer size to flush
insertBufSize: 20
insertBufSize: 500
ddBufSize: 20

View File

@ -2,7 +2,6 @@ package allocator
import (
"context"
"fmt"
"log"
"time"
@ -46,7 +45,6 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error
}
func (ia *IDAllocator) syncID() bool {
fmt.Println("syncID")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &internalpb.IDRequest{
PeerID: ia.PeerID,

View File

@ -163,7 +163,6 @@ func (ttBarrier *hardTimeTickBarrier) Start() error {
// Suppose ttmsg.Timestamp from stream is always larger than the previous one,
// that `ttmsg.Timestamp > oldT`
ttmsg := timetickmsg.(*ms.TimeTickMsg)
log.Printf("[hardTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)
oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerID]
if !ok {

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"log"
"path"
"sort"
"strconv"
@ -148,14 +149,14 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
keyCommon := Params.DdLogRootPath + strconv.FormatInt(collectionID, 10) + "/"
keyCommon := path.Join(Params.DdLogRootPath, strconv.FormatInt(collectionID, 10))
// save ts binlog
timestampLogIdx, err := ddNode.idAllocator.AllocOne()
if err != nil {
log.Println(err)
}
timestampKey := keyCommon + binLogs[0].GetKey() + "/" + strconv.FormatInt(timestampLogIdx, 10)
timestampKey := path.Join(keyCommon, binLogs[0].GetKey(), strconv.FormatInt(timestampLogIdx, 10))
err = ddNode.kv.Save(timestampKey, string(binLogs[0].GetValue()))
if err != nil {
log.Println(err)
@ -167,7 +168,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
if err != nil {
log.Println(err)
}
ddKey := keyCommon + binLogs[1].GetKey() + "/" + strconv.FormatInt(ddLogIdx, 10)
ddKey := path.Join(keyCommon, binLogs[1].GetKey(), strconv.FormatInt(ddLogIdx, 10))
err = ddNode.kv.Save(ddKey, string(binLogs[1].GetValue()))
if err != nil {
log.Println(err)

View File

@ -64,12 +64,12 @@ func (ib *insertBuffer) size(segmentID UniqueID) int {
for _, data := range idata.Data {
fdata, ok := data.(*storage.FloatVectorFieldData)
if ok && fdata.NumRows > maxSize {
maxSize = len(fdata.Data)
maxSize = fdata.NumRows
}
bdata, ok := data.(*storage.BinaryVectorFieldData)
if ok && bdata.NumRows > maxSize {
maxSize = len(bdata.Data)
maxSize = bdata.NumRows
}
}
@ -97,424 +97,433 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Println("Error: type assertion failed for insertMsg")
// TODO: add error handling
}
for _, task := range iMsg.insertMessages {
if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) {
// iMsg is insertMsg
// 1. iMsg -> buffer
for _, msg := range iMsg.insertMessages {
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
log.Println("Error: misaligned messages detected")
continue
}
currentSegID := msg.GetSegmentID()
// iMsg is insertMsg
// 1. iMsg -> buffer
for _, msg := range iMsg.insertMessages {
currentSegID := msg.GetSegmentID()
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
idata = &InsertData{
Data: make(map[UniqueID]storage.FieldData),
}
}
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
idata = &InsertData{
Data: make(map[UniqueID]storage.FieldData),
// Timestamps
_, ok = idata.Data[1].(*storage.Int64FieldData)
if !ok {
idata.Data[1] = &storage.Int64FieldData{
Data: []int64{},
NumRows: 0,
}
}
tsData := idata.Data[1].(*storage.Int64FieldData)
for _, ts := range msg.Timestamps {
tsData.Data = append(tsData.Data, int64(ts))
}
tsData.NumRows += len(msg.Timestamps)
// 1.1 Get CollectionMeta from etcd
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
}
// 1.2 Get Fields
var pos = 0 // Record position of blob
for _, field := range collMeta.Schema.Fields {
switch field.DataType {
case schemapb.DataType_VECTOR_FLOAT:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Println("strconv wrong")
}
break
}
}
}
// Timestamps
_, ok = idata.Data[1].(*storage.Int64FieldData)
if !ok {
idata.Data[1] = &storage.Int64FieldData{
Data: []int64{},
NumRows: 0,
if dim <= 0 {
log.Println("invalid dim")
// TODO: add error handling
}
}
tsData := idata.Data[1].(*storage.Int64FieldData)
for _, ts := range msg.Timestamps {
tsData.Data = append(tsData.Data, int64(ts))
}
tsData.NumRows += len(msg.Timestamps)
// 1.1 Get CollectionMeta from etcd
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
NumRows: 0,
Data: make([]float32, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
for _, blob := range msg.RowData {
for j := 0; j < dim; j++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
}
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println(".Float vector data:\n",
// "..NumRows:",
// idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows,
// "..Dim:",
// idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim)
case schemapb.DataType_VECTOR_BINARY:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Println("strconv wrong")
}
break
}
}
if dim <= 0 {
log.Println("invalid dim")
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
NumRows: 0,
Data: make([]byte, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for _, blob := range msg.RowData {
for d := 0; d < dim/8; d++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, byte(v))
pos++
}
}
fieldData.NumRows += len(msg.RowData)
// log.Println(
// ".Binary vector data:\n",
// "..NumRows:",
// idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows,
// "..Dim:",
// idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim)
case schemapb.DataType_BOOL:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
NumRows: 0,
Data: make([]bool, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
for _, blob := range msg.RowData {
boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
if boolInt == 1 {
fieldData.Data = append(fieldData.Data, true)
} else {
fieldData.Data = append(fieldData.Data, false)
}
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Bool data:",
// idata.Data[field.FieldID].(*storage.BoolFieldData).Data)
case schemapb.DataType_INT8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
NumRows: 0,
Data: make([]int8, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int8(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int8 data:",
// idata.Data[field.FieldID].(*storage.Int8FieldData).Data)
case schemapb.DataType_INT16:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int16FieldData{
NumRows: 0,
Data: make([]int16, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int16(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int16 data:",
// idata.Data[field.FieldID].(*storage.Int16FieldData).Data)
case schemapb.DataType_INT32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
NumRows: 0,
Data: make([]int32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int32(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int32 data:",
// idata.Data[field.FieldID].(*storage.Int32FieldData).Data)
case schemapb.DataType_INT64:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int64FieldData{
NumRows: 0,
Data: make([]int64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int64(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int64 data:",
// idata.Data[field.FieldID].(*storage.Int64FieldData).Data)
case schemapb.DataType_FLOAT:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatFieldData{
NumRows: 0,
Data: make([]float32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Float32 data:",
// idata.Data[field.FieldID].(*storage.FloatFieldData).Data)
case schemapb.DataType_DOUBLE:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.DoubleFieldData{
NumRows: 0,
Data: make([]float64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float64frombits(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Float64 data:",
// idata.Data[field.FieldID].(*storage.DoubleFieldData).Data)
}
}
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
// 1.4 if full
// 1.4.1 generate binlogs
if ibNode.insertBuffer.full(currentSegID) {
log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))
// partitionTag -> partitionID
partitionTag := msg.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
if err != nil {
log.Println("partitionTag to partitionID wrong")
// TODO GOOSE add error handler
}
inCodec := storage.NewInsertCodec(collMeta)
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
}
// clear buffer
delete(ibNode.insertBuffer.insertData, currentSegID)
log.Println(".. Clearing buffer")
// 1.5.2 binLogs -> minIO/S3
collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
partitionIDStr := strconv.FormatInt(partitionID, 10)
segIDStr := strconv.FormatInt(currentSegID, 10)
keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
for index, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne()
if err != nil {
log.Println("Allocate Id failed")
// GOOSE TODO error handler
}
key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
if err != nil {
log.Println("Save to MinIO failed")
// GOOSE TODO error handler
}
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handler
}
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: iMsg.timeRange.timestampMax,
segID: currentSegID,
fieldID: fieldID,
paths: []string{key},
},
}
log.Println("... Appending binlog paths ...", index)
ibNode.outCh <- inBinlogMsg
}
}
}
if len(iMsg.insertMessages) > 0 {
log.Println("---insert buffer status---")
var stopSign int = 0
for k := range ibNode.insertBuffer.insertData {
if stopSign >= 10 {
break
}
log.Printf("seg(%v) buffer size = (%v)", k, ibNode.insertBuffer.size(k))
stopSign++
}
}
// iMsg is Flush() msg from master
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
for _, msg := range iMsg.flushMessages {
currentSegID := msg.GetSegmentID()
flushTs := msg.GetTimestamp()
log.Printf(". Receiving flush message segID(%v)...", currentSegID)
if ibNode.insertBuffer.size(currentSegID) > 0 {
log.Println(".. Buffer not empty, flushing ...")
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
}
inCodec := storage.NewInsertCodec(collMeta)
// 1.2 Get Fields
var pos = 0 // Record position of blob
for _, field := range collMeta.Schema.Fields {
switch field.DataType {
case schemapb.DataType_VECTOR_FLOAT:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Println("strconv wrong")
}
break
}
}
if dim <= 0 {
log.Println("invalid dim")
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
NumRows: 0,
Data: make([]float32, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
for _, blob := range msg.RowData {
for j := 0; j < dim; j++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
}
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Float vector data:",
idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Data,
"NumRows:",
idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows,
"Dim:",
idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim)
case schemapb.DataType_VECTOR_BINARY:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Println("strconv wrong")
}
break
}
}
if dim <= 0 {
log.Println("invalid dim")
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
NumRows: 0,
Data: make([]byte, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for _, blob := range msg.RowData {
for d := 0; d < dim/8; d++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, byte(v))
pos++
}
}
fieldData.NumRows += len(msg.RowData)
log.Println(
"Binary vector data:",
idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Data,
"NumRows:",
idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows,
"Dim:",
idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim)
case schemapb.DataType_BOOL:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
NumRows: 0,
Data: make([]bool, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
for _, blob := range msg.RowData {
boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
if boolInt == 1 {
fieldData.Data = append(fieldData.Data, true)
} else {
fieldData.Data = append(fieldData.Data, false)
}
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Bool data:",
idata.Data[field.FieldID].(*storage.BoolFieldData).Data)
case schemapb.DataType_INT8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
NumRows: 0,
Data: make([]int8, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int8(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Int8 data:",
idata.Data[field.FieldID].(*storage.Int8FieldData).Data)
case schemapb.DataType_INT16:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int16FieldData{
NumRows: 0,
Data: make([]int16, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int16(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Int16 data:",
idata.Data[field.FieldID].(*storage.Int16FieldData).Data)
case schemapb.DataType_INT32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
NumRows: 0,
Data: make([]int32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int32(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Int32 data:",
idata.Data[field.FieldID].(*storage.Int32FieldData).Data)
case schemapb.DataType_INT64:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int64FieldData{
NumRows: 0,
Data: make([]int64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int64(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Int64 data:",
idata.Data[field.FieldID].(*storage.Int64FieldData).Data)
case schemapb.DataType_FLOAT:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatFieldData{
NumRows: 0,
Data: make([]float32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Float32 data:",
idata.Data[field.FieldID].(*storage.FloatFieldData).Data)
case schemapb.DataType_DOUBLE:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.DoubleFieldData{
NumRows: 0,
Data: make([]float64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float64frombits(v))
pos++
}
fieldData.NumRows += len(msg.RowIDs)
log.Println("Float64 data:",
idata.Data[field.FieldID].(*storage.DoubleFieldData).Data)
}
// partitionTag -> partitionID
partitionTag := segMeta.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
if err != nil {
// GOOSE TODO add error handler
log.Println("partitionTag to partitionID Wrong")
}
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
// 1.4 Send hardTimeTick msg, GOOSE TODO
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
}
// 1.5 if full
// 1.5.1 generate binlogs
if ibNode.insertBuffer.full(currentSegID) {
log.Println("Insert Buffer full, auto flushing ...")
// partitionTag -> partitionID
partitionTag := msg.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
// clear buffer
delete(ibNode.insertBuffer.insertData, currentSegID)
// binLogs -> minIO/S3
collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
partitionIDStr := strconv.FormatInt(partitionID, 10)
segIDStr := strconv.FormatInt(currentSegID, 10)
keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
for _, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne()
if err != nil {
log.Println("partitionTag to partitionID Wrong")
log.Println("Allocate Id failed")
// GOOSE TODO error handler
}
inCodec := storage.NewInsertCodec(collMeta)
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
if err != nil {
log.Println("generate binlog wrong")
log.Println("Save to MinIO failed")
// GOOSE TODO error handler
}
// clear buffer
delete(ibNode.insertBuffer.insertData, currentSegID)
// 1.5.2 binLogs -> minIO/S3
collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
partitionIDStr := strconv.FormatInt(partitionID, 10)
segIDStr := strconv.FormatInt(currentSegID, 10)
keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
for _, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne()
if err != nil {
log.Println("Allocate Id failed")
// GOOSE TODO error handle
}
key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
if err != nil {
log.Println("Save to MinIO failed")
// GOOSE TODO error handle
}
log.Println(".. Saving binlogs to MinIO ...")
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handle
}
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: iMsg.timeRange.timestampMax,
segID: currentSegID,
fieldID: fieldID,
paths: []string{key},
},
}
log.Println(".. Appending binlog paths ...")
ibNode.outCh <- inBinlogMsg
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handler
}
// Append binlogs
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
fieldID: fieldID,
paths: []string{key},
},
}
ibNode.outCh <- inBinlogMsg
}
}
// iMsg is Flush() msg from master
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
for _, msg := range iMsg.flushMessages {
currentSegID := msg.GetSegmentID()
flushTs := msg.GetTimestamp()
log.Printf(". Receiving flush message segID(%v)...", currentSegID)
if ibNode.insertBuffer.size(currentSegID) > 0 {
log.Println(".. Buffer not empty, flushing ...")
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
}
inCodec := storage.NewInsertCodec(collMeta)
// partitionTag -> partitionID
partitionTag := segMeta.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
if err != nil {
// GOOSE TODO add error handler
log.Println("partitionTag to partitionID Wrong")
}
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
}
// clear buffer
delete(ibNode.insertBuffer.insertData, currentSegID)
// binLogs -> minIO/S3
collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
partitionIDStr := strconv.FormatInt(partitionID, 10)
segIDStr := strconv.FormatInt(currentSegID, 10)
keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
for _, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne()
if err != nil {
log.Println("Allocate Id failed")
// GOOSE TODO error handler
}
key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
if err != nil {
log.Println("Save to MinIO failed")
// GOOSE TODO error handler
}
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handler
}
// Append binlogs
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
fieldID: fieldID,
paths: []string{key},
},
}
ibNode.outCh <- inBinlogMsg
}
}
// Flushed
log.Println(".. Flush finished ...")
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: true,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
},
}
ibNode.outCh <- inBinlogMsg
// Flushed
log.Println(".. Flush finished ...")
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: true,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
},
}
ibNode.outCh <- inBinlogMsg
}
if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {

View File

@ -89,7 +89,7 @@ func TestParamTable_WriteNode(t *testing.T) {
t.Run("Test FlushInsertBufSize", func(t *testing.T) {
name := Params.FlushInsertBufSize
assert.Equal(t, name, 20)
assert.Equal(t, name, 500)
})
t.Run("Test FlushDdBufSize", func(t *testing.T) {