Add data service

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-15 17:09:41 +08:00 committed by yefu.chen
parent e6225a7030
commit 67e6ea8051
19 changed files with 1677 additions and 1473 deletions

View File

@ -0,0 +1,47 @@
package dataservice
import "fmt"
type (
channelRange []string
insertChannelMapper struct {
channelRanges []channelRange
}
)
func (cr channelRange) Contains(channelName string) bool {
for _, name := range cr {
if name == channelName {
return true
}
}
return false
}
func newInsertChannelMapper() *insertChannelMapper {
mapper := &insertChannelMapper{channelRanges: make([]channelRange, 0, Params.QueryNodeNum)}
channelNames, numOfChannels, numOfQueryNodes := Params.InsertChannelNames, len(Params.InsertChannelNames), Params.QueryNodeNum
div, rem := numOfChannels/numOfQueryNodes, numOfChannels%numOfQueryNodes
for i, j := 0, 0; i < numOfChannels; j++ {
numOfRange := div
if j < rem {
numOfRange++
}
cRange := channelRange{}
k := i + numOfRange
for ; i < k; i++ {
cRange = append(cRange, channelNames[i])
}
mapper.channelRanges = append(mapper.channelRanges, cRange)
}
return mapper
}
func (mapper *insertChannelMapper) GetChannelRange(channelName string) (channelRange, error) {
for _, cr := range mapper.channelRanges {
if cr.Contains(channelName) {
return cr, nil
}
}
return nil, fmt.Errorf("channel name %s not found", channelName)
}
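
For illustration, a hedged walk-through of the mapper under assumed parameters: with eight insert channels and three query nodes, div is 2 and rem is 2, so the ranges come out as 3/3/2 consecutive channels per node.

// hypothetical usage inside package dataservice, assuming
// Params.InsertChannelNames is ["insert-0" .. "insert-7"] and
// Params.QueryNodeNum is 3
func exampleChannelMapping() {
	mapper := newInsertChannelMapper()
	cr, err := mapper.GetChannelRange("insert-4")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cr) // [insert-3 insert-4 insert-5]: the second node's range
}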

View File

@ -0,0 +1,5 @@
package dataservice
type DataService struct {
segAllocator segmentAllocator
}

View File

@ -0,0 +1,40 @@
package dataservice
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"golang.org/x/net/context"
)
func (ds *DataService) RegisterNode(context.Context, *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return nil, nil
}
func (ds *DataService) Flush(context.Context, *datapb.FlushRequest) (*commonpb.Status, error) {
return nil, nil
}
func (ds *DataService) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
//for _, req := range request.SegIDRequests {
// segmentID, retCount, expireTs, err := ds.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelID, int(req.Count))
// if err != nil {
// log.Printf("allocate segment error: %s", err.Error())
// }
//}
return nil, nil
}
func (ds *DataService) ShowSegments(context.Context, *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
return nil, nil
}
func (ds *DataService) GetSegmentStates(context.Context, *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
return nil, nil
}
func (ds *DataService) GetInsertBinlogPaths(context.Context, *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
return nil, nil
}
func (ds *DataService) GetInsertChannels(context.Context, *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return nil, nil
}
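
The commented body above hints at the intended allocation loop. A hedged sketch of that intent, reusing only the request fields named in the comment (CollectionID, PartitionID, ChannelID, Count) and assuming the standard log package is imported; response assembly is omitted because the datapb response layout is not shown in this diff.

func (ds *DataService) assignSegmentIDSketch(request *datapb.AssignSegIDRequest) {
	for _, req := range request.SegIDRequests {
		// reserve req.Count rows on some open segment of this collection/partition/channel
		segmentID, retCount, expireTs, err := ds.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelID, int(req.Count))
		if err != nil {
			log.Printf("allocate segment for collection %d failed: %s", req.CollectionID, err.Error())
			continue
		}
		log.Printf("assigned segment %d: %d rows, expire at %d", segmentID, retCount, expireTs)
	}
}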

View File

@ -0,0 +1,180 @@
package dataservice
import (
"fmt"
"strconv"
"sync"
log "github.com/sirupsen/logrus"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
)
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
collectionInfo struct {
ID UniqueID
Schema *schemapb.CollectionSchema
}
meta struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
collID2Info map[UniqueID]*collectionInfo // collection id to collection info
segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info
ddLock sync.RWMutex
}
)
func NewMetaTable(kv kv.TxnBase) (*meta, error) {
mt := &meta{
client: kv,
collID2Info: make(map[UniqueID]*collectionInfo),
segID2Info: make(map[UniqueID]*datapb.SegmentInfo),
}
err := mt.reloadFromKV()
if err != nil {
return nil, err
}
return mt, nil
}
func (mt *meta) reloadFromKV() error {
_, values, err := mt.client.LoadWithPrefix("/segment/")
if err != nil {
return err
}
for _, value := range values {
segmentInfo := &datapb.SegmentInfo{}
err = proto.UnmarshalText(value, segmentInfo)
if err != nil {
return err
}
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
}
return nil
}
func (mt *meta) AddCollection(collectionInfo *collectionInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collID2Info[collectionInfo.ID]; ok {
return fmt.Errorf("collection %s with id %d already exist", collectionInfo.Schema.Name, collectionInfo.ID)
}
mt.collID2Info[collectionInfo.ID] = collectionInfo
return nil
}
func (mt *meta) DropCollection(collID UniqueID) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collID2Info[collID]; !ok {
return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10))
}
delete(mt.collID2Info, collID)
for id, segment := range mt.segID2Info {
if segment.CollectionID != collID {
continue
}
delete(mt.segID2Info, id)
if err := mt.removeSegmentInfo(id); err != nil {
log.Printf("remove segment info failed, %s", err.Error())
_ = mt.reloadFromKV()
}
}
return nil
}
func (mt *meta) HasCollection(collID UniqueID) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, ok := mt.collID2Info[collID]
return ok
}
func (mt *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.segID2Info[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exists", segmentInfo.SegmentID)
}
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := mt.saveSegmentInfo(segmentInfo); err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
func (mt *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := mt.saveSegmentInfo(segmentInfo); err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
func (mt *meta) GetSegmentByID(segID UniqueID) (*datapb.SegmentInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
segmentInfo, ok := mt.segID2Info[segID]
if !ok {
return nil, errors.Errorf("GetSegmentByID:can't find segment id = %d", segID)
}
return segmentInfo, nil
}
func (mt *meta) CloseSegment(segID UniqueID, closeTs Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
segInfo, ok := mt.segID2Info[segID]
if !ok {
return errors.Errorf("DropSegment:can't find segment id = " + strconv.FormatInt(segID, 10))
}
segInfo.CloseTime = closeTs
err := mt.saveSegmentInfo(segInfo)
if err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
func (mt *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
collectionInfo, ok := mt.collID2Info[collectionID]
if !ok {
return nil, fmt.Errorf("collection %d not found", collectionID)
}
return collectionInfo, nil
}
func (mt *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segmentInfo)
return mt.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes)
}
func (mt *meta) removeSegmentInfo(segID UniqueID) error {
return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
}

View File

@ -0,0 +1,370 @@
package dataservice
import (
"log"
"strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
EtcdAddress string
MetaRootPath string
KvRootPath string
WriteNodeSegKvSubPath string
PulsarAddress string
IndexBuilderAddress string
// nodeID
ProxyIDList []typeutil.UniqueID
WriteNodeIDList []typeutil.UniqueID
TopicNum int
QueryNodeNum int
SoftTimeTickBarrierInterval typeutil.Timestamp
// segment
SegmentSize float64
SegmentSizeFactor float64
DefaultRecordSize int64
MinSegIDAssignCnt int64
MaxSegIDAssignCnt int64
SegIDAssignExpiration int64
// msgChannel
ProxyTimeTickChannelNames []string
WriteNodeTimeTickChannelNames []string
DDChannelNames []string
InsertChannelNames []string
K2SChannelNames []string
QueryNodeStatsChannelName string
MsgChannelSubName string
MaxPartitionNum int64
DefaultPartitionTag string
LoadIndexChannelNames []string
}
var Params ParamTable
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()
err := p.LoadYaml("advanced/master.yaml")
if err != nil {
panic(err)
}
// set members
p.initAddress()
p.initPort()
p.initEtcdAddress()
p.initMetaRootPath()
p.initKvRootPath()
p.initWriteNodeSegKvSubPath()
p.initPulsarAddress()
p.initIndexBuilderAddress()
p.initProxyIDList()
p.initWriteNodeIDList()
p.initTopicNum()
p.initQueryNodeNum()
p.initSoftTimeTickBarrierInterval()
p.initSegmentSize()
p.initSegmentSizeFactor()
p.initDefaultRecordSize()
p.initMinSegIDAssignCnt()
p.initMaxSegIDAssignCnt()
p.initSegIDAssignExpiration()
p.initProxyTimeTickChannelNames()
p.initWriteNodeTimeTickChannelNames()
p.initInsertChannelNames()
p.initDDChannelNames()
p.initK2SChannelNames()
p.initQueryNodeStatsChannelName()
p.initMsgChannelSubName()
p.initMaxPartitionNum()
p.initDefaultPartitionTag()
p.initLoadIndexChannelNames()
}
func (p *ParamTable) initAddress() {
masterAddress, err := p.Load("master.address")
if err != nil {
panic(err)
}
p.Address = masterAddress
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("master.port")
}
func (p *ParamTable) initEtcdAddress() {
addr, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.EtcdAddress = addr
}
func (p *ParamTable) initPulsarAddress() {
addr, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *ParamTable) initIndexBuilderAddress() {
ret, err := p.Load("_IndexBuilderAddress")
if err != nil {
panic(err)
}
p.IndexBuilderAddress = ret
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initKvRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.kvSubPath")
if err != nil {
panic(err)
}
p.KvRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initWriteNodeSegKvSubPath() {
subPath, err := p.Load("etcd.writeNodeSegKvSubPath")
if err != nil {
panic(err)
}
p.WriteNodeSegKvSubPath = subPath + "/"
}
func (p *ParamTable) initTopicNum() {
iRangeStr, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
rangeSlice := paramtable.ConvertRangeToIntRange(iRangeStr, ",")
p.TopicNum = rangeSlice[1] - rangeSlice[0]
}
func (p *ParamTable) initSegmentSize() {
p.SegmentSize = p.ParseFloat("master.segment.size")
}
func (p *ParamTable) initSegmentSizeFactor() {
p.SegmentSizeFactor = p.ParseFloat("master.segment.sizeFactor")
}
func (p *ParamTable) initDefaultRecordSize() {
p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord")
}
func (p *ParamTable) initMinSegIDAssignCnt() {
p.MinSegIDAssignCnt = p.ParseInt64("master.segment.minIDAssignCnt")
}
func (p *ParamTable) initMaxSegIDAssignCnt() {
p.MaxSegIDAssignCnt = p.ParseInt64("master.segment.maxIDAssignCnt")
}
func (p *ParamTable) initSegIDAssignExpiration() {
p.SegIDAssignExpiration = p.ParseInt64("master.segment.IDAssignExpiration")
}
func (p *ParamTable) initQueryNodeNum() {
p.QueryNodeNum = len(p.QueryNodeIDList())
}
func (p *ParamTable) initQueryNodeStatsChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
p.QueryNodeStatsChannelName = channels
}
func (p *ParamTable) initProxyIDList() {
p.ProxyIDList = p.BaseTable.ProxyIDList()
}
func (p *ParamTable) initProxyTimeTickChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.proxyTimeTick")
if err != nil {
log.Panic(err)
}
id, err := p.Load("nodeID.proxyIDList")
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.ProxyTimeTickChannelNames = channels
}
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix")
if err != nil {
log.Panic(err)
}
p.MsgChannelSubName = name
}
func (p *ParamTable) initSoftTimeTickBarrierInterval() {
t, err := p.Load("master.timeSync.softTimeTickBarrierInterval")
if err != nil {
log.Panic(err)
}
v, err := strconv.ParseInt(t, 10, 64)
if err != nil {
log.Panic(err)
}
p.SoftTimeTickBarrierInterval = tsoutil.ComposeTS(v, 0)
}
func (p *ParamTable) initWriteNodeIDList() {
p.WriteNodeIDList = p.BaseTable.WriteNodeIDList()
}
func (p *ParamTable) initWriteNodeTimeTickChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
if err != nil {
log.Panic(err)
}
id, err := p.Load("nodeID.writeNodeIDList")
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.WriteNodeTimeTickChannelNames = channels
}
func (p *ParamTable) initDDChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
if err != nil {
panic(err)
}
prefix += "-"
iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.DDChannelNames = ret
}
func (p *ParamTable) initInsertChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
if err != nil {
panic(err)
}
prefix += "-"
iRangeStr, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.InsertChannelNames = ret
}
func (p *ParamTable) initK2SChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.k2s")
if err != nil {
panic(err)
}
prefix += "-"
iRangeStr, err := p.Load("msgChannel.channelRange.k2s")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.K2SChannelNames = ret
}
func (p *ParamTable) initMaxPartitionNum() {
str, err := p.Load("master.maxPartitionNum")
if err != nil {
panic(err)
}
maxPartitionNum, err := strconv.ParseInt(str, 10, 64)
if err != nil {
panic(err)
}
p.MaxPartitionNum = maxPartitionNum
}
func (p *ParamTable) initDefaultPartitionTag() {
defaultTag, err := p.Load("common.defaultPartitionTag")
if err != nil {
panic(err)
}
p.DefaultPartitionTag = defaultTag
}
func (p *ParamTable) initLoadIndexChannelNames() {
loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd")
if err != nil {
panic(err)
}
p.LoadIndexChannelNames = []string{loadIndexChannelName}
}

View File

@ -0,0 +1,250 @@
package dataservice
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)
type errRemainInsufficient struct {
requestRows int
}
func newErrRemainInsufficient(requestRows int) *errRemainInsufficient {
return &errRemainInsufficient{requestRows: requestRows}
}
func (err *errRemainInsufficient) Error() string {
return "segment remaining is insufficient for " + strconv.Itoa(err.requestRows) + " rows"
}
// segmentAllocator is used to allocate rows for segments and to record the allocations.
type segmentAllocator interface {
// OpenSegment adds the segment to the allocator and marks it allocatable.
OpenSegment(segmentInfo *datapb.SegmentInfo) error
// AllocSegment allocates rows and records the allocation.
AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
// GetSealedSegments returns all sealed segments.
GetSealedSegments() ([]UniqueID, error)
// SealSegment marks the segment sealed; it will not be allocated from anymore.
SealSegment(segmentID UniqueID)
// DropSegment removes the segment from the allocator.
DropSegment(segmentID UniqueID)
// ExpireAllocations removes all allocations whose expire time has passed the given time tick.
ExpireAllocations(timeTick Timestamp) error
// IsAllocationsExpired checks whether all allocations of the segment have expired.
IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
}
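
A minimal lifecycle sketch of this interface, assuming an allocator already constructed from a populated meta table; the info value and the now timestamp are placeholders, and errors are deliberately ignored here.

func allocatorLifecycleSketch(allocator segmentAllocator, info *datapb.SegmentInfo, now Timestamp) {
	_ = allocator.OpenSegment(info) // make the segment allocatable
	// reserve 1000 rows on the segment's first insert channel
	segID, count, expireTs, _ := allocator.AllocSegment(info.CollectionID, info.PartitionID, info.InsertChannels[0], 1000)
	_, _, _ = segID, count, expireTs
	_ = allocator.ExpireAllocations(now)       // drop allocations whose expire time has passed
	sealed, _ := allocator.GetSealedSegments() // segments past the size threshold
	for _, id := range sealed {
		allocator.DropSegment(id) // stop tracking once flushed
	}
}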
type (
segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
total int
sealed bool
lastExpireTime Timestamp
allocations []*allocation
cRange channelRange
}
allocation struct {
rowNums int
expireTime Timestamp
}
segmentAllocatorImpl struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
cMapper *insertChannelMapper
segmentExpireDuration int64
defaultSizePerRecord int64
segmentThreshold float64
segmentThresholdFactor float64
numOfChannels int
numOfQueryNodes int
mu sync.RWMutex
globalIDAllocator func() (UniqueID, error)
globalTSOAllocator func() (Timestamp, error)
}
)
func newSegmentAllocator(metaTable *meta, globalIDAllocator func() (UniqueID, error),
globalTSOAllocator func() (Timestamp, error)) (*segmentAllocatorImpl, error) {
segmentAllocator := &segmentAllocatorImpl{
mt: metaTable,
segments: make(map[UniqueID]*segmentStatus),
segmentExpireDuration: Params.SegIDAssignExpiration,
defaultSizePerRecord: Params.DefaultRecordSize,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
numOfChannels: Params.TopicNum,
numOfQueryNodes: Params.QueryNodeNum,
globalIDAllocator: globalIDAllocator,
globalTSOAllocator: globalTSOAllocator,
}
return segmentAllocator, nil
}
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exists", segmentInfo.SegmentID)
}
totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
if err != nil {
return err
}
allocator.segments[segmentInfo.SegmentID] = &segmentStatus{
id: segmentInfo.SegmentID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
total: totalRows,
sealed: false,
lastExpireTime: 0,
cRange: segmentInfo.InsertChannels,
}
return nil
}
func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, segStatus := range allocator.segments {
if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
!segStatus.cRange.Contains(channelName) {
continue
}
var success bool
success, err = allocator.alloc(segStatus, requestRows)
if err != nil {
return
}
if !success {
continue
}
segID = segStatus.id
retCount = requestRows
expireTime = segStatus.lastExpireTime
return
}
err = newErrRemainInsufficient(requestRows)
return
}
func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
totalOfAllocations := 0
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
}
segMeta, err := allocator.mt.GetSegmentByID(segStatus.id)
if err != nil {
return false, err
}
free := segStatus.total - int(segMeta.NumRows) - totalOfAllocations
if numRows > free {
return false, nil
}
ts, err := allocator.globalTSOAllocator()
if err != nil {
return false, err
}
physicalTs, logicalTs := tsoutil.ParseTS(ts)
expirePhysicalTs := physicalTs.Add(time.Duration(allocator.segmentExpireDuration) * time.Millisecond)
expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs))
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, &allocation{
rowNums: numRows,
expireTime: expireTs,
})
return true, nil
}
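
Note on the expiry arithmetic above: ParseTS splits the hybrid timestamp into a wall-clock part and a logical counter, only the wall clock is advanced by segmentExpireDuration milliseconds, and ComposeTS reassembles the pair. Assuming the conventional 18-bit logical layout (ComposeTS(p, l) = p<<18 | l), an allocation stamped at physical time t with a 2000 ms expiration yields an expire timestamp whose physical part is t + 2000 and whose logical part is unchanged.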
func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := allocator.mt.GetCollection(collectionID)
if err != nil {
return -1, err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(collMeta.Schema)
if err != nil {
return -1, err
}
return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
}
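
A worked example with hypothetical values: master.segment.size = 512 gives segmentThreshold = 512 * 1024 * 1024 = 536,870,912 bytes, so a schema whose EstimateSizePerRecord comes out at 128 bytes yields 4,194,304 rows per segment.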
func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
keys := make([]UniqueID, 0)
for _, segStatus := range allocator.segments {
if !segStatus.sealed {
sealed, err := allocator.checkSegmentSealed(segStatus)
if err != nil {
return nil, err
}
segStatus.sealed = sealed
}
if segStatus.sealed {
keys = append(keys, segStatus.id)
}
}
return keys, nil
}
func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := allocator.mt.GetSegmentByID(segStatus.id)
if err != nil {
return false, err
}
return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
}
func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
status, ok := allocator.segments[segmentID]
if !ok {
return
}
status.sealed = true
}
func (allocator *segmentAllocatorImpl) DropSegment(segmentID UniqueID) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
delete(allocator.segments, segmentID)
}
func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, segStatus := range allocator.segments {
for i := 0; i < len(segStatus.allocations); i++ {
if timeTick < segStatus.allocations[i].expireTime {
continue
}
segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
i--
}
}
return nil
}
func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
allocator.mu.RLock()
defer allocator.mu.RUnlock()
status, ok := allocator.segments[segmentID]
if !ok {
return false, fmt.Errorf("segment %d not found", segmentID)
}
return status.lastExpireTime <= ts, nil
}

View File

@ -0,0 +1,107 @@
package dataservice
import (
"log"
"golang.org/x/net/context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type (
proxyTimeTickWatcher struct {
allocator segmentAllocator
msgQueue chan *msgstream.TimeTickMsg
ctx context.Context
cancelFunc context.CancelFunc
}
dataNodeTimeTickWatcher struct {
allocator segmentAllocator
msgQueue chan *msgstream.TimeTickMsg
ctx context.Context
cancelFunc context.CancelFunc
}
)
func newProxyTimeTickWatcher(ctx context.Context, allocator segmentAllocator) *proxyTimeTickWatcher {
childCtx, cancelFunc := context.WithCancel(ctx)
return &proxyTimeTickWatcher{
allocator: allocator,
msgQueue: make(chan *msgstream.TimeTickMsg, 1),
ctx: childCtx,
cancelFunc: cancelFunc,
}
}
func (watcher *proxyTimeTickWatcher) Start() {
go watcher.handleProxyTimeTickMsg()
}
func (watcher *proxyTimeTickWatcher) Close() {
watcher.cancelFunc()
}
func (watcher *proxyTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) {
watcher.msgQueue <- msg
}
func (watcher *proxyTimeTickWatcher) handleProxyTimeTickMsg() {
for {
select {
case <-watcher.ctx.Done():
return
case msg := <-watcher.msgQueue:
if err := watcher.allocator.ExpireAllocations(msg.Timestamp); err != nil {
log.Printf("expire allocations error : %s", err.Error())
}
}
}
}
func newDataNodeTimeTickWatcher(ctx context.Context, allocator segmentAllocator) *dataNodeTimeTickWatcher {
childCtx, cancelFunc := context.WithCancel(ctx)
return &dataNodeTimeTickWatcher{
allocator: allocator,
msgQueue: make(chan *msgstream.TimeTickMsg, 1),
ctx: childCtx,
cancelFunc: cancelFunc,
}
}
func (watcher *dataNodeTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) {
watcher.msgQueue <- msg
}
func (watcher *dataNodeTimeTickWatcher) Start() {
go watcher.handleDataNodeTimeTickMsg()
}
func (watcher *dataNodeTimeTickWatcher) Close() {
watcher.cancelFunc()
}
func (watcher *dataNodeTimeTickWatcher) handleDataNodeTimeTickMsg() {
for {
select {
case <-watcher.ctx.Done():
return
case msg := <-watcher.msgQueue:
segments, err := watcher.allocator.GetSealedSegments()
if err != nil {
log.Printf("get sealed segments error %s", err.Error())
continue
}
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Timestamp)
if err != nil {
log.Printf("check allocations expired error %s", err.Error())
continue
}
if expired {
// TODO: flush segment
watcher.allocator.DropSegment(id)
}
}
}
}
}
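
Both watchers are shaped to satisfy the timesync.TimeTickWatcher interface added later in this commit. A hedged compile-time assertion, assuming the timesync package can be imported here without a dependency cycle:

var (
	_ timesync.TimeTickWatcher = (*proxyTimeTickWatcher)(nil)
	_ timesync.TimeTickWatcher = (*dataNodeTimeTickWatcher)(nil)
)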

View File

@ -113,6 +113,16 @@ message FlushSegRequest {
repeated int64 segmentIDs = 4;
}
message SegmentInfo {
int64 segmentID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
repeated string insert_channels = 4;
uint64 open_time = 5;
uint64 close_time = 6;
int64 num_rows = 7;
int64 mem_size = 8;
}
service DataService {

View File

@ -897,6 +897,101 @@ func (m *FlushSegRequest) GetSegmentIDs() []int64 {
return nil
}
type SegmentInfo struct {
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"`
OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"`
CloseTime uint64 `protobuf:"varint,6,opt,name=close_time,json=closeTime,proto3" json:"close_time,omitempty"`
NumRows int64 `protobuf:"varint,7,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
MemSize int64 `protobuf:"varint,8,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SegmentInfo) Reset() { *m = SegmentInfo{} }
func (m *SegmentInfo) String() string { return proto.CompactTextString(m) }
func (*SegmentInfo) ProtoMessage() {}
func (*SegmentInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_3385cd32ad6cfe64, []int{16}
}
func (m *SegmentInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SegmentInfo.Unmarshal(m, b)
}
func (m *SegmentInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SegmentInfo.Marshal(b, m, deterministic)
}
func (m *SegmentInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_SegmentInfo.Merge(m, src)
}
func (m *SegmentInfo) XXX_Size() int {
return xxx_messageInfo_SegmentInfo.Size(m)
}
func (m *SegmentInfo) XXX_DiscardUnknown() {
xxx_messageInfo_SegmentInfo.DiscardUnknown(m)
}
var xxx_messageInfo_SegmentInfo proto.InternalMessageInfo
func (m *SegmentInfo) GetSegmentID() int64 {
if m != nil {
return m.SegmentID
}
return 0
}
func (m *SegmentInfo) GetCollectionID() int64 {
if m != nil {
return m.CollectionID
}
return 0
}
func (m *SegmentInfo) GetPartitionID() int64 {
if m != nil {
return m.PartitionID
}
return 0
}
func (m *SegmentInfo) GetInsertChannels() []string {
if m != nil {
return m.InsertChannels
}
return nil
}
func (m *SegmentInfo) GetOpenTime() uint64 {
if m != nil {
return m.OpenTime
}
return 0
}
func (m *SegmentInfo) GetCloseTime() uint64 {
if m != nil {
return m.CloseTime
}
return 0
}
func (m *SegmentInfo) GetNumRows() int64 {
if m != nil {
return m.NumRows
}
return 0
}
func (m *SegmentInfo) GetMemSize() int64 {
if m != nil {
return m.MemSize
}
return 0
}
func init() {
proto.RegisterEnum("milvus.proto.data.SegmentState", SegmentState_name, SegmentState_value)
proto.RegisterType((*RegisterNodeRequest)(nil), "milvus.proto.data.RegisterNodeRequest")
@ -915,72 +1010,79 @@ func init() {
proto.RegisterType((*InsertChannelRequest)(nil), "milvus.proto.data.InsertChannelRequest")
proto.RegisterType((*WatchDmChannelRequest)(nil), "milvus.proto.data.WatchDmChannelRequest")
proto.RegisterType((*FlushSegRequest)(nil), "milvus.proto.data.FlushSegRequest")
proto.RegisterType((*SegmentInfo)(nil), "milvus.proto.data.SegmentInfo")
}
func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) }
var fileDescriptor_3385cd32ad6cfe64 = []byte{
// 952 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x41, 0x8f, 0xdb, 0x44,
0x14, 0x8e, 0xe3, 0x64, 0xdb, 0xbc, 0xa4, 0xbb, 0xd9, 0xd9, 0x14, 0x42, 0xba, 0xda, 0x0d, 0x23,
0xd1, 0x46, 0x15, 0x24, 0xd2, 0x56, 0x85, 0x1b, 0xa2, 0x4b, 0xca, 0x2a, 0x12, 0xac, 0xaa, 0x09,
0x12, 0x52, 0x39, 0x44, 0x93, 0xf8, 0xe1, 0x8c, 0xea, 0xd8, 0xc6, 0x33, 0x69, 0xab, 0xbd, 0xc1,
0x89, 0x13, 0xe2, 0x0f, 0xf0, 0x57, 0xf8, 0x5b, 0x5c, 0x91, 0x67, 0x1c, 0xc7, 0x4e, 0x1c, 0x52,
0x69, 0xa9, 0xb8, 0x65, 0x5e, 0xbe, 0x79, 0xef, 0x9b, 0x6f, 0xde, 0xfb, 0xc6, 0x40, 0x1c, 0xae,
0xf8, 0x44, 0x62, 0xf4, 0x5a, 0xcc, 0xb0, 0x1f, 0x46, 0x81, 0x0a, 0xc8, 0xf1, 0x42, 0x78, 0xaf,
0x97, 0xd2, 0xac, 0xfa, 0x31, 0xa0, 0xd3, 0x98, 0x05, 0x8b, 0x45, 0xe0, 0x9b, 0x50, 0xe7, 0x50,
0xf8, 0x0a, 0x23, 0x9f, 0x7b, 0x66, 0x4d, 0x7f, 0xb1, 0xe0, 0x84, 0xa1, 0x2b, 0xa4, 0xc2, 0xe8,
0x3a, 0x70, 0x90, 0xe1, 0xcf, 0x4b, 0x94, 0x8a, 0x5c, 0x40, 0x65, 0xca, 0x25, 0xb6, 0xad, 0xae,
0xd5, 0xab, 0x5f, 0x9c, 0xf5, 0x73, 0x79, 0xd3, 0x1c, 0xdf, 0x49, 0xf7, 0x92, 0x4b, 0x64, 0x1a,
0x4b, 0x3e, 0x87, 0x3b, 0xdc, 0x71, 0x22, 0x94, 0xb2, 0x5d, 0xd6, 0xdb, 0x4e, 0xf3, 0xdb, 0x12,
0x22, 0xcf, 0x0c, 0x86, 0xad, 0xc0, 0xf4, 0x25, 0xb4, 0xf2, 0x14, 0x64, 0x18, 0xf8, 0x12, 0xc9,
0x25, 0xd4, 0x85, 0x2f, 0xd4, 0x24, 0xe4, 0x11, 0x5f, 0xc8, 0x84, 0xca, 0xc7, 0x3b, 0xa8, 0x8c,
0x7c, 0xa1, 0x5e, 0x68, 0x20, 0x03, 0x91, 0xfe, 0xa6, 0xbf, 0x59, 0xd0, 0x18, 0xa3, 0x3b, 0x1a,
0xae, 0x0e, 0xd6, 0x82, 0xea, 0x2c, 0x58, 0xfa, 0x4a, 0xa7, 0xab, 0x32, 0xb3, 0x20, 0xa7, 0x50,
0x9b, 0xcd, 0xb9, 0xef, 0xa3, 0x37, 0x1a, 0x6a, 0xf2, 0x35, 0xb6, 0x0e, 0x10, 0x0a, 0x8d, 0x59,
0xe0, 0x79, 0x38, 0x53, 0x22, 0xf0, 0x47, 0xc3, 0xb6, 0xdd, 0xb5, 0x7a, 0x36, 0xcb, 0xc5, 0x48,
0x17, 0xea, 0x21, 0x8f, 0x94, 0x48, 0x20, 0x15, 0x0d, 0xc9, 0x86, 0xe8, 0x1f, 0x16, 0x90, 0x67,
0x52, 0x0a, 0xd7, 0xcf, 0x11, 0xfa, 0x00, 0x0e, 0xfc, 0xc0, 0xc1, 0xd1, 0x50, 0x33, 0xb2, 0x59,
0xb2, 0x22, 0x0f, 0xa0, 0x16, 0x22, 0x46, 0x93, 0x28, 0xf0, 0x30, 0xa1, 0x74, 0x37, 0x0e, 0xb0,
0xc0, 0x43, 0xf2, 0x1c, 0xee, 0xc9, 0x4c, 0x12, 0xd9, 0xb6, 0xbb, 0x76, 0xaf, 0x7e, 0x71, 0xde,
0xdf, 0xba, 0xff, 0x7e, 0xb6, 0x18, 0xcb, 0xef, 0xa2, 0x7f, 0x5b, 0x70, 0xa4, 0xff, 0x37, 0xbc,
0x16, 0xe8, 0x6b, 0x81, 0x34, 0x28, 0xa1, 0x63, 0x16, 0xdb, 0x02, 0x55, 0xb3, 0x02, 0xa5, 0xa2,
0xc6, 0xca, 0xdc, 0x5b, 0x89, 0xba, 0x29, 0x5b, 0x65, 0xbf, 0x6c, 0xd5, 0x2d, 0xd9, 0xc8, 0x39,
0xd4, 0xf1, 0x6d, 0x28, 0x22, 0x9c, 0x28, 0xb1, 0xc0, 0xf6, 0x41, 0xd7, 0xea, 0x55, 0x18, 0x98,
0xd0, 0xf7, 0x62, 0x81, 0xe4, 0x09, 0x1c, 0x48, 0xc5, 0xd5, 0x52, 0xb6, 0xef, 0xe8, 0x0e, 0x79,
0x50, 0xd8, 0x75, 0x63, 0x0d, 0x61, 0x09, 0x94, 0x22, 0x9c, 0xe4, 0xee, 0x22, 0x69, 0xb9, 0x6b,
0x68, 0xca, 0xbc, 0x1e, 0x71, 0xdf, 0xc5, 0xd2, 0xd2, 0x5d, 0xd2, 0xae, 0xa1, 0x6c, 0x6b, 0x2f,
0xbd, 0x81, 0xc6, 0x37, 0xde, 0x52, 0xce, 0x6f, 0x33, 0x56, 0x04, 0x2a, 0xce, 0x34, 0x51, 0xdd,
0x66, 0xfa, 0xf7, 0xbb, 0x48, 0x4b, 0x7f, 0xb7, 0x80, 0x8c, 0xe7, 0xc1, 0x9b, 0x31, 0xba, 0x9a,
0xdd, 0x2d, 0x28, 0x6c, 0x96, 0x2b, 0xef, 0xbf, 0x49, 0x7b, 0x7b, 0x00, 0x9e, 0xc2, 0x49, 0x8e,
0x4f, 0xa2, 0xf9, 0x19, 0x80, 0x34, 0xa1, 0xd1, 0xd0, 0xa8, 0x6d, 0xb3, 0x4c, 0x84, 0xce, 0xa1,
0x95, 0x6c, 0x89, 0xef, 0x10, 0xe5, 0x6d, 0x0e, 0x72, 0x0a, 0xb5, 0x34, 0x73, 0x72, 0x8a, 0x75,
0x20, 0x9e, 0xd0, 0xfb, 0x1b, 0xa5, 0x12, 0x8e, 0x4f, 0xa1, 0x1a, 0x37, 0x8e, 0x29, 0x76, 0xb8,
0x6b, 0xce, 0xd2, 0x8d, 0xcc, 0xa0, 0xe3, 0xde, 0x9d, 0x45, 0xc8, 0x55, 0xd2, 0xbb, 0x65, 0xd3,
0xbb, 0x26, 0xa4, 0x7b, 0xf7, 0x1c, 0xea, 0x12, 0xb9, 0x87, 0x8e, 0x01, 0xd8, 0x06, 0x60, 0x42,
0x31, 0x80, 0xbe, 0x82, 0x0f, 0x47, 0xbe, 0xc4, 0x48, 0x5d, 0x0a, 0xdf, 0x0b, 0xdc, 0x17, 0x5c,
0xcd, 0xdf, 0xdf, 0xf9, 0x43, 0xf8, 0x68, 0xb3, 0xd8, 0x5a, 0x82, 0x0e, 0xdc, 0xfd, 0x49, 0xa0,
0xe7, 0xac, 0x2f, 0x29, 0x5d, 0x93, 0x2f, 0xa0, 0x1a, 0xc6, 0xe0, 0x76, 0x59, 0xcf, 0xca, 0x2e,
0x8f, 0x1e, 0xab, 0x48, 0xf8, 0xee, 0xb7, 0x42, 0x2a, 0x66, 0xf0, 0xf4, 0x57, 0x0b, 0x5a, 0xa6,
0xe4, 0xd7, 0xc6, 0x4c, 0xde, 0xf7, 0xa0, 0x14, 0x58, 0x37, 0x7d, 0x05, 0xf7, 0x7f, 0xe0, 0x6a,
0x36, 0x1f, 0x2e, 0xfe, 0x03, 0x12, 0x67, 0x00, 0xa9, 0x2f, 0x1a, 0x3d, 0x6a, 0x2c, 0x13, 0xa1,
0x7f, 0x5a, 0x70, 0xa4, 0x2d, 0x61, 0x8c, 0xee, 0xff, 0x70, 0xd8, 0x8d, 0x69, 0xab, 0x6c, 0x4e,
0xdb, 0xe3, 0x1f, 0xf5, 0x7b, 0x99, 0x76, 0x32, 0x39, 0x82, 0x7a, 0xb2, 0xbe, 0x0e, 0x7c, 0x6c,
0x96, 0xc8, 0x89, 0x7e, 0x32, 0x4c, 0x40, 0x3d, 0x7f, 0x2b, 0xa4, 0x6a, 0x5a, 0x84, 0xc0, 0x61,
0x12, 0xbc, 0x8a, 0x82, 0x37, 0xc2, 0x77, 0x9b, 0x65, 0x72, 0x0c, 0xf7, 0x56, 0x99, 0x74, 0x3f,
0x37, 0xed, 0x8b, 0xbf, 0xaa, 0x50, 0x1f, 0x72, 0xc5, 0xc7, 0xe6, 0xa3, 0x85, 0x70, 0x68, 0x64,
0x5f, 0x7e, 0xf2, 0xb0, 0x60, 0xae, 0x0a, 0xbe, 0x4e, 0x3a, 0x8f, 0xf6, 0xe2, 0x4c, 0xd3, 0xd2,
0x12, 0xb9, 0x82, 0xaa, 0x96, 0x9b, 0x14, 0xcd, 0x6c, 0xd6, 0x9b, 0x3b, 0xff, 0xf6, 0x6e, 0xd0,
0x12, 0x99, 0xc2, 0x51, 0xfa, 0x62, 0x18, 0xb1, 0xc8, 0x27, 0x05, 0x29, 0xb7, 0x5f, 0xf8, 0xce,
0xc3, 0x7d, 0xb0, 0x94, 0xec, 0x04, 0x1a, 0x19, 0x87, 0x94, 0x85, 0x05, 0xb6, 0x2d, 0xbd, 0xb0,
0x40, 0x81, 0xd3, 0xd2, 0x12, 0x71, 0xa1, 0x79, 0x85, 0x2a, 0xe7, 0x71, 0xe4, 0xd1, 0x1e, 0x33,
0x5b, 0x19, 0x6e, 0xa7, 0xb7, 0x1f, 0x98, 0x16, 0x8a, 0xa0, 0x75, 0x85, 0x6a, 0xcb, 0x4d, 0xc8,
0xe3, 0x82, 0x1c, 0x3b, 0x0c, 0xae, 0xf3, 0xe9, 0x3b, 0x60, 0xb3, 0x35, 0x39, 0x1c, 0xa7, 0x35,
0x93, 0x49, 0x2e, 0x3e, 0x5d, 0x91, 0xe3, 0x74, 0xf6, 0x9b, 0x16, 0x2d, 0x5d, 0x7e, 0xf5, 0xf2,
0x4b, 0x57, 0xa8, 0xf9, 0x72, 0x1a, 0xb7, 0xc7, 0xe0, 0x46, 0x78, 0x9e, 0xb8, 0x51, 0x38, 0x9b,
0x0f, 0xcc, 0xde, 0xcf, 0x1c, 0x21, 0x55, 0x24, 0xa6, 0x4b, 0x85, 0xce, 0x60, 0x95, 0x61, 0xa0,
0x13, 0x0e, 0xe2, 0xca, 0xe1, 0x74, 0x7a, 0xa0, 0x57, 0x4f, 0xfe, 0x09, 0x00, 0x00, 0xff, 0xff,
0x63, 0x83, 0xa9, 0x0a, 0xbe, 0x0b, 0x00, 0x00,
// 1051 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0x51, 0x6f, 0xdb, 0x36,
0x10, 0x8e, 0x2c, 0x3b, 0x89, 0xcf, 0x4e, 0xe2, 0x30, 0xe9, 0xe6, 0xba, 0x59, 0x92, 0x09, 0x58,
0x13, 0x14, 0x5b, 0x02, 0xa4, 0xe8, 0xf6, 0x36, 0xac, 0x99, 0xbb, 0xc0, 0xc0, 0x16, 0x14, 0xf4,
0x80, 0x01, 0xdd, 0x83, 0x21, 0x5b, 0x57, 0x99, 0xa8, 0x44, 0x6a, 0x22, 0xdd, 0x14, 0x79, 0xdb,
0x9e, 0xf6, 0x30, 0x0c, 0xfb, 0x03, 0xfb, 0x2b, 0xfb, 0x5b, 0x7b, 0x1d, 0x44, 0xca, 0xb2, 0x64,
0xcb, 0x73, 0x81, 0xb4, 0xe8, 0x9b, 0x78, 0xfa, 0x78, 0x77, 0xfc, 0xf8, 0xdd, 0x1d, 0x81, 0x78,
0xae, 0x72, 0x07, 0x12, 0xe3, 0xd7, 0x6c, 0x84, 0x67, 0x51, 0x2c, 0x94, 0x20, 0xbb, 0x21, 0x0b,
0x5e, 0x4f, 0xa4, 0x59, 0x9d, 0x25, 0x80, 0x4e, 0x73, 0x24, 0xc2, 0x50, 0x70, 0x63, 0xea, 0x6c,
0x33, 0xae, 0x30, 0xe6, 0x6e, 0x60, 0xd6, 0xce, 0xaf, 0x16, 0xec, 0x51, 0xf4, 0x99, 0x54, 0x18,
0x5f, 0x0b, 0x0f, 0x29, 0xfe, 0x32, 0x41, 0xa9, 0xc8, 0x05, 0x54, 0x87, 0xae, 0xc4, 0xb6, 0x75,
0x6c, 0x9d, 0x36, 0x2e, 0x0e, 0xcf, 0x0a, 0x7e, 0x33, 0x1f, 0x3f, 0x48, 0xff, 0xd2, 0x95, 0x48,
0x35, 0x96, 0x7c, 0x09, 0x1b, 0xae, 0xe7, 0xc5, 0x28, 0x65, 0xbb, 0xa2, 0xb7, 0x1d, 0x14, 0xb7,
0xa5, 0x89, 0x3c, 0x35, 0x18, 0x3a, 0x05, 0x3b, 0x2f, 0x60, 0xbf, 0x98, 0x82, 0x8c, 0x04, 0x97,
0x48, 0x2e, 0xa1, 0xc1, 0x38, 0x53, 0x83, 0xc8, 0x8d, 0xdd, 0x50, 0xa6, 0xa9, 0x7c, 0xba, 0x24,
0x95, 0x1e, 0x67, 0xea, 0xb9, 0x06, 0x52, 0x60, 0xd9, 0xb7, 0xf3, 0xbb, 0x05, 0xcd, 0x3e, 0xfa,
0xbd, 0xee, 0xf4, 0x60, 0xfb, 0x50, 0x1b, 0x89, 0x09, 0x57, 0xda, 0x5d, 0x8d, 0x9a, 0x05, 0x39,
0x80, 0xfa, 0x68, 0xec, 0x72, 0x8e, 0x41, 0xaf, 0xab, 0x93, 0xaf, 0xd3, 0x99, 0x81, 0x38, 0xd0,
0x1c, 0x89, 0x20, 0xc0, 0x91, 0x62, 0x82, 0xf7, 0xba, 0x6d, 0xfb, 0xd8, 0x3a, 0xb5, 0x69, 0xc1,
0x46, 0x8e, 0xa1, 0x11, 0xb9, 0xb1, 0x62, 0x29, 0xa4, 0xaa, 0x21, 0x79, 0x93, 0xf3, 0x97, 0x05,
0xe4, 0xa9, 0x94, 0xcc, 0xe7, 0x85, 0x84, 0x3e, 0x82, 0x75, 0x2e, 0x3c, 0xec, 0x75, 0x75, 0x46,
0x36, 0x4d, 0x57, 0xe4, 0x01, 0xd4, 0x23, 0xc4, 0x78, 0x10, 0x8b, 0x00, 0xd3, 0x94, 0x36, 0x13,
0x03, 0x15, 0x01, 0x92, 0x67, 0xb0, 0x25, 0x73, 0x4e, 0x64, 0xdb, 0x3e, 0xb6, 0x4f, 0x1b, 0x17,
0x47, 0x67, 0x0b, 0xf7, 0x7f, 0x96, 0x0f, 0x46, 0x8b, 0xbb, 0x9c, 0x7f, 0x2d, 0xd8, 0xd1, 0xff,
0x4d, 0x5e, 0x21, 0x72, 0x4d, 0x90, 0x06, 0xa5, 0xe9, 0x98, 0xc5, 0x22, 0x41, 0xb5, 0x3c, 0x41,
0x19, 0xa9, 0x09, 0x33, 0x5b, 0x53, 0x52, 0xe7, 0x69, 0xab, 0xae, 0xa6, 0xad, 0xb6, 0x40, 0x1b,
0x39, 0x82, 0x06, 0xbe, 0x89, 0x58, 0x8c, 0x03, 0xc5, 0x42, 0x6c, 0xaf, 0x1f, 0x5b, 0xa7, 0x55,
0x0a, 0xc6, 0xf4, 0x23, 0x0b, 0x91, 0x3c, 0x86, 0x75, 0xa9, 0x5c, 0x35, 0x91, 0xed, 0x0d, 0xad,
0x90, 0x07, 0xa5, 0xaa, 0xeb, 0x6b, 0x08, 0x4d, 0xa1, 0x0e, 0xc2, 0x5e, 0xe1, 0x2e, 0x52, 0xc9,
0x5d, 0x43, 0x4b, 0x16, 0xf9, 0x48, 0x74, 0x97, 0x50, 0xeb, 0x2c, 0xa3, 0x76, 0x06, 0xa5, 0x0b,
0x7b, 0x9d, 0x5b, 0x68, 0x7e, 0x17, 0x4c, 0xe4, 0xf8, 0x2e, 0x65, 0x45, 0xa0, 0xea, 0x0d, 0x53,
0xd6, 0x6d, 0xaa, 0xbf, 0xdf, 0x86, 0x5a, 0xe7, 0x4f, 0x0b, 0x48, 0x7f, 0x2c, 0x6e, 0xfa, 0xe8,
0xeb, 0xec, 0xee, 0x90, 0xc2, 0x7c, 0xb8, 0xca, 0xea, 0x9b, 0xb4, 0x17, 0x0b, 0xe0, 0x09, 0xec,
0x15, 0xf2, 0x49, 0x39, 0x3f, 0x04, 0x90, 0xc6, 0xd4, 0xeb, 0x1a, 0xb6, 0x6d, 0x9a, 0xb3, 0x38,
0x63, 0xd8, 0x4f, 0xb7, 0x24, 0x77, 0x88, 0xf2, 0x2e, 0x07, 0x39, 0x80, 0x7a, 0xe6, 0x39, 0x3d,
0xc5, 0xcc, 0x90, 0x54, 0xe8, 0xbd, 0xb9, 0x50, 0x69, 0x8e, 0x4f, 0xa0, 0x96, 0x08, 0xc7, 0x04,
0xdb, 0x5e, 0x56, 0x67, 0xd9, 0x46, 0x6a, 0xd0, 0x89, 0x76, 0x47, 0x31, 0xba, 0x2a, 0xd5, 0x6e,
0xc5, 0x68, 0xd7, 0x98, 0xb4, 0x76, 0x8f, 0xa0, 0x21, 0xd1, 0x0d, 0xd0, 0x33, 0x00, 0xdb, 0x00,
0x8c, 0x29, 0x01, 0x38, 0xaf, 0xe0, 0xe3, 0x1e, 0x97, 0x18, 0xab, 0x4b, 0xc6, 0x03, 0xe1, 0x3f,
0x77, 0xd5, 0xf8, 0xfd, 0x9d, 0x3f, 0x82, 0xfb, 0xf3, 0xc1, 0x66, 0x14, 0x74, 0x60, 0xf3, 0x25,
0xc3, 0xc0, 0x9b, 0x5d, 0x52, 0xb6, 0x26, 0x5f, 0x41, 0x2d, 0x4a, 0xc0, 0xed, 0x8a, 0xae, 0x95,
0x65, 0x3d, 0xba, 0xaf, 0x62, 0xc6, 0xfd, 0xef, 0x99, 0x54, 0xd4, 0xe0, 0x9d, 0xdf, 0x2c, 0xd8,
0x37, 0x21, 0xbf, 0x35, 0xcd, 0xe4, 0x7d, 0x17, 0x4a, 0x49, 0xeb, 0x76, 0x5e, 0xc1, 0xbd, 0x9f,
0x5c, 0x35, 0x1a, 0x77, 0xc3, 0x77, 0x90, 0xc4, 0x21, 0x40, 0xd6, 0x17, 0x0d, 0x1f, 0x75, 0x9a,
0xb3, 0x38, 0x7f, 0x5b, 0xb0, 0xa3, 0x5b, 0x42, 0x1f, 0xfd, 0x0f, 0x70, 0xd8, 0xb9, 0x6a, 0xab,
0x2e, 0x54, 0xdb, 0x1f, 0x15, 0x68, 0xa4, 0x52, 0xee, 0xf1, 0x97, 0xa2, 0xa8, 0x18, 0x6b, 0x4e,
0x31, 0xef, 0xa6, 0x31, 0x90, 0x13, 0xd8, 0x61, 0x5a, 0x04, 0x83, 0x94, 0x28, 0x93, 0x58, 0x9d,
0x6e, 0xb3, 0xbc, 0x36, 0x64, 0x32, 0x13, 0x45, 0x84, 0xdc, 0x14, 0x4b, 0x4d, 0x17, 0xcb, 0x66,
0x62, 0xd0, 0xb5, 0xf4, 0x09, 0xc0, 0x28, 0x10, 0xb2, 0x30, 0x27, 0xea, 0xda, 0xa2, 0x7f, 0xdf,
0x87, 0x4d, 0x3e, 0x09, 0x07, 0xb1, 0xb8, 0x31, 0x83, 0xc2, 0xa6, 0x1b, 0x7c, 0x12, 0x52, 0x71,
0x23, 0x93, 0x5f, 0x21, 0x86, 0x03, 0xc9, 0x6e, 0xb1, 0xbd, 0x69, 0x7e, 0x85, 0x18, 0xf6, 0xd9,
0x2d, 0x3e, 0xfa, 0x59, 0x3f, 0x1f, 0xb2, 0xc2, 0x26, 0x3b, 0x19, 0x3b, 0xd7, 0x82, 0x63, 0x6b,
0x8d, 0xec, 0xe9, 0x09, 0x6a, 0x0c, 0xea, 0xd9, 0x1b, 0x26, 0x55, 0xcb, 0x22, 0x04, 0xb6, 0x53,
0xe3, 0x55, 0x2c, 0x6e, 0x18, 0xf7, 0x5b, 0x15, 0xb2, 0x0b, 0x5b, 0x53, 0x4f, 0xba, 0xbc, 0x5b,
0xf6, 0xc5, 0x3f, 0x35, 0x68, 0x74, 0x5d, 0xe5, 0xf6, 0xcd, 0x1b, 0x8e, 0xb8, 0xd0, 0xcc, 0x3f,
0x84, 0xc8, 0xc3, 0x92, 0x36, 0x53, 0xf2, 0x58, 0xeb, 0x9c, 0xac, 0xc4, 0x99, 0x1a, 0x76, 0xd6,
0xc8, 0x15, 0xd4, 0xb4, 0xfa, 0x48, 0x59, 0x0b, 0xcb, 0x8f, 0xaa, 0xce, 0xff, 0x8d, 0x51, 0x67,
0x8d, 0x0c, 0x61, 0x27, 0x1b, 0xa0, 0xa9, 0x18, 0x3e, 0x2b, 0x71, 0xb9, 0xf8, 0xe0, 0xe9, 0x3c,
0x5c, 0x05, 0xcb, 0x92, 0x1d, 0x40, 0x33, 0x37, 0x30, 0x64, 0x69, 0x80, 0xc5, 0x09, 0x57, 0x1a,
0xa0, 0x64, 0xf0, 0x38, 0x6b, 0xc4, 0x87, 0xd6, 0x15, 0xaa, 0x42, 0xcb, 0x27, 0x27, 0x2b, 0x7a,
0xfb, 0x74, 0xfe, 0x74, 0x4e, 0x57, 0x03, 0xb3, 0x40, 0x31, 0xec, 0x5f, 0xa1, 0x5a, 0x68, 0xae,
0xe4, 0x51, 0x89, 0x8f, 0x25, 0xfd, 0xbe, 0xf3, 0xf9, 0x5b, 0x60, 0xf3, 0x31, 0x5d, 0xd8, 0xcd,
0x62, 0x66, 0x15, 0x74, 0xb2, 0xd4, 0x49, 0xb1, 0xf7, 0x75, 0x56, 0xf7, 0x70, 0x67, 0xed, 0xf2,
0x9b, 0x17, 0x5f, 0xfb, 0x4c, 0x8d, 0x27, 0xc3, 0x44, 0x1e, 0xe7, 0xb7, 0x2c, 0x08, 0xd8, 0xad,
0xc2, 0xd1, 0xf8, 0xdc, 0xec, 0xfd, 0xc2, 0x63, 0x52, 0xc5, 0x6c, 0x38, 0x51, 0xe8, 0x9d, 0x4f,
0x3d, 0x9c, 0x6b, 0x87, 0xe7, 0x49, 0xe4, 0x68, 0x38, 0x5c, 0xd7, 0xab, 0xc7, 0xff, 0x05, 0x00,
0x00, 0xff, 0xff, 0x45, 0x70, 0x7b, 0x12, 0xcd, 0x0c, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -5,13 +5,11 @@ option go_package = "github.com/zilliztech/milvus-distributed/internal/proto/mil
import "common.proto";
import "internal.proto";
import "schema.proto";
message CreateCollectionRequest {
internal.MsgBase base = 1;
string db_name = 2;
string collectionName = 3;
// `schema` is the serialized `schema.CollectionSchema`
bytes schema = 4;
}
@ -28,11 +26,6 @@ message HasCollectionRequest {
string collection_name = 3;
}
message BoolResponse {
common.Status status = 1;
bool value = 2;
}
message DescribeCollectionRequest {
internal.MsgBase base = 1;
string db_name = 2;
@ -40,8 +33,7 @@ message DescribeCollectionRequest {
}
message DescribeCollectionResponse {
common.Status status = 1;
schema.CollectionSchema schema = 2;
repeated bytes schema = 1;
}
message LoadCollectionRequest {
@ -64,7 +56,6 @@ message CollectionStatsRequest {
message CollectionStatsResponse {
repeated common.KeyValuePair stats = 1;
common.Status status = 2;
}
@ -75,7 +66,6 @@ message ShowCollectionRequest {
message ShowCollectionResponse {
repeated string collection_names = 1;
common.Status status = 2;
}
@ -124,7 +114,6 @@ message PartitionStatsRequest {
message PartitionStatsResponse {
repeated common.KeyValuePair stats = 1;
common.Status status = 2;
}
message ShowPartitionRequest {
@ -135,7 +124,6 @@ message ShowPartitionRequest {
message ShowPartitionResponse {
repeated string partition_names = 1;
common.Status status = 2;
}
@ -161,7 +149,6 @@ message IndexDescription {
message DescribeIndexResponse {
repeated IndexDescription index_descriptions = 1;
common.Status status = 2;
}
message InsertRequest {
@ -176,24 +163,6 @@ message InsertRequest {
message InsertResponse {
int64 rowID_begin = 1;
int64 rowID_end = 2;
common.Status status = 3;
}
enum PlaceholderType {
NONE = 0;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
message PlaceholderValue {
string tag = 1;
PlaceholderType type = 2;
// values is a 2d-array, every array contains a vector
repeated bytes values = 3;
}
message PlaceholderGroup {
repeated PlaceholderValue placeholders = 1;
}
message SearchRequest {
@ -202,49 +171,11 @@ message SearchRequest {
string collection_name = 3;
repeated string partition_names = 4;
string dsl = 5;
// serialized `PlaceholderGroup`
bytes placeholder_group = 6;
}
message Hits {
repeated int64 IDs = 1;
repeated bytes row_data = 2;
repeated float scores = 3;
}
message QueryResult {
common.Status status = 1;
repeated bytes hits = 2;
repeated bytes placeholder_group = 6;
}
message FlushRequest {
internal.MsgBase base = 1;
string db_name = 2;
string collection_name = 3;
}
service MilvusService {
rpc CreateCollection(CreateCollectionRequest) returns (common.Status) {}
rpc DropCollection(DropCollectionRequest) returns (common.Status) {}
rpc HasCollection(HasCollectionRequest) returns (BoolResponse) {}
rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {}
rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {}
rpc DescribeCollection(DescribeCollectionRequest) returns (DescribeCollectionResponse) {}
rpc GetCollectionStatistics(CollectionStatsRequest) returns (CollectionStatsResponse) {}
rpc ShowCollections(ShowCollectionRequest) returns (ShowCollectionResponse) {}
rpc CreatePartition(CreatePartitionRequest) returns (common.Status) {}
rpc DropPartition(DropPartitionRequest) returns (common.Status) {}
rpc HasPartition(HasPartitionRequest) returns (BoolResponse) {}
rpc LoadPartitions(LoadPartitonRequest) returns (common.Status) {}
rpc ReleasePartitions(ReleasePartitionRequest) returns (common.Status) {}
rpc GetPartitionStatistics(PartitionStatsRequest) returns (PartitionStatsResponse) {}
rpc ShowPartitions(ShowPartitionRequest) returns (ShowPartitionResponse) {}
rpc CreateIndex(CreateIndexRequest) returns (common.Status) {}
rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {}
rpc Insert(InsertRequest) returns (InsertResponse) {}
rpc Search(SearchRequest) returns (QueryResult) {}
rpc Flush(FlushRequest) returns (common.Status) {}
}
}

File diff suppressed because it is too large

View File

@ -12,7 +12,6 @@ import (
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
@ -72,11 +71,8 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
p.tracer, p.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
p.tracer, p.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}

View File

@ -1,16 +0,0 @@
package proxyservice
import (
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type ServiceBase = typeutil.Service
type Interface interface {
ServiceBase
RegisterLink() (proxypb.RegisterLinkResponse, error)
RegisterNode(request proxypb.RegisterNodeRequest) (proxypb.RegisterNodeResponse, error)
// TODO: i'm sure it's not a best way to keep consistency, fix me
InvalidateCollectionMetaCache(request proxypb.InvalidateCollMetaCacheRequest) error
}

View File

@ -1,54 +0,0 @@
package proxyservice
import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
)
type ProxyService struct {
// implement Service
//nodeClients [] .Interface
// factory method
}
func (s ProxyService) Init() {
panic("implement me")
}
func (s ProxyService) Start() {
panic("implement me")
}
func (s ProxyService) Stop() {
panic("implement me")
}
func (s ProxyService) GetServiceStates() (internalpb2.ServiceStates, error) {
panic("implement me")
}
func (s ProxyService) GetTimeTickChannel() (string, error) {
panic("implement me")
}
func (s ProxyService) GetStatisticsChannel() (string, error) {
panic("implement me")
}
func (s ProxyService) RegisterLink() (proxypb.RegisterLinkResponse, error) {
panic("implement me")
}
func (s ProxyService) RegisterNode(request proxypb.RegisterNodeRequest) (proxypb.RegisterNodeResponse, error) {
panic("implement me")
}
func (s ProxyService) InvalidateCollectionMetaCache(request proxypb.InvalidateCollMetaCacheRequest) error {
panic("implement me")
}
func NewProxyServiceImpl() Interface {
return &ProxyService{}
}

View File

@ -18,7 +18,6 @@ import (
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
@ -89,11 +88,8 @@ func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
q.tracer, q.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
q.tracer, q.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}

View File

@ -0,0 +1,141 @@
package timesync
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/errors"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type timeSyncMsgProducer struct {
//softTimeTickBarrier
proxyTtBarrier TimeTickBarrier
//hardTimeTickBarrier
writeNodeTtBarrier TimeTickBarrier
ddSyncStream ms.MsgStream // insert & delete
dmSyncStream ms.MsgStream
k2sSyncStream ms.MsgStream
ctx context.Context
cancel context.CancelFunc
proxyWatchers []TimeTickWatcher
writeNodeWatchers []TimeTickWatcher
}
func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) {
ctx2, cancel := context.WithCancel(ctx)
return &timeSyncMsgProducer{ctx: ctx2, cancel: cancel}, nil
}
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) {
syncMsgProducer.proxyTtBarrier = proxyTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) {
syncMsgProducer.writeNodeTtBarrier = writeNodeTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetDDSyncStream(ddSync ms.MsgStream) {
syncMsgProducer.ddSyncStream = ddSync
}
func (syncMsgProducer *timeSyncMsgProducer) SetDMSyncStream(dmSync ms.MsgStream) {
syncMsgProducer.dmSyncStream = dmSync
}
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSync ms.MsgStream) {
syncMsgProducer.k2sSyncStream = k2sSync
}
func (syncMsgProducer *timeSyncMsgProducer) WatchProxyTtBarrier(watcher TimeTickWatcher) {
syncMsgProducer.proxyWatchers = append(syncMsgProducer.proxyWatchers, watcher)
}
func (syncMsgProducer *timeSyncMsgProducer) WatchWriteNodeTtBarrier(watcher TimeTickWatcher) {
syncMsgProducer.writeNodeWatchers = append(syncMsgProducer.writeNodeWatchers, watcher)
}
func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier, streams []ms.MsgStream, watchers []TimeTickWatcher) error {
for {
select {
case <-syncMsgProducer.ctx.Done():
{
log.Printf("broadcast context done, exit")
return errors.Errorf("broadcast done exit")
}
default:
timetick, err := barrier.GetTimeTick()
if err != nil {
log.Printf("broadcast get time tick error: %s", err.Error())
continue
}
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
BeginTimestamp: timetick,
EndTimestamp: timetick,
HashValues: []uint32{0},
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerID: 0,
Timestamp: timetick,
}
timeTickMsg := &ms.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
for _, stream := range streams {
if err := stream.Broadcast(&msgPack); err != nil {
return err
}
}
for _, watcher := range watchers {
watcher.Watch(timeTickMsg)
}
}
}
}
func (syncMsgProducer *timeSyncMsgProducer) Start() error {
err := syncMsgProducer.proxyTtBarrier.Start()
if err != nil {
return err
}
err = syncMsgProducer.writeNodeTtBarrier.Start()
if err != nil {
return err
}
for _, watcher := range syncMsgProducer.proxyWatchers {
watcher.Start()
}
for _, watcher := range syncMsgProducer.writeNodeWatchers {
watcher.Start()
}
go syncMsgProducer.broadcastMsg(syncMsgProducer.proxyTtBarrier, []ms.MsgStream{syncMsgProducer.dmSyncStream, syncMsgProducer.ddSyncStream}, syncMsgProducer.proxyWatchers)
go syncMsgProducer.broadcastMsg(syncMsgProducer.writeNodeTtBarrier, []ms.MsgStream{syncMsgProducer.k2sSyncStream}, syncMsgProducer.writeNodeWatchers)
return nil
}
func (syncMsgProducer *timeSyncMsgProducer) Close() {
syncMsgProducer.ddSyncStream.Close()
syncMsgProducer.dmSyncStream.Close()
syncMsgProducer.k2sSyncStream.Close()
syncMsgProducer.cancel()
syncMsgProducer.proxyTtBarrier.Close()
syncMsgProducer.writeNodeTtBarrier.Close()
for _, watcher := range syncMsgProducer.proxyWatchers {
watcher.Close()
}
for _, watcher := range syncMsgProducer.writeNodeWatchers {
watcher.Close()
}
}

View File

@ -0,0 +1,235 @@
package timesync
import (
"context"
"log"
"math"
"sync/atomic"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/errors"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type (
Timestamp = typeutil.Timestamp
UniqueID = typeutil.UniqueID
TimeTickBarrier interface {
GetTimeTick() (Timestamp, error)
Start() error
Close()
}
softTimeTickBarrier struct {
peer2LastTt map[UniqueID]Timestamp
minTtInterval Timestamp
lastTt int64
outTt chan Timestamp
ttStream ms.MsgStream
ctx context.Context
cancel context.CancelFunc
}
hardTimeTickBarrier struct {
peer2Tt map[UniqueID]Timestamp
outTt chan Timestamp
ttStream ms.MsgStream
ctx context.Context
cancel context.CancelFunc
}
)
func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
select {
case <-ttBarrier.ctx.Done():
return 0, errors.Errorf("[GetTimeTick] closed.")
case ts, ok := <-ttBarrier.outTt:
if !ok {
return 0, errors.Errorf("[GetTimeTick] closed.")
}
num := len(ttBarrier.outTt)
for i := 0; i < num; i++ {
ts, ok = <-ttBarrier.outTt
if !ok {
return 0, errors.Errorf("[GetTimeTick] closed.")
}
}
atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts))
return ts, ttBarrier.ctx.Err()
}
}
func (ttBarrier *softTimeTickBarrier) Start() error {
go func() {
for {
select {
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
case ttmsgs := <-ttBarrier.ttStream.Chan():
if len(ttmsgs.Msgs) > 0 {
for _, timetickmsg := range ttmsgs.Msgs {
ttmsg := timetickmsg.(*ms.TimeTickMsg)
oldT, ok := ttBarrier.peer2LastTt[ttmsg.PeerID]
// log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)
if !ok {
log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.PeerID)
continue
}
if ttmsg.Timestamp > oldT {
ttBarrier.peer2LastTt[ttmsg.PeerID] = ttmsg.Timestamp
// get a legal Timestamp
ts := ttBarrier.minTimestamp()
lastTt := atomic.LoadInt64(&(ttBarrier.lastTt))
if lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
continue
}
ttBarrier.outTt <- ts
}
}
}
}
}
}()
return nil
}
func newSoftTimeTickBarrier(ctx context.Context,
ttStream *ms.MsgStream,
peerIds []UniqueID,
minTtInterval Timestamp) *softTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
return nil
}
sttbarrier := softTimeTickBarrier{}
sttbarrier.minTtInterval = minTtInterval
sttbarrier.ttStream = *ttStream
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.ctx, sttbarrier.cancel = context.WithCancel(ctx)
sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
for _, id := range peerIds {
sttbarrier.peer2LastTt[id] = Timestamp(0)
}
if len(peerIds) != len(sttbarrier.peer2LastTt) {
log.Printf("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n")
}
return &sttbarrier
}
func (ttBarrier *softTimeTickBarrier) Close() {
ttBarrier.cancel()
}
func (ttBarrier *softTimeTickBarrier) minTimestamp() Timestamp {
tempMin := Timestamp(math.MaxUint64)
for _, tt := range ttBarrier.peer2LastTt {
if tt < tempMin {
tempMin = tt
}
}
return tempMin
}
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
select {
case <-ttBarrier.ctx.Done():
return 0, errors.Errorf("[GetTimeTick] closed.")
case ts, ok := <-ttBarrier.outTt:
if !ok {
return 0, errors.Errorf("[GetTimeTick] closed.")
}
return ts, ttBarrier.ctx.Err()
}
}
func (ttBarrier *hardTimeTickBarrier) Start() error {
go func() {
// Last timestamp synchronized
state := Timestamp(0)
for {
select {
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
case ttmsgs := <-ttBarrier.ttStream.Chan():
if len(ttmsgs.Msgs) > 0 {
for _, timetickmsg := range ttmsgs.Msgs {
// Suppose ttmsg.Timestamp from stream is always larger than the previous one,
// that `ttmsg.Timestamp > oldT`
ttmsg := timetickmsg.(*ms.TimeTickMsg)
oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerID]
if !ok {
log.Printf("[hardTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.PeerID)
continue
}
if oldT > state {
log.Printf("[hardTimeTickBarrier] Warning: peer(%d) last timestamp(%d) is ahead of the synchronized state\n",
ttmsg.PeerID, oldT)
}
ttBarrier.peer2Tt[ttmsg.PeerID] = ttmsg.Timestamp
newState := ttBarrier.minTimestamp()
if newState > state {
ttBarrier.outTt <- newState
state = newState
}
}
}
}
}
}()
return nil
}
func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
tempMin := Timestamp(math.MaxUint64)
for _, tt := range ttBarrier.peer2Tt {
if tt < tempMin {
tempMin = tt
}
}
return tempMin
}
func newHardTimeTickBarrier(ctx context.Context,
ttStream *ms.MsgStream,
peerIds []UniqueID) *hardTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newHardTimeTickBarrier] Error: peerIds is empty!")
return nil
}
httbarrier := hardTimeTickBarrier{}
httbarrier.ttStream = *ttStream
httbarrier.outTt = make(chan Timestamp, 1024)
httbarrier.ctx, httbarrier.cancel = context.WithCancel(ctx)
httbarrier.peer2Tt = make(map[UniqueID]Timestamp)
for _, id := range peerIds {
httbarrier.peer2Tt[id] = Timestamp(0)
}
if len(peerIds) != len(httbarrier.peer2Tt) {
log.Printf("[newHardTimeTickBarrier] Warning: there are duplicate peerIds!")
}
return &httbarrier
}
func (ttBarrier *hardTimeTickBarrier) Close() {
ttBarrier.cancel()
}
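
A minimal wiring sketch for the producer and both barriers, inside package timesync; the MsgStream values are assumed to be created and subscribed elsewhere, and the peer ID lists and tick interval are hypothetical.

func startTimeSyncSketch(ctx context.Context, proxyTt, writeNodeTt, dd, dm, k2s ms.MsgStream) error {
	producer, err := NewTimeSyncMsgProducer(ctx)
	if err != nil {
		return err
	}
	producer.SetProxyTtBarrier(newSoftTimeTickBarrier(ctx, &proxyTt, []UniqueID{1, 2}, Timestamp(10)))
	producer.SetWriteNodeTtBarrier(newHardTimeTickBarrier(ctx, &writeNodeTt, []UniqueID{3}))
	producer.SetDDSyncStream(dd)
	producer.SetDMSyncStream(dm)
	producer.SetK2sSyncStream(k2s)
	return producer.Start() // spawns one broadcast loop per barrier
}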

View File

@ -0,0 +1,11 @@
package timesync
import (
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type TimeTickWatcher interface {
Watch(msg *ms.TimeTickMsg)
Start()
Close()
}

View File

@ -6,7 +6,6 @@ import (
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
)
@ -52,12 +51,9 @@ func (node *WriteNode) Start() error {
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
var err error
node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
node.tracer, node.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}