Refactor param table, and add ddNode

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-12-11 17:20:14 +08:00 committed by yefu.chen
parent 8df05714a8
commit 585d3f9831
23 changed files with 578 additions and 1148 deletions

View File

@ -20,6 +20,10 @@ writeNode:
maxParallelism: 1024
msgStream:
dataDefinition:
recvBufSize: 64 # msgPack chan buffer size
pulsarBufSize: 64 # pulsar chan buffer size
insert:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
@ -29,13 +33,3 @@ writeNode:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
search:
recvBufSize: 512
pulsarBufSize: 512
searchResult:
recvBufSize: 64
stats:
recvBufSize: 64

View File

@ -17,11 +17,6 @@ type ddNode struct {
replica collectionReplica
}
type metaOperateRecord struct {
createOrDrop bool // create: true, drop: false
timestamp Timestamp
}
func (ddNode *ddNode) Name() string {
return "ddNode"
}

View File

@ -9,8 +9,8 @@ import (
)
type filterDmNode struct {
ddMsg *ddMsg
BaseNode
ddMsg *ddMsg
}
func (fdmNode *filterDmNode) Name() string {

View File

@ -19,6 +19,11 @@ type ddMsg struct {
timeRange TimeRange
}
type metaOperateRecord struct {
createOrDrop bool // create: true, drop: false
timestamp Timestamp
}
type insertMsg struct {
insertMessages []*msgstream.InsertMsg
timeRange TimeRange

View File

@ -118,7 +118,7 @@ func makeNewChannelNames(names []string, suffix string) []string {
}
func refreshChannelNames() {
suffix := "_test_query_node" + strconv.FormatInt(rand.Int63n(100), 10)
suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(100), 10)
Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)

View File

@ -291,9 +291,9 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
inReduced := make([]bool, len(searchResults))
numSegment := int64(len(searchResults))
err = reduceSearchResults(searchResults, numSegment, inReduced)
if err != nil {
return err
err2 := reduceSearchResults(searchResults, numSegment, inReduced)
if err2 != nil {
return err2
}
err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced)
if err != nil {

View File

@ -11,15 +11,24 @@ import (
type BinlogReader struct {
magicNumber int32
descriptorEvent
buffer *bytes.Buffer
eventList []*EventReader
isClose bool
currentEventReader *EventReader
buffer *bytes.Buffer
bufferLength int
currentOffset int32
isClose bool
}
func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
if reader.isClose {
return nil, errors.New("bin log reader is closed")
}
if reader.currentEventReader != nil {
reader.currentOffset = reader.currentEventReader.NextPosition
if err := reader.currentEventReader.Close(); err != nil {
return nil, err
}
reader.currentEventReader = nil
}
if reader.buffer.Len() <= 0 {
return nil, nil
}
@ -27,14 +36,15 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
if err != nil {
return nil, err
}
reader.eventList = append(reader.eventList, eventReader)
return eventReader, nil
reader.currentEventReader = eventReader
return reader.currentEventReader, nil
}
func (reader *BinlogReader) readMagicNumber() (int32, error) {
if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
return -1, err
}
reader.currentOffset = 4
if reader.magicNumber != MagicNumber {
return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(int(MagicNumber)) +
", actual: " + strconv.Itoa(int(reader.magicNumber)))
@ -45,6 +55,7 @@ func (reader *BinlogReader) readMagicNumber() (int32, error) {
func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
event, err := ReadDescriptorEvent(reader.buffer)
reader.currentOffset = event.NextPosition
if err != nil {
return nil, err
}
@ -56,20 +67,20 @@ func (reader *BinlogReader) Close() error {
if reader.isClose {
return nil
}
for _, e := range reader.eventList {
if err := e.Close(); err != nil {
reader.isClose = true
if reader.currentEventReader != nil {
if err := reader.currentEventReader.Close(); err != nil {
return err
}
}
reader.isClose = true
return nil
}
func NewBinlogReader(data []byte) (*BinlogReader, error) {
reader := &BinlogReader{
buffer: bytes.NewBuffer(data),
eventList: []*EventReader{},
isClose: false,
buffer: bytes.NewBuffer(data),
bufferLength: len(data),
isClose: false,
}
if _, err := reader.readMagicNumber(); err != nil {

View File

@ -241,812 +241,4 @@ func TestInsertBinlog(t *testing.T) {
assert.Equal(t, int(e2NxtPos), len(buf))
//read binlog
r, err := NewBinlogReader(buf)
assert.Nil(t, err)
event1, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event1)
p1, err := event1.GetInt64FromPayload()
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
assert.Nil(t, err)
assert.Equal(t, event1.TypeCode, InsertEventType)
ed1, ok := (event1.eventData).(*insertEventData)
assert.True(t, ok)
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
event2, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event2)
p2, err := event2.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
assert.Equal(t, event2.TypeCode, InsertEventType)
ed2, ok := (event2.eventData).(*insertEventData)
assert.True(t, ok)
_, ok = (event2.eventData).(*deleteEventData)
assert.False(t, ok)
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}
func TestDeleteBinlog(t *testing.T) {
w, err := NewDeleteBinlogWriter(schemapb.DataType_INT64, 50)
assert.Nil(t, err)
e1, err := w.NextDeleteEventWriter()
assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
err = e1.AddDataToPayload([]int32{4, 5, 6})
assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err)
e1.SetStartTimestamp(100)
e1.SetEndTimestamp(200)
e2, err := w.NextDeleteEventWriter()
assert.Nil(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9})
assert.Nil(t, err)
err = e2.AddDataToPayload([]bool{true, false, true})
assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err)
e2.SetStartTimestamp(300)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
pos := int(unsafe.Sizeof(MagicNumber))
//descriptor header, timestamp
ts := UnsafeReadInt64(buf, pos)
assert.Greater(t, ts, int64(0))
curts := time.Now().UnixNano() / int64(time.Millisecond)
curts = int64(tsoutil.ComposeTS(curts, 0))
diffts := curts - ts
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(ts))
//descriptor header, type code
tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
pos += int(unsafe.Sizeof(tc))
//descriptor header, server id
svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, svrID, int32(ServerID))
pos += int(unsafe.Sizeof(svrID))
//descriptor header, event length
descEventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(descEventLen))
//descriptor header, next position
descNxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//descriptor data fix, binlog version
binLogVer := UnsafeReadInt16(buf, pos)
assert.Equal(t, binLogVer, int16(BinlogVersion))
pos += int(unsafe.Sizeof(binLogVer))
//descriptor data fix, server version
svrVer := UnsafeReadInt64(buf, pos)
assert.Equal(t, svrVer, int64(ServerVersion))
pos += int(unsafe.Sizeof(svrVer))
//descriptor data fix, commit id
cmitID := UnsafeReadInt64(buf, pos)
assert.Equal(t, cmitID, int64(CommitID))
pos += int(unsafe.Sizeof(cmitID))
//descriptor data fix, header length
headLen := UnsafeReadInt8(buf, pos)
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
pos += int(unsafe.Sizeof(headLen))
//descriptor data fix, collection id
collID := UnsafeReadInt64(buf, pos)
assert.Equal(t, collID, int64(50))
pos += int(unsafe.Sizeof(collID))
//descriptor data fix, partition id
partID := UnsafeReadInt64(buf, pos)
assert.Equal(t, partID, int64(-1))
pos += int(unsafe.Sizeof(partID))
//descriptor data fix, segment id
segID := UnsafeReadInt64(buf, pos)
assert.Equal(t, segID, int64(-1))
pos += int(unsafe.Sizeof(segID))
//descriptor data fix, field id
fieldID := UnsafeReadInt64(buf, pos)
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))
//descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
pos += int(unsafe.Sizeof(startts))
//descriptor data fix, end time stamp
endts := UnsafeReadInt64(buf, pos)
assert.Equal(t, endts, int64(2000))
pos += int(unsafe.Sizeof(endts))
//descriptor data fix, payload type
colType := UnsafeReadInt32(buf, pos)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
pos += int(unsafe.Sizeof(colType))
//descriptor data, post header lengths
for i := DescriptorEventType; i < EventTypeEnd; i++ {
size := getEventFixPartSize(i)
assert.Equal(t, uint8(size), buf[pos])
pos++
}
//start of e1
assert.Equal(t, pos, int(descNxtPos))
//insert e1 header, Timestamp
e1ts := UnsafeReadInt64(buf, pos)
diffts = curts - e1ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e1ts))
//insert e1 header, type code
e1tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e1tc), DeleteEventType)
pos += int(unsafe.Sizeof(e1tc))
//insert e1 header, Server id
e1svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e1svrID))
//insert e1 header, event length
e1EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e1EventLen))
//insert e1 header, next position
e1NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e1 data, start time stamp
e1st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1st, int64(100))
pos += int(unsafe.Sizeof(e1st))
//insert e1 data, end time stamp
e1et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1et, int64(200))
pos += int(unsafe.Sizeof(e1et))
//insert e1, payload
e1Payload := buf[pos:e1NxtPos]
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
assert.Nil(t, err)
e1a, err := e1r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
err = e1r.Close()
assert.Nil(t, err)
//start of e2
pos = int(e1NxtPos)
//insert e2 header, Timestamp
e2ts := UnsafeReadInt64(buf, pos)
diffts = curts - e2ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e2ts))
//insert e2 header, type code
e2tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e2tc), DeleteEventType)
pos += int(unsafe.Sizeof(e2tc))
//insert e2 header, Server id
e2svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e2svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e2svrID))
//insert e2 header, event length
e2EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e2EventLen))
//insert e2 header, next position
e2NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e2 data, start time stamp
e2st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2st, int64(300))
pos += int(unsafe.Sizeof(e2st))
//insert e2 data, end time stamp
e2et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2et, int64(400))
pos += int(unsafe.Sizeof(e2et))
//insert e2, payload
e2Payload := buf[pos:]
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
assert.Nil(t, err)
e2a, err := e2r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
err = e2r.Close()
assert.Nil(t, err)
assert.Equal(t, int(e2NxtPos), len(buf))
//read binlog
r, err := NewBinlogReader(buf)
assert.Nil(t, err)
event1, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event1)
p1, err := event1.GetInt64FromPayload()
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
assert.Nil(t, err)
assert.Equal(t, event1.TypeCode, DeleteEventType)
ed1, ok := (event1.eventData).(*deleteEventData)
assert.True(t, ok)
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
event2, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event2)
p2, err := event2.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
assert.Equal(t, event2.TypeCode, DeleteEventType)
ed2, ok := (event2.eventData).(*deleteEventData)
assert.True(t, ok)
_, ok = (event2.eventData).(*insertEventData)
assert.False(t, ok)
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}
func TestDDLBinlog1(t *testing.T) {
w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50)
assert.Nil(t, err)
e1, err := w.NextCreateCollectionEventWriter()
assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
err = e1.AddDataToPayload([]int32{4, 5, 6})
assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err)
e1.SetStartTimestamp(100)
e1.SetEndTimestamp(200)
e2, err := w.NextDropCollectionEventWriter()
assert.Nil(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9})
assert.Nil(t, err)
err = e2.AddDataToPayload([]bool{true, false, true})
assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err)
e2.SetStartTimestamp(300)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
pos := int(unsafe.Sizeof(MagicNumber))
//descriptor header, timestamp
ts := UnsafeReadInt64(buf, pos)
assert.Greater(t, ts, int64(0))
curts := time.Now().UnixNano() / int64(time.Millisecond)
curts = int64(tsoutil.ComposeTS(curts, 0))
diffts := curts - ts
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(ts))
//descriptor header, type code
tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
pos += int(unsafe.Sizeof(tc))
//descriptor header, server id
svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, svrID, int32(ServerID))
pos += int(unsafe.Sizeof(svrID))
//descriptor header, event length
descEventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(descEventLen))
//descriptor header, next position
descNxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//descriptor data fix, binlog version
binLogVer := UnsafeReadInt16(buf, pos)
assert.Equal(t, binLogVer, int16(BinlogVersion))
pos += int(unsafe.Sizeof(binLogVer))
//descriptor data fix, server version
svrVer := UnsafeReadInt64(buf, pos)
assert.Equal(t, svrVer, int64(ServerVersion))
pos += int(unsafe.Sizeof(svrVer))
//descriptor data fix, commit id
cmitID := UnsafeReadInt64(buf, pos)
assert.Equal(t, cmitID, int64(CommitID))
pos += int(unsafe.Sizeof(cmitID))
//descriptor data fix, header length
headLen := UnsafeReadInt8(buf, pos)
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
pos += int(unsafe.Sizeof(headLen))
//descriptor data fix, collection id
collID := UnsafeReadInt64(buf, pos)
assert.Equal(t, collID, int64(50))
pos += int(unsafe.Sizeof(collID))
//descriptor data fix, partition id
partID := UnsafeReadInt64(buf, pos)
assert.Equal(t, partID, int64(-1))
pos += int(unsafe.Sizeof(partID))
//descriptor data fix, segment id
segID := UnsafeReadInt64(buf, pos)
assert.Equal(t, segID, int64(-1))
pos += int(unsafe.Sizeof(segID))
//descriptor data fix, field id
fieldID := UnsafeReadInt64(buf, pos)
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))
//descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
pos += int(unsafe.Sizeof(startts))
//descriptor data fix, end time stamp
endts := UnsafeReadInt64(buf, pos)
assert.Equal(t, endts, int64(2000))
pos += int(unsafe.Sizeof(endts))
//descriptor data fix, payload type
colType := UnsafeReadInt32(buf, pos)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
pos += int(unsafe.Sizeof(colType))
//descriptor data, post header lengths
for i := DescriptorEventType; i < EventTypeEnd; i++ {
size := getEventFixPartSize(i)
assert.Equal(t, uint8(size), buf[pos])
pos++
}
//start of e1
assert.Equal(t, pos, int(descNxtPos))
//insert e1 header, Timestamp
e1ts := UnsafeReadInt64(buf, pos)
diffts = curts - e1ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e1ts))
//insert e1 header, type code
e1tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType)
pos += int(unsafe.Sizeof(e1tc))
//insert e1 header, Server id
e1svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e1svrID))
//insert e1 header, event length
e1EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e1EventLen))
//insert e1 header, next position
e1NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e1 data, start time stamp
e1st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1st, int64(100))
pos += int(unsafe.Sizeof(e1st))
//insert e1 data, end time stamp
e1et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1et, int64(200))
pos += int(unsafe.Sizeof(e1et))
//insert e1, payload
e1Payload := buf[pos:e1NxtPos]
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
assert.Nil(t, err)
e1a, err := e1r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
err = e1r.Close()
assert.Nil(t, err)
//start of e2
pos = int(e1NxtPos)
//insert e2 header, Timestamp
e2ts := UnsafeReadInt64(buf, pos)
diffts = curts - e2ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e2ts))
//insert e2 header, type code
e2tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType)
pos += int(unsafe.Sizeof(e2tc))
//insert e2 header, Server id
e2svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e2svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e2svrID))
//insert e2 header, event length
e2EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e2EventLen))
//insert e2 header, next position
e2NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e2 data, start time stamp
e2st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2st, int64(300))
pos += int(unsafe.Sizeof(e2st))
//insert e2 data, end time stamp
e2et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2et, int64(400))
pos += int(unsafe.Sizeof(e2et))
//insert e2, payload
e2Payload := buf[pos:]
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
assert.Nil(t, err)
e2a, err := e2r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
err = e2r.Close()
assert.Nil(t, err)
assert.Equal(t, int(e2NxtPos), len(buf))
//read binlog
r, err := NewBinlogReader(buf)
assert.Nil(t, err)
event1, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event1)
p1, err := event1.GetInt64FromPayload()
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
assert.Nil(t, err)
assert.Equal(t, event1.TypeCode, CreateCollectionEventType)
ed1, ok := (event1.eventData).(*createCollectionEventData)
assert.True(t, ok)
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
event2, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event2)
p2, err := event2.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
assert.Equal(t, event2.TypeCode, DropCollectionEventType)
ed2, ok := (event2.eventData).(*dropCollectionEventData)
assert.True(t, ok)
_, ok = (event2.eventData).(*insertEventData)
assert.False(t, ok)
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}
func TestDDLBinlog2(t *testing.T) {
w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50)
assert.Nil(t, err)
e1, err := w.NextCreatePartitionEventWriter()
assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
err = e1.AddDataToPayload([]int32{4, 5, 6})
assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err)
e1.SetStartTimestamp(100)
e1.SetEndTimestamp(200)
e2, err := w.NextDropPartitionEventWriter()
assert.Nil(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9})
assert.Nil(t, err)
err = e2.AddDataToPayload([]bool{true, false, true})
assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err)
e2.SetStartTimestamp(300)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
pos := int(unsafe.Sizeof(MagicNumber))
//descriptor header, timestamp
ts := UnsafeReadInt64(buf, pos)
assert.Greater(t, ts, int64(0))
curts := time.Now().UnixNano() / int64(time.Millisecond)
curts = int64(tsoutil.ComposeTS(curts, 0))
diffts := curts - ts
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(ts))
//descriptor header, type code
tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
pos += int(unsafe.Sizeof(tc))
//descriptor header, server id
svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, svrID, int32(ServerID))
pos += int(unsafe.Sizeof(svrID))
//descriptor header, event length
descEventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(descEventLen))
//descriptor header, next position
descNxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//descriptor data fix, binlog version
binLogVer := UnsafeReadInt16(buf, pos)
assert.Equal(t, binLogVer, int16(BinlogVersion))
pos += int(unsafe.Sizeof(binLogVer))
//descriptor data fix, server version
svrVer := UnsafeReadInt64(buf, pos)
assert.Equal(t, svrVer, int64(ServerVersion))
pos += int(unsafe.Sizeof(svrVer))
//descriptor data fix, commit id
cmitID := UnsafeReadInt64(buf, pos)
assert.Equal(t, cmitID, int64(CommitID))
pos += int(unsafe.Sizeof(cmitID))
//descriptor data fix, header length
headLen := UnsafeReadInt8(buf, pos)
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
pos += int(unsafe.Sizeof(headLen))
//descriptor data fix, collection id
collID := UnsafeReadInt64(buf, pos)
assert.Equal(t, collID, int64(50))
pos += int(unsafe.Sizeof(collID))
//descriptor data fix, partition id
partID := UnsafeReadInt64(buf, pos)
assert.Equal(t, partID, int64(-1))
pos += int(unsafe.Sizeof(partID))
//descriptor data fix, segment id
segID := UnsafeReadInt64(buf, pos)
assert.Equal(t, segID, int64(-1))
pos += int(unsafe.Sizeof(segID))
//descriptor data fix, field id
fieldID := UnsafeReadInt64(buf, pos)
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))
//descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
pos += int(unsafe.Sizeof(startts))
//descriptor data fix, end time stamp
endts := UnsafeReadInt64(buf, pos)
assert.Equal(t, endts, int64(2000))
pos += int(unsafe.Sizeof(endts))
//descriptor data fix, payload type
colType := UnsafeReadInt32(buf, pos)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
pos += int(unsafe.Sizeof(colType))
//descriptor data, post header lengths
for i := DescriptorEventType; i < EventTypeEnd; i++ {
size := getEventFixPartSize(i)
assert.Equal(t, uint8(size), buf[pos])
pos++
}
//start of e1
assert.Equal(t, pos, int(descNxtPos))
//insert e1 header, Timestamp
e1ts := UnsafeReadInt64(buf, pos)
diffts = curts - e1ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e1ts))
//insert e1 header, type code
e1tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType)
pos += int(unsafe.Sizeof(e1tc))
//insert e1 header, Server id
e1svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e1svrID))
//insert e1 header, event length
e1EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e1EventLen))
//insert e1 header, next position
e1NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e1 data, start time stamp
e1st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1st, int64(100))
pos += int(unsafe.Sizeof(e1st))
//insert e1 data, end time stamp
e1et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e1et, int64(200))
pos += int(unsafe.Sizeof(e1et))
//insert e1, payload
e1Payload := buf[pos:e1NxtPos]
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
assert.Nil(t, err)
e1a, err := e1r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
err = e1r.Close()
assert.Nil(t, err)
//start of e2
pos = int(e1NxtPos)
//insert e2 header, Timestamp
e2ts := UnsafeReadInt64(buf, pos)
diffts = curts - e2ts
assert.LessOrEqual(t, diffts, maxdiff)
pos += int(unsafe.Sizeof(e2ts))
//insert e2 header, type code
e2tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType)
pos += int(unsafe.Sizeof(e2tc))
//insert e2 header, Server id
e2svrID := UnsafeReadInt32(buf, pos)
assert.Equal(t, e2svrID, int32(ServerID))
pos += int(unsafe.Sizeof(e2svrID))
//insert e2 header, event length
e2EventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(e2EventLen))
//insert e2 header, next position
e2NxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//insert e2 data, start time stamp
e2st := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2st, int64(300))
pos += int(unsafe.Sizeof(e2st))
//insert e2 data, end time stamp
e2et := UnsafeReadInt64(buf, pos)
assert.Equal(t, e2et, int64(400))
pos += int(unsafe.Sizeof(e2et))
//insert e2, payload
e2Payload := buf[pos:]
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
assert.Nil(t, err)
e2a, err := e2r.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
err = e2r.Close()
assert.Nil(t, err)
assert.Equal(t, int(e2NxtPos), len(buf))
//read binlog
r, err := NewBinlogReader(buf)
assert.Nil(t, err)
event1, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event1)
p1, err := event1.GetInt64FromPayload()
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
assert.Nil(t, err)
assert.Equal(t, event1.TypeCode, CreatePartitionEventType)
ed1, ok := (event1.eventData).(*createPartitionEventData)
assert.True(t, ok)
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
event2, err := r.NextEventReader()
assert.Nil(t, err)
assert.NotNil(t, event2)
p2, err := event2.GetInt64FromPayload()
assert.Nil(t, err)
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
assert.Equal(t, event2.TypeCode, DropPartitionEventType)
ed2, ok := (event2.eventData).(*dropPartitionEventData)
assert.True(t, ok)
_, ok = (event2.eventData).(*insertEventData)
assert.False(t, ok)
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}

View File

@ -223,13 +223,12 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
},
}, nil
}
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DeleteBinlogWriter, error) {
func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) {
descriptorEvent, err := newDescriptorEvent()
if err != nil {
return nil, err
}
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
return &DeleteBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
@ -240,13 +239,12 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*Del
},
}, nil
}
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBinlogWriter, error) {
func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) {
descriptorEvent, err := newDescriptorEvent()
if err != nil {
return nil, err
}
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
return &DDLBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,

View File

@ -54,5 +54,6 @@ func TestBinlogWriterReader(t *testing.T) {
reader, err := binlogReader.NextEventReader()
assert.Nil(t, err)
fmt.Println("reader offset : " + strconv.Itoa(int(binlogReader.currentOffset)))
assert.Nil(t, reader)
}

View File

@ -354,7 +354,7 @@ type DataDefinitionCodec struct {
}
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID)
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING)
if err != nil {
return nil, err
}
@ -426,7 +426,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID)
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64)
if err != nil {
return nil, err
}

View File

@ -37,15 +37,21 @@ func (dsService *dataSyncService) initNodes() {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode Node = newDmInputNode(dsService.ctx)
var ddStreamNode Node = newDDInputNode(dsService.ctx)
var ddNode Node = newDDNode()
var filterDmNode Node = newFilteredDmNode()
var insertBufferNode Node = newInsertBufferNode()
var serviceTimeNode Node = newServiceTimeNode()
dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&filterDmNode)
dsService.fg.AddNode(&insertBufferNode)
dsService.fg.AddNode(&serviceTimeNode)
dsService.fg.AddNode(&ddStreamNode)
dsService.fg.AddNode(&filterDmNode)
dsService.fg.AddNode(&ddNode)
dsService.fg.AddNode(&insertBufferNode)
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
@ -54,27 +60,39 @@ func (dsService *dataSyncService) initNodes() {
log.Fatal("set edges failed in node:", dmStreamNode.Name())
}
// ddStreamNode
err = dsService.fg.SetEdges(ddStreamNode.Name(),
[]string{},
[]string{ddNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", ddStreamNode.Name())
}
// filterDmNode
err = dsService.fg.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name()},
[]string{dmStreamNode.Name(), ddNode.Name()},
[]string{insertBufferNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", filterDmNode.Name())
}
// ddNode
err = dsService.fg.SetEdges(ddNode.Name(),
[]string{ddStreamNode.Name()},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", ddNode.Name())
}
// insertBufferNode
err = dsService.fg.SetEdges(insertBufferNode.Name(),
[]string{filterDmNode.Name()},
[]string{serviceTimeNode.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed in node:", insertBufferNode.Name())
}
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
[]string{insertBufferNode.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
}

View File

@ -16,7 +16,6 @@ import (
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
Params.Init()
const ctxTimeInMillisecond = 200
const closeWithDeadline = true
var ctx context.Context
@ -31,7 +30,7 @@ func TestDataSyncService_Start(t *testing.T) {
}
// init write node
pulsarURL, _ := Params.pulsarAddress()
pulsarURL := Params.PulsarAddress
node := NewWriteNode(ctx, 0)
// test data generate
@ -116,20 +115,30 @@ func TestDataSyncService_Start(t *testing.T) {
// pulsar produce
const receiveBufSize = 1024
producerChannels := Params.insertChannelNames()
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(producerChannels)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(pulsarURL)
ddStream.CreatePulsarProducers(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err := insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx)

View File

@ -0,0 +1,47 @@
package writenode
import (
"errors"
"strconv"
)
type ddBuffer struct {
collectionBuffer map[UniqueID]interface{}
partitionBuffer map[UniqueID]interface{}
}
func (d *ddBuffer) addCollection(collectionID UniqueID) error {
if _, ok := d.collectionBuffer[collectionID]; !ok {
return errors.New("collection " + strconv.FormatInt(collectionID, 10) + " is already exists")
}
d.collectionBuffer[collectionID] = nil
return nil
}
func (d *ddBuffer) removeCollection(collectionID UniqueID) error {
if _, ok := d.collectionBuffer[collectionID]; !ok {
return errors.New("cannot found collection " + strconv.FormatInt(collectionID, 10))
}
delete(d.collectionBuffer, collectionID)
return nil
}
func (d *ddBuffer) addPartition(partitionID UniqueID) error {
if _, ok := d.partitionBuffer[partitionID]; !ok {
return errors.New("partition " + strconv.FormatInt(partitionID, 10) + " is already exists")
}
d.partitionBuffer[partitionID] = nil
return nil
}
func (d *ddBuffer) removePartition(partitionID UniqueID) error {
if _, ok := d.partitionBuffer[partitionID]; !ok {
return errors.New("cannot found partition " + strconv.FormatInt(partitionID, 10))
}
delete(d.partitionBuffer, partitionID)
return nil
}

View File

@ -0,0 +1,176 @@
package writenode
import (
"log"
"sort"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type ddNode struct {
BaseNode
ddMsg *ddMsg
ddBuffer *ddBuffer
}
func (ddNode *ddNode) Name() string {
return "ddNode"
}
func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do filterDmNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in ddNode, input length = ", len(in))
// TODO: add error handling
}
msMsg, ok := (*in[0]).(*MsgStreamMsg)
if !ok {
log.Println("type assertion failed for MsgStreamMsg")
// TODO: add error handling
}
var ddMsg = ddMsg{
collectionRecords: make(map[string][]metaOperateRecord),
partitionRecords: make(map[string][]metaOperateRecord),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
}
ddNode.ddMsg = &ddMsg
// sort tsMessages
tsMessages := msMsg.TsMessages()
sort.Slice(tsMessages,
func(i, j int) bool {
return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
})
// do dd tasks
for _, msg := range tsMessages {
switch msg.Type() {
case internalPb.MsgType_kCreateCollection:
ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg))
case internalPb.MsgType_kDropCollection:
ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg))
case internalPb.MsgType_kCreatePartition:
ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
case internalPb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
default:
log.Println("Non supporting message type:", msg.Type())
}
}
var res Msg = ddNode.ddMsg
return []*Msg{&res}
}
func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
collectionID := msg.CollectionID
err := ddNode.ddBuffer.addCollection(collectionID)
if err != nil {
log.Println(err)
return
}
// TODO: add default partition?
var schema schemapb.CollectionSchema
err = proto.Unmarshal((*msg.Schema).Value, &schema)
if err != nil {
log.Println(err)
return
}
collectionName := schema.Name
ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
metaOperateRecord{
createOrDrop: true,
timestamp: msg.Timestamp,
})
// TODO: write dd binlog
}
func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
collectionID := msg.CollectionID
err := ddNode.ddBuffer.removeCollection(collectionID)
if err != nil {
log.Println(err)
return
}
collectionName := msg.CollectionName.CollectionName
ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
metaOperateRecord{
createOrDrop: false,
timestamp: msg.Timestamp,
})
// TODO: write dd binlog
}
func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
partitionID := msg.PartitionID
err := ddNode.ddBuffer.addPartition(partitionID)
if err != nil {
log.Println(err)
return
}
partitionTag := msg.PartitionName.Tag
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
metaOperateRecord{
createOrDrop: true,
timestamp: msg.Timestamp,
})
// TODO: write dd binlog
}
func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
partitionID := msg.PartitionID
err := ddNode.ddBuffer.removePartition(partitionID)
if err != nil {
log.Println(err)
return
}
partitionTag := msg.PartitionName.Tag
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
metaOperateRecord{
createOrDrop: false,
timestamp: msg.Timestamp,
})
// TODO: write dd binlog
}
func newDDNode() *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
ddBuffer := &ddBuffer{
collectionBuffer: make(map[UniqueID]interface{}),
partitionBuffer: make(map[UniqueID]interface{}),
}
return &ddNode{
BaseNode: baseNode,
ddBuffer: ddBuffer,
}
}

View File

@ -4,11 +4,13 @@ import (
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type filterDmNode struct {
BaseNode
ddMsg *ddMsg
}
func (fdmNode *filterDmNode) Name() string {
@ -16,29 +18,40 @@ func (fdmNode *filterDmNode) Name() string {
}
func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do filterDmNode operation")
if len(in) != 1 {
if len(in) != 2 {
log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
// TODO: add error handling
}
msMsg, ok := (*in[0]).(*MsgStreamMsg)
msgStreamMsg, ok := (*in[0]).(*MsgStreamMsg)
if !ok {
log.Println("type assertion failed for MsgStreamMsg")
// TODO: add error handling
}
ddMsg, ok := (*in[1]).(*ddMsg)
if !ok {
log.Println("type assertion failed for ddMsg")
// TODO: add error handling
}
fdmNode.ddMsg = ddMsg
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
timestampMin: msgStreamMsg.TimestampMin(),
timestampMax: msgStreamMsg.TimestampMax(),
},
}
for _, msg := range msMsg.TsMessages() {
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
case internalPb.MsgType_kFlush:
iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
// case internalPb.MsgType_kDelete:
@ -52,9 +65,44 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
return []*Msg{&res}
}
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
// No dd record, do all insert requests.
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName]
if !ok {
return msg
}
// If the last record is drop type, all insert requests are invalid.
if !records[len(records)-1].createOrDrop {
return nil
}
// Filter insert requests before last record.
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Println("Error, misaligned messages detected")
return nil
}
tmpTimestamps := make([]Timestamp, 0)
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
targetTimestamp := records[len(records)-1].timestamp
for i, t := range msg.Timestamps {
if t >= targetTimestamp {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
}
}
msg.Timestamps = tmpTimestamps
msg.RowIDs = tmpRowIDs
msg.RowData = tmpRowData
return msg
}
func newFilteredDmNode() *filterDmNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)

View File

@ -38,7 +38,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
iMsg, ok := (*in[0]).(*insertMsg)
_, ok := (*in[0]).(*insertMsg)
if !ok {
log.Println("type assertion failed for insertMsg")
// TODO: add error handling
@ -64,18 +64,13 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// log.Printf("t(%d) : %v ", task.Timestamps[0], task.RowData[0])
// }
var res Msg = &serviceTimeMsg{
timeRange: iMsg.timeRange,
}
// TODO
return []*Msg{&res}
return nil
}
func newInsertBufferNode() *insertBufferNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)

View File

@ -2,7 +2,6 @@ package writenode
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
@ -18,8 +17,17 @@ type (
timeRange TimeRange
}
schemaUpdateMsg struct {
timeRange TimeRange
ddMsg struct {
// TODO: use collection id
collectionRecords map[string][]metaOperateRecord
// TODO: use partition id
partitionRecords map[string][]metaOperateRecord
timeRange TimeRange
}
metaOperateRecord struct {
createOrDrop bool // create: true, drop: false
timestamp Timestamp
}
insertMsg struct {
@ -32,34 +40,6 @@ type (
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}
serviceTimeMsg struct {
timeRange TimeRange
}
InsertData struct {
insertIDs map[SegmentID][]UniqueID
insertTimestamps map[SegmentID][]Timestamp
insertRecords map[SegmentID][]*commonpb.Blob
insertOffset map[SegmentID]int64
}
DeleteData struct {
deleteIDs map[SegmentID][]UniqueID
deleteTimestamps map[SegmentID][]Timestamp
deleteOffset map[SegmentID]int64
}
DeleteRecord struct {
entityID UniqueID
timestamp Timestamp
segmentID UniqueID
}
DeletePreprocessData struct {
deleteRecords []*DeleteRecord
count int32
}
)
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
@ -70,11 +50,11 @@ func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
}
func (suMsg *schemaUpdateMsg) TimeTick() Timestamp {
func (suMsg *ddMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (suMsg *schemaUpdateMsg) DownStreamNodeIdx() int {
func (suMsg *ddMsg) DownStreamNodeIdx() int {
return 0
}
@ -93,11 +73,3 @@ func (dMsg *deleteMsg) TimeTick() Timestamp {
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
}
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
return 0
}

View File

@ -2,23 +2,19 @@ package writenode
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.insertReceiveBufSize()
pulsarBufSize := Params.insertPulsarBufSize()
receiveBufSize := Params.InsertReceiveBufSize
pulsarBufSize := Params.InsertPulsarBufSize
msgStreamURL, err := Params.pulsarAddress()
if err != nil {
log.Fatal(err)
}
msgStreamURL := Params.PulsarAddress
consumeChannels := Params.insertChannelNames()
consumeSubName := Params.msgChannelSubName()
consumeChannels := Params.InsertChannelNames
consumeSubName := Params.MsgChannelSubName
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
@ -31,9 +27,32 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
var stream msgstream.MsgStream = insertStream
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}
func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.DDReceiveBufSize
pulsarBufSize := Params.DDPulsarBufSize
msgStreamURL := Params.PulsarAddress
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
ddStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = ddStream
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)
return node
}

View File

@ -1,46 +0,0 @@
package writenode
import (
"log"
)
type serviceTimeNode struct {
BaseNode
}
func (stNode *serviceTimeNode) Name() string {
return "stNode"
}
func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
if len(in) != 1 {
log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in))
// TODO: add error handling
}
// serviceTimeMsg, ok := (*in[0]).(*serviceTimeMsg)
_, ok := (*in[0]).(*serviceTimeMsg)
if !ok {
log.Println("type assertion failed for serviceTimeMsg")
// TODO: add error handling
}
// update service time
// (*(*stNode.replica).getTSafe()).set(serviceTimeMsg.timeRange.timestampMax)
// fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax))
return nil
}
func newServiceTimeNode() *serviceTimeNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &serviceTimeNode{
BaseNode: baseNode,
}
}

View File

@ -10,6 +10,30 @@ import (
type ParamTable struct {
paramtable.BaseTable
PulsarAddress string
WriteNodeID UniqueID
WriteNodeNum int
WriteNodeTimeTickChannelName string
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
// dm
InsertChannelNames []string
InsertChannelRange []int
InsertReceiveBufSize int64
InsertPulsarBufSize int64
// dd
DDChannelNames []string
DDReceiveBufSize int64
DDPulsarBufSize int64
MsgChannelSubName string
DefaultPartitionTag string
SliceIndex int
}
var Params ParamTable
@ -30,18 +54,35 @@ func (p *ParamTable) Init() {
writeNodeIDStr = strconv.Itoa(int(writeNodeIDList[0]))
}
}
p.Save("_writeNodeID", writeNodeIDStr)
}
func (p *ParamTable) pulsarAddress() (string, error) {
url, err := p.Load("_PulsarAddress")
err = p.Save("_writeNodeID", writeNodeIDStr)
if err != nil {
panic(err)
}
return url, nil
p.initPulsarAddress()
p.initWriteNodeID()
p.initWriteNodeNum()
p.initWriteNodeTimeTickChannelName()
p.initMsgChannelSubName()
p.initDefaultPartitionTag()
p.initSliceIndex()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initInsertChannelNames()
p.initInsertChannelRange()
p.initInsertReceiveBufSize()
p.initInsertPulsarBufSize()
p.initDDChannelNames()
p.initDDReceiveBufSize()
p.initDDPulsarBufSize()
}
func (p *ParamTable) WriteNodeID() UniqueID {
func (p *ParamTable) initWriteNodeID() {
writeNodeID, err := p.Load("_writeNodeID")
if err != nil {
panic(err)
@ -50,187 +91,153 @@ func (p *ParamTable) WriteNodeID() UniqueID {
if err != nil {
panic(err)
}
return UniqueID(id)
p.WriteNodeID = UniqueID(id)
}
func (p *ParamTable) insertChannelRange() []int {
func (p *ParamTable) initPulsarAddress() {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = url
}
func (p *ParamTable) initInsertChannelRange() {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
return paramtable.ConvertRangeToIntRange(insertChannelRange, ",")
p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",")
}
// advanced params
// stats
func (p *ParamTable) statsPublishInterval() int {
return p.ParseInt("writeNode.stats.publishInterval")
}
// dataSync:
func (p *ParamTable) flowGraphMaxQueueLength() int32 {
return p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength")
func (p *ParamTable) initFlowGraphMaxQueueLength() {
p.FlowGraphMaxQueueLength = p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength")
}
func (p *ParamTable) flowGraphMaxParallelism() int32 {
return p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
func (p *ParamTable) initFlowGraphMaxParallelism() {
p.FlowGraphMaxParallelism = p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
}
// msgStream
func (p *ParamTable) insertReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
func (p *ParamTable) initInsertReceiveBufSize() {
p.InsertReceiveBufSize = p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
}
func (p *ParamTable) insertPulsarBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
func (p *ParamTable) initInsertPulsarBufSize() {
p.InsertPulsarBufSize = p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
}
func (p *ParamTable) searchReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.search.recvBufSize")
}
func (p *ParamTable) searchPulsarBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.search.pulsarBufSize")
}
func (p *ParamTable) searchResultReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.searchResult.recvBufSize")
}
func (p *ParamTable) statsReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.stats.recvBufSize")
}
func (p *ParamTable) etcdAddress() string {
etcdAddress, err := p.Load("_EtcdAddress")
func (p *ParamTable) initDDReceiveBufSize() {
revBufSize, err := p.Load("writeNode.msgStream.dataDefinition.recvBufSize")
if err != nil {
panic(err)
}
return etcdAddress
bufSize, err := strconv.Atoi(revBufSize)
if err != nil {
panic(err)
}
p.DDReceiveBufSize = int64(bufSize)
}
func (p *ParamTable) metaRootPath() string {
rootPath, err := p.Load("etcd.rootPath")
func (p *ParamTable) initDDPulsarBufSize() {
pulsarBufSize, err := p.Load("writeNode.msgStream.dataDefinition.pulsarBufSize")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
bufSize, err := strconv.Atoi(pulsarBufSize)
if err != nil {
panic(err)
}
return rootPath + "/" + subPath
p.DDPulsarBufSize = int64(bufSize)
}
func (p *ParamTable) gracefulTime() int64 {
gracefulTime, err := p.Load("writeNode.gracefulTime")
if err != nil {
panic(err)
}
time, err := strconv.Atoi(gracefulTime)
if err != nil {
panic(err)
}
return int64(time)
}
func (p *ParamTable) initInsertChannelNames() {
func (p *ParamTable) insertChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
sep := len(channelIDs) / p.writeNodeNum()
index := p.sliceIndex()
sep := len(channelIDs) / p.WriteNodeNum
index := p.SliceIndex
if index == -1 {
panic("writeNodeID not Match with Config")
}
start := index * sep
return ret[start : start+sep]
p.InsertChannelNames = ret[start : start+sep]
}
func (p *ParamTable) searchChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.search")
if err != nil {
log.Fatal(err)
}
channelRange, err := p.Load("msgChannel.channelRange.search")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
return ret
}
func (p *ParamTable) searchResultChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.searchResult")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
return ret
}
func (p *ParamTable) msgChannelSubName() string {
// TODO: subName = namePrefix + "-" + writeNodeID, writeNodeID is assigned by master
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.writeNodeSubNamePrefix")
if err != nil {
log.Panic(err)
}
writeNodeIDStr, err := p.Load("_WriteNodeID")
writeNodeIDStr, err := p.Load("_writeNodeID")
if err != nil {
panic(err)
}
return name + "-" + writeNodeIDStr
p.MsgChannelSubName = name + "-" + writeNodeIDStr
}
func (p *ParamTable) writeNodeTimeTickChannelName() string {
func (p *ParamTable) initDDChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
if err != nil {
panic(err)
}
prefix += "-"
iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.DDChannelNames = ret
}
func (p *ParamTable) initDefaultPartitionTag() {
defaultTag, err := p.Load("common.defaultPartitionTag")
if err != nil {
panic(err)
}
p.DefaultPartitionTag = defaultTag
}
func (p *ParamTable) initWriteNodeTimeTickChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
if err != nil {
panic(err)
}
return channels
p.WriteNodeTimeTickChannelName = channels
}
func (p *ParamTable) sliceIndex() int {
writeNodeID := p.WriteNodeID()
func (p *ParamTable) initSliceIndex() {
writeNodeID := p.WriteNodeID
writeNodeIDList := p.WriteNodeIDList()
for i := 0; i < len(writeNodeIDList); i++ {
if writeNodeID == writeNodeIDList[i] {
return i
p.SliceIndex = i
return
}
}
return -1
p.SliceIndex = -1
}
func (p *ParamTable) writeNodeNum() int {
return len(p.WriteNodeIDList())
func (p *ParamTable) initWriteNodeNum() {
p.WriteNodeNum = len(p.WriteNodeIDList())
}

View File

@ -12,101 +12,58 @@ func TestParamTable_WriteNode(t *testing.T) {
Params.Init()
t.Run("Test PulsarAddress", func(t *testing.T) {
address, err := Params.pulsarAddress()
assert.NoError(t, err)
address := Params.PulsarAddress
split := strings.Split(address, ":")
assert.Equal(t, split[0], "pulsar")
assert.Equal(t, split[len(split)-1], "6650")
})
t.Run("Test WriteNodeID", func(t *testing.T) {
id := Params.WriteNodeID()
id := Params.WriteNodeID
assert.Equal(t, id, UniqueID(3))
})
t.Run("Test insertChannelRange", func(t *testing.T) {
channelRange := Params.insertChannelRange()
channelRange := Params.InsertChannelRange
assert.Equal(t, len(channelRange), 2)
assert.Equal(t, channelRange[0], 0)
assert.Equal(t, channelRange[1], 2)
})
t.Run("Test statsServiceTimeInterval", func(t *testing.T) {
interval := Params.statsPublishInterval()
assert.Equal(t, interval, 1000)
})
t.Run("Test statsMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.statsReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
})
t.Run("Test insertMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.insertReceiveBufSize()
bufSize := Params.InsertReceiveBufSize
assert.Equal(t, bufSize, int64(1024))
})
t.Run("Test searchMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.searchReceiveBufSize()
assert.Equal(t, bufSize, int64(512))
})
t.Run("Test searchResultMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.searchResultReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
})
t.Run("Test searchPulsarBufSize", func(t *testing.T) {
bufSize := Params.searchPulsarBufSize()
assert.Equal(t, bufSize, int64(512))
})
t.Run("Test insertPulsarBufSize", func(t *testing.T) {
bufSize := Params.insertPulsarBufSize()
bufSize := Params.InsertPulsarBufSize
assert.Equal(t, bufSize, int64(1024))
})
t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
length := Params.flowGraphMaxQueueLength()
length := Params.FlowGraphMaxQueueLength
assert.Equal(t, length, int32(1024))
})
t.Run("Test flowGraphMaxParallelism", func(t *testing.T) {
maxParallelism := Params.flowGraphMaxParallelism()
maxParallelism := Params.FlowGraphMaxParallelism
assert.Equal(t, maxParallelism, int32(1024))
})
t.Run("Test insertChannelNames", func(t *testing.T) {
names := Params.insertChannelNames()
names := Params.InsertChannelNames
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "insert0")
assert.Equal(t, names[1], "insert1")
})
t.Run("Test searchChannelNames", func(t *testing.T) {
names := Params.searchChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "search0")
})
t.Run("Test searchResultChannelName", func(t *testing.T) {
names := Params.searchResultChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "searchResult-0")
assert.Equal(t, names[0], "insert-0")
assert.Equal(t, names[1], "insert-1")
})
t.Run("Test msgChannelSubName", func(t *testing.T) {
name := Params.msgChannelSubName()
name := Params.MsgChannelSubName
assert.Equal(t, name, "writeNode-3")
})
t.Run("Test timeTickChannelName", func(t *testing.T) {
name := Params.writeNodeTimeTickChannelName()
name := Params.WriteNodeTimeTickChannelName
assert.Equal(t, name, "writeNodeTimeTick")
})
t.Run("Test metaRootPath", func(t *testing.T) {
path := Params.metaRootPath()
assert.Equal(t, path, "by-dev/meta")
})
}

View File

@ -0,0 +1,32 @@
package writenode
import (
"fmt"
"math/rand"
"os"
"strconv"
"testing"
)
func makeNewChannelNames(names []string, suffix string) []string {
var ret []string
for _, name := range names {
ret = append(ret, name+suffix)
}
return ret
}
func refreshChannelNames() {
suffix := "-test-write-node" + strconv.FormatInt(rand.Int63n(100), 10)
Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
}
func TestMain(m *testing.M) {
Params.Init()
refreshChannelNames()
p := Params
fmt.Println(p)
exitCode := m.Run()
os.Exit(exitCode)
}