Fix the Pulsar test failure

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2020-09-28 17:15:12 +08:00 committed by yefu.chen
parent 6797fd0b0b
commit 0547561f8b
10 changed files with 138 additions and 45 deletions

3
.gitignore vendored
View File

@ -26,6 +26,7 @@ cmake_build
proxy/milvus proxy/milvus
proxy/cmake_build proxy/cmake_build
proxy/cmake-build-debug proxy/cmake-build-debug
proxy/cmake-build-release
proxy/thirdparty/grpc-src proxy/thirdparty/grpc-src
proxy/thirdparty/grpc-build proxy/thirdparty/grpc-build
proxy/milvus/* proxy/milvus/*
@ -36,7 +37,7 @@ proxy/suvlim/*
sdk/cmake_build sdk/cmake_build
sdk/cmake-build-debug sdk/cmake-build-debug
sdk/cmake-build-release sdk/cmake-build-release
sdk/cmake_build_release
# Compiled source # Compiled source
*.a *.a

View File

@ -5,8 +5,8 @@ if [[ ! ${jobs+1} ]]; then
jobs=$(nproc) jobs=$(nproc)
fi fi
BUILD_OUTPUT_DIR="cmake-build-debug" BUILD_OUTPUT_DIR="cmake-build-release"
BUILD_TYPE="Debug" BUILD_TYPE="Release"
BUILD_UNITTEST="OFF" BUILD_UNITTEST="OFF"
INSTALL_PREFIX=$(pwd)/milvus INSTALL_PREFIX=$(pwd)/milvus
MAKE_CLEAN="OFF" MAKE_CLEAN="OFF"

View File

@ -5,7 +5,7 @@ if [[ ! ${jobs+1} ]]; then
jobs=$(nproc) jobs=$(nproc)
fi fi
BUILD_OUTPUT_DIR="cmake_build" BUILD_OUTPUT_DIR="cmake_build_release"
BUILD_TYPE="Release" BUILD_TYPE="Release"
BUILD_UNITTEST="OFF" BUILD_UNITTEST="OFF"
INSTALL_PREFIX=$(pwd)/milvus INSTALL_PREFIX=$(pwd)/milvus

View File

@ -200,6 +200,7 @@ func (mc *MessageClient) InitClient(url string) {
log.Fatal(err) log.Fatal(err)
} }
mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg) mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg)
mc.timeSyncCfg.RoleType = timesync.Reader
mc.timestampBatchStart = 0 mc.timestampBatchStart = 0
mc.timestampBatchEnd = 0 mc.timestampBatchEnd = 0

View File

@ -274,16 +274,16 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
assert.NotEqual(nil, 0, timeRange.timestampMin) assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax) assert.NotEqual(nil, 0, timeRange.timestampMax)
if node.msgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter {
node.WriteQueryLog()
BaselineCounter = node.msgCounter.InsertCounter/CountInsertMsgBaseline
}
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue continue
} }
if node.msgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter {
node.WriteQueryLog()
BaselineCounter++
}
node.QueryNodeDataInit() node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done") //fmt.Println("MessagesPreprocess Done")
@ -294,8 +294,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
//fmt.Println("DoInsertAndDelete Done") //fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
} }
} } else {
for { for {
var msgLen = node.PrepareBatchMsg() var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
@ -317,6 +316,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
//fmt.Println("DoInsertAndDelete Done") //fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
} }
}
wg.Done() wg.Done()
} }
@ -525,9 +525,10 @@ func (node *QueryNode) DoInsert(segmentID int64, wg *sync.WaitGroup) msgPb.Statu
records := node.insertData.insertRecords[segmentID] records := node.insertData.insertRecords[segmentID]
offsets := node.insertData.insertOffset[segmentID] offsets := node.insertData.insertOffset[segmentID]
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records)
node.QueryLog(len(ids)) node.QueryLog(len(ids))
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1} return msgPb.Status{ErrorCode: 1}

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
BUILD_OUTPUT_DIR="cmake_build" BUILD_OUTPUT_DIR="cmake_build_release"
BUILD_TYPE="Debug" BUILD_TYPE="Release"
MAKE_CLEAN="OFF" MAKE_CLEAN="OFF"
RUN_CPPLINT="OFF" RUN_CPPLINT="OFF"

View File

@ -2,15 +2,33 @@ package readertimesync
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf" "github.com/czs007/suvlim/conf"
pb "github.com/czs007/suvlim/pkg/master/grpc/message" pb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"log" "log"
"os"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
"time"
)
type InsertLog struct {
MsgLength int
DurationInMilliseconds int64
InsertTime time.Time
NumSince int64
Speed float64
}
type TimeSyncRole int
const (
Reader TimeSyncRole = 0
Writer TimeSyncRole = 1
) )
const ReadStopFlagEnd int64 = 0 const ReadStopFlagEnd int64 = 0
@ -49,6 +67,8 @@ type ReaderTimeSyncCfg struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
InsertLogs []InsertLog
RoleType TimeSyncRole
} }
/* /*
@ -293,9 +313,57 @@ func (r *ReaderTimeSyncCfg) isReadStopFlag(imsg *pb.InsertOrDeleteMsg) bool {
return imsg.ClientId < ReadStopFlagEnd return imsg.ClientId < ReadStopFlagEnd
} }
func (r *ReaderTimeSyncCfg) WriteInsertLog() {
fileName := "/tmp/reader_get_pulsar.txt"
if r.RoleType == Writer {
fileName = "/tmp/writer_get_pulsar.txt"
}
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
// write logs
for _, insertLog := range r.InsertLogs {
insertLogJson, err := json.Marshal(&insertLog)
if err != nil {
log.Fatal(err)
}
writeString := string(insertLogJson) + "\n"
//fmt.Println(writeString)
_, err2 := f.WriteString(writeString)
if err2 != nil {
log.Fatal(err2)
}
}
// reset InsertLogs buffer
r.InsertLogs = make([]InsertLog, 0)
err = f.Close()
if err != nil {
log.Fatal(err)
}
fmt.Println("write get pulsar log done")
}
func (r *ReaderTimeSyncCfg) startReadTopics() { func (r *ReaderTimeSyncCfg) startReadTopics() {
ctx, _ := context.WithCancel(r.ctx) ctx, _ := context.WithCancel(r.ctx)
tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0} tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0}
const Debug = true
const WriterBaseline = 1000 * 1000
const LogBaseline = 100000
var Counter int64 = 0
var LastCounter int64 = 0
r.InsertLogs = make([]InsertLog, 0)
InsertTime := time.Now()
var BaselineCounter int64 = 0
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -332,7 +400,31 @@ func (r *ReaderTimeSyncCfg) startReadTopics() {
log.Printf("WARN : Insert or delete chan is full ...") log.Printf("WARN : Insert or delete chan is full ...")
} }
tsm.NumRecorders++ tsm.NumRecorders++
if Debug {
r.insertOrDeleteChan <- &imsg r.insertOrDeleteChan <- &imsg
Counter++
if Counter % LogBaseline == 0 {
timeNow := time.Now()
duration := timeNow.Sub(InsertTime)
speed := float64(Counter-LastCounter) / duration.Seconds()
insertLog := InsertLog{
MsgLength: int(Counter - LastCounter),
DurationInMilliseconds: duration.Milliseconds(),
InsertTime: timeNow,
NumSince: Counter,
Speed: speed,
}
r.InsertLogs = append(r.InsertLogs, insertLog)
LastCounter = Counter
InsertTime = timeNow
}
if Counter/WriterBaseline != BaselineCounter {
r.WriteInsertLog()
BaselineCounter = Counter/WriterBaseline
}
} else {
r.insertOrDeleteChan <- &imsg
}
} }
r.readerConsumer.AckID(msg.ID()) r.readerConsumer.AckID(msg.ID())
} }

View File

@ -9,7 +9,6 @@ import (
"github.com/czs007/suvlim/writer/write_node" "github.com/czs007/suvlim/writer/write_node"
"log" "log"
"strconv" "strconv"
"time"
) )
func main() { func main() {
@ -43,9 +42,6 @@ func main() {
} }
const Debug = true const Debug = true
const MB = 1024 * 1024
const timeInterval = time.Second * 2
const CountMsgNum = 10000 * 10
if Debug { if Debug {
//var shouldBenchmark = false //var shouldBenchmark = false
@ -75,16 +71,16 @@ func main() {
// start = time.Now() // start = time.Now()
//} //}
if wn.MsgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter {
wn.WriteWriterLog()
BaselineCounter = wn.MsgCounter.InsertCounter/CountInsertMsgBaseline
}
if msgLength > 0 { if msgLength > 0 {
wn.DoWriteNode(ctx) wn.DoWriteNode(ctx)
fmt.Println("write node do a batch message, storage len: ", msgLength) fmt.Println("write node do a batch message, storage len: ", msgLength)
} }
if wn.MsgCounter.InsertCounter/CountInsertMsgBaseline == BaselineCounter {
wn.WriteWriterLog()
BaselineCounter++
}
// Test insert time // Test insert time
// ignore if less than 1000 records per time interval // ignore if less than 1000 records per time interval
//if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 { //if shouldBenchmark && wn.MsgCounter.InsertCounter > 1000 {

View File

@ -149,6 +149,7 @@ func (mc *MessageClient) InitClient(url string) {
log.Fatal(err) log.Fatal(err)
} }
mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg) mc.timeSyncCfg = timeSync.(*timesync.ReaderTimeSyncCfg)
mc.timeSyncCfg.RoleType = timesync.Writer
mc.timestampBatchStart = 0 mc.timestampBatchStart = 0
mc.timestampBatchEnd = 0 mc.timestampBatchEnd = 0

View File

@ -92,7 +92,7 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr
timeStamp = append(timeStamp, uint64(data[i].Timestamp)) timeStamp = append(timeStamp, uint64(data[i].Timestamp))
} }
wn.WriterLog(len(timeStamp)) //wn.WriterLog(len(data))
//wn.MsgCounter.InsertCounter += int64(len(timeStamp)) //wn.MsgCounter.InsertCounter += int64(len(timeStamp))
//if len(timeStamp) > 0 { //if len(timeStamp) > 0 {
@ -172,6 +172,7 @@ func (wn *WriteNode) DoWriteNode(ctx context.Context) {
start = end start = end
} }
wg.Wait() wg.Wait()
wn.WriterLog(numInsertData)
wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg) wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg)
wn.UpdateTimeSync(wn.MessageClient.TimeSync()) wn.UpdateTimeSync(wn.MessageClient.TimeSync())
} }
@ -208,7 +209,7 @@ func (wn *WriteNode) WriteWriterLog() {
} }
writeString := string(insertLogJson) + "\n" writeString := string(insertLogJson) + "\n"
fmt.Println(writeString) //fmt.Println(writeString)
_, err2 := f.WriteString(writeString) _, err2 := f.WriteString(writeString)
if err2 != nil { if err2 != nil {
@ -224,5 +225,5 @@ func (wn *WriteNode) WriteWriterLog() {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println("write log done") fmt.Println("write write node log done")
} }