milvus/docs/developer_guides/chap06_root_coordinator.md

21 KiB

6. Root Coordinator

6.1 Root Coordinator Interface

// RootCoord is the root coordinator's service interface. It serves DDL
// requests, index requests, global ID/timestamp allocation, channel time
// ticks, segment queries, flush notifications, and metrics.
type RootCoord interface {
	Component
	TimeTickProvider

	// DDL requests.
	// CreateCollection notifies RootCoord to create a collection.
	CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
	// DropCollection notifies RootCoord to drop a collection.
	DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
	// HasCollection notifies RootCoord to check a collection's existence at the specified timestamp.
	HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
	// DescribeCollection notifies RootCoord to get all information about this collection at the specified timestamp.
	DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
	// ShowCollections notifies RootCoord to list all collection names and other info in the database at the specified timestamp.
	ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error)
	// CreatePartition notifies RootCoord to create a partition.
	CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
	// DropPartition notifies RootCoord to drop a partition.
	DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
	// HasPartition notifies RootCoord to check if a partition with the specified name exists in the collection.
	HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
	// ShowPartitions notifies RootCoord to list all partition names and other info in the collection.
	ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error)

	// Index builder service.
	// CreateIndex notifies RootCoord to create an index for the specified field in the collection.
	CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
	// DescribeIndex notifies RootCoord to get the specified index information for the specified field.
	DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
	// DropIndex notifies RootCoord to drop the specified index for the specified field.
	DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error)

	// Global timestamp allocator.
	// AllocTimestamp notifies RootCoord to allocate timestamps.
	AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error)
	// AllocID notifies RootCoord to allocate IDs.
	AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error)
	// UpdateChannelTimeTick notifies RootCoord to update each Proxy's safe timestamp.
	UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error)

	// Segments.
	// DescribeSegment notifies RootCoord to get the specified segment's information in the collection.
	DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
	// ShowSegments notifies RootCoord to list all segment IDs in the collection or partition.
	ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
	// ReleaseDQLMessageStream notifies RootCoord to release and close the search message stream of the specified collection.
	ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)

	// SegmentFlushCompleted notifies RootCoord that the specified segment has been flushed.
	SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error)
	// GetMetrics notifies RootCoord to collect metrics for the specified component.
	GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
}
  • MsgBase
// MsgBase is the common header embedded as the Base field of the request
// messages below: message type, message ID, timestamp, and source ID.
type MsgBase struct {
	MsgType   MsgType
	MsgID     UniqueID
	Timestamp Timestamp
	SourceID  UniqueID
}
  • CreateCollection
// CreateCollectionRequest is the argument of RootCoord.CreateCollection.
// Schema is the serialized collection schema (presumably a marshaled
// schemapb.CollectionSchema — confirm); ShardsNum is the requested number of
// shards.
type CreateCollectionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	Schema         []byte
	ShardsNum      int32
}
  • DropCollection
// DropCollectionRequest is the argument of RootCoord.DropCollection; it names
// the collection to drop by database name and collection name.
type DropCollectionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
}
  • HasCollection
// HasCollectionRequest is the argument of RootCoord.HasCollection. TimeStamp
// selects the point in time at which existence is checked.
type HasCollectionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	TimeStamp      Timestamp
}
  • DescribeCollection
// DescribeCollectionRequest is the argument of RootCoord.DescribeCollection.
// The collection is identified by name and/or CollectionID (which one takes
// precedence is not shown here); TimeStamp selects the point in time to
// describe.
type DescribeCollectionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	CollectionID   UniqueID
	TimeStamp      Timestamp
}

// CollectionSchema describes a collection: its name, description, AutoID flag
// (presumably whether IDs are generated automatically — confirm), and fields.
type CollectionSchema struct {
	Name        string
	Description string
	AutoID      bool
	Fields      []*FieldSchema
}

// DescribeCollectionResponse is the result of RootCoord.DescribeCollection:
// a status, the collection's schema, and its ID.
type DescribeCollectionResponse struct {
	Status       *commonpb.Status
	Schema       *schemapb.CollectionSchema
	CollectionID UniqueID
}
  • ShowCollections
// ShowCollectionsRequest is the argument of RootCoord.ShowCollections. It
// lists the collections of database DbName as of Timestamp; Type selects the
// kind of listing (ShowCollectionsType's values are not shown here).
type ShowCollectionsRequest struct {
	Base      *commonpb.MsgBase
	DbName    string
	Timestamp Timestamp
	Type      ShowCollectionsType
}

// ShowCollectionsResponse is the result of RootCoord.ShowCollections.
// NOTE(review): CollectionNames and CollectionIds are presumably parallel
// slices (one entry per collection) — confirm.
type ShowCollectionsResponse struct {
	Status          *commonpb.Status
	CollectionNames []string
	CollectionIds   []UniqueID
}

// ShowCollectionResponse is the former name of ShowCollectionsResponse.
//
// Deprecated: use ShowCollectionsResponse, which matches the
// RootCoord.ShowCollections signature.
type ShowCollectionResponse = ShowCollectionsResponse
  • CreatePartition
// CreatePartitionRequest is the argument of RootCoord.CreatePartition; it names
// the partition to create and the collection (and database) it belongs to.
type CreatePartitionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	PartitionName  string
}
  • DropPartition
// DropPartitionRequest is the argument of RootCoord.DropPartition; it names
// the partition to drop and the collection (and database) it belongs to.
type DropPartitionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	PartitionName  string
}
  • HasPartition
// HasPartitionRequest is the argument of RootCoord.HasPartition; it names the
// partition whose existence is checked within the given collection.
type HasPartitionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	PartitionName  string
}
  • ShowPartitions
// ShowPartitionsRequest is the argument of RootCoord.ShowPartitions; it
// identifies the collection whose partitions are listed, by name and/or ID.
type ShowPartitionsRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	CollectionID   UniqueID
}

// ShowPartitionRequest is the former name of ShowPartitionsRequest.
//
// Deprecated: use ShowPartitionsRequest, which matches the
// RootCoord.ShowPartitions signature.
type ShowPartitionRequest = ShowPartitionsRequest

// ShowPartitionsResponse is the result of RootCoord.ShowPartitions.
// NOTE(review): PartitionNames and PartitionIDs are presumably parallel
// slices (one entry per partition) — confirm.
type ShowPartitionsResponse struct {
	Status         *commonpb.Status
	PartitionNames []string
	PartitionIDs   []UniqueID
}

// ShowPartitionResponse is the former name of ShowPartitionsResponse.
//
// Deprecated: use ShowPartitionsResponse, which matches the
// RootCoord.ShowPartitions signature.
type ShowPartitionResponse = ShowPartitionsResponse
  • DescribeSegment
// DescribeSegmentRequest is the argument of RootCoord.DescribeSegment; it
// identifies a segment within a collection.
type DescribeSegmentRequest struct {
	Base         *commonpb.MsgBase
	CollectionID UniqueID
	SegmentID    UniqueID
}

// DescribeSegmentResponse is the result of RootCoord.DescribeSegment: a status,
// the segment's index ID and index build ID, and its EnableIndex flag.
type DescribeSegmentResponse struct {
	Status      *commonpb.Status
	IndexID     UniqueID
	BuildID     UniqueID
	EnableIndex bool
}
  • ShowSegments
// ShowSegmentsRequest is the argument of RootCoord.ShowSegments; it selects
// segments by collection ID and partition ID.
type ShowSegmentsRequest struct {
	Base         *commonpb.MsgBase
	CollectionID UniqueID
	PartitionID  UniqueID
}

// ShowSegmentsResponse is the result of RootCoord.ShowSegments: a status and
// the IDs of the matching segments.
type ShowSegmentsResponse struct {
	Status     *commonpb.Status
	SegmentIDs []UniqueID
}
  • ReleaseDQLMessageStream
// ReleaseDQLMessageStreamRequest is the argument of
// RootCoord.ReleaseDQLMessageStream; it identifies, by database ID and
// collection ID, the collection whose search message stream is released.
type ReleaseDQLMessageStreamRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
}

  • CreateIndex
// CreateIndexRequest is the argument of RootCoord.CreateIndex; it names the
// field to index. ExtraParams presumably carries the index parameters as
// key/value pairs — confirm.
type CreateIndexRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	FieldName      string
	ExtraParams    []*commonpb.KeyValuePair
}
  • DescribeIndex
// DescribeIndexRequest is the argument of RootCoord.DescribeIndex; it names the
// field and index to describe within a collection.
type DescribeIndexRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	FieldName      string
	IndexName      string
}

// IndexDescription is one index entry in a DescribeIndexResponse: the index's
// name, ID, parameters, and the field it is built on.
type IndexDescription struct {
	IndexName string
	IndexID   UniqueID
	Params    []*commonpb.KeyValuePair
	FieldName string
}

// DescribeIndexResponse is the result of RootCoord.DescribeIndex: a status and
// the matching index descriptions.
type DescribeIndexResponse struct {
	Status            *commonpb.Status
	IndexDescriptions []*IndexDescription
}
  • DropIndex
// DropIndexRequest is the argument of RootCoord.DropIndex; it names the field
// and index to drop within a collection.
type DropIndexRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	FieldName      string
	IndexName      string
}
  • AllocTimestamp
// AllocTimestampRequest is the argument of RootCoord.AllocTimestamp; Count is
// the number of timestamps requested.
type AllocTimestampRequest struct {
	Base  *commonpb.MsgBase
	Count uint32
}

// AllocTimestampResponse is the result of RootCoord.AllocTimestamp. Timestamp
// is a Timestamp, the same type the TSO allocator produces.
// NOTE(review): Timestamp is presumably the first of Count allocated
// timestamps — confirm.
type AllocTimestampResponse struct {
	Status    *commonpb.Status
	Timestamp Timestamp
	Count     uint32
}
  • AllocID
// AllocIDRequest is the argument of RootCoord.AllocID; Count is the number of
// IDs requested.
type AllocIDRequest struct {
	Base  *commonpb.MsgBase
	Count uint32
}

// AllocIDResponse is the result of RootCoord.AllocID.
// NOTE(review): ID is presumably the first of Count consecutive IDs — confirm.
type AllocIDResponse struct {
	Status *commonpb.Status
	ID     UniqueID
	Count  uint32
}
  • UpdateChannelTimeTick
// ChannelTimeTickMsg is the argument of RootCoord.UpdateChannelTimeTick.
// NOTE(review): ChannelNames and Timestamps are presumably parallel slices
// (one timestamp per channel); DefaultTimestamp's role is not shown here —
// confirm.
type ChannelTimeTickMsg struct {
	Base             *commonpb.MsgBase
	ChannelNames     []string
	Timestamps       []Timestamp
	DefaultTimestamp Timestamp
}

6.2 Dd (Data definitions) Message

RootCoord puts Dd (data definition) messages into the DML MsgStreams.

  • BaseMsg
// BaseMsg holds the fields shared by the Dd messages below: a context, the
// message's begin/end timestamps, its hash values, and its position in the
// stream. NOTE(review): HashValues is presumably used to route the message to
// a channel — confirm.
type BaseMsg struct {
	Ctx            context.Context
	BeginTimestamp Timestamp
	EndTimestamp   Timestamp
	HashValues     []uint32
	MsgPosition    *MsgPosition
}
  • CreateCollectionMsg
// CreateCollectionRequest (Dd message form) is the payload of
// CreateCollectionMsg. In addition to the client-facing fields, it carries
// the database and collection IDs (presumably assigned by RootCoord — confirm)
// and the collection's virtual and physical channel names.
type CreateCollectionRequest struct {
	Base                 *commonpb.MsgBase
	DbName               string
	CollectionName       string
	DbID                 UniqueID
	CollectionID         UniqueID
	Schema               []byte
	VirtualChannelNames  []string
	PhysicalChannelNames []string
}

// CreateCollectionMsg is the Dd message RootCoord puts into the DML
// MsgStreams for a collection creation: BaseMsg plus the request payload.
// NOTE(review): this embeds msgpb.CreateCollectionRequest, while the other
// *Msg types here embed an unqualified request type; confirm which package the
// Dd request structs live in.
type CreateCollectionMsg struct {
	BaseMsg
	msgpb.CreateCollectionRequest
}
  • DropCollectionMsg
// DropCollectionRequest (Dd message form) is the payload of DropCollectionMsg;
// besides the names, it carries the database and collection IDs.
type DropCollectionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	DbID           UniqueID
	CollectionID   UniqueID
}

// DropCollectionMsg is the Dd message for a collection drop: BaseMsg plus the
// request payload.
type DropCollectionMsg struct {
	BaseMsg
	DropCollectionRequest
}
  • CreatePartitionMsg
// CreatePartitionRequest (Dd message form) is the payload of
// CreatePartitionMsg; besides the names, it carries the database, collection,
// and partition IDs.
type CreatePartitionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	PartitionName  string
	DbID           UniqueID
	CollectionID   UniqueID
	PartitionID    UniqueID
}

// CreatePartitionMsg is the Dd message for a partition creation: BaseMsg plus
// the request payload.
type CreatePartitionMsg struct {
	BaseMsg
	CreatePartitionRequest
}
  • DropPartitionMsg
// DropPartitionRequest (Dd message form) is the payload of DropPartitionMsg;
// besides the names, it carries the database, collection, and partition IDs.
type DropPartitionRequest struct {
	Base           *commonpb.MsgBase
	DbName         string
	CollectionName string
	PartitionName  string
	DbID           UniqueID
	CollectionID   UniqueID
	PartitionID    UniqueID
}

// DropPartitionMsg is the Dd message for a partition drop: BaseMsg plus the
// request payload.
type DropPartitionMsg struct {
	BaseMsg
	DropPartitionRequest
}

6.3 Create Index automatically

RootCoord notifies IndexCoord (Index Coordinator) to build an index automatically when a segment has been flushed.

6.4 RootCoord Instance

// Core is the RootCoord instance. Its exported, function-typed fields are hooks
// for ID/timestamp allocation, publishing Dd messages, and calling other
// services (presumably wired up during initialization — confirm); the
// unexported fields hold its runtime state.
type Core struct {
	MetaTable *metaTable
	// id allocator
	IDAllocator       func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
	IDAllocatorUpdate func() error

	// tso allocator
	TSOAllocator       func(count uint32) (typeutil.Timestamp, error)
	TSOAllocatorUpdate func() error

	// inner members
	ctx     context.Context
	cancel  context.CancelFunc
	etcdCli *clientv3.Client
	kvBase  *etcdkv.EtcdKV

	// setMsgStreams, send time tick into dd channel and time tick channel
	SendTimeTick func(t typeutil.Timestamp) error

	// setMsgStreams, send create collection into dd channel
	SendDdCreateCollectionReq func(ctx context.Context, req *msgpb.CreateCollectionRequest, channelNames []string) error

	// setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
	SendDdDropCollectionReq func(ctx context.Context, req *msgpb.DropCollectionRequest, channelNames []string) error

	// setMsgStreams, send create partition into dd channel
	SendDdCreatePartitionReq func(ctx context.Context, req *msgpb.CreatePartitionRequest, channelNames []string) error

	// setMsgStreams, send drop partition into dd channel
	SendDdDropPartitionReq func(ctx context.Context, req *msgpb.DropPartitionRequest, channelNames []string) error

	// if RootCoord creates a segment, DataCoord will put the segment msg into this channel
	DataCoordSegmentChan <-chan *ms.MsgPack

	// if a segment flush completes, DataNode will put the segment msg into this channel
	DataNodeFlushedSegmentChan <-chan *ms.MsgPack

	// get a field's binlog file paths and a segment's row count from the data service
	CallGetBinlogFilePathsService func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
	CallGetNumRowsService         func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)

	// call the index builder's client to build an index (returns the build ID) or to drop one
	CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
	CallDropIndexService  func(ctx context.Context, indexID typeutil.UniqueID) error

	// create a client for the Proxy described by sess
	NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)

	// query service interface, notify query service to release collection
	CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error

	// dd request scheduler
	ddReqQueue chan reqTask // dd requests are pushed into this chan

	// dml channels
	dmlChannels *dmlChannels

	// Proxy manager
	proxyManager *proxyManager

	// proxy clients
	proxyClientManager *proxyClientManager

	// channel timetick
	chanTimeTick *timetickSync

	// time tick loop
	lastTimeTick typeutil.Timestamp

	// state code
	stateCode atomic.Value

	// call once
	initOnce  sync.Once
	startOnce sync.Once
	//isInit    atomic.Value

	session     *sessionutil.Session
	sessCloseCh <-chan bool

	msFactory ms.Factory
}

6.5 Data definition Request Scheduler

6.5.1 Task

RootCoord receives data definition requests via gRPC. Each request (described by a proto) is wrapped as a task for further scheduling. The task interface is:

// reqTask is a data definition request wrapped for scheduling.
// Ctx and Type report the task's context and message type; Execute runs it.
// NOTE(review): WaitToFinish presumably blocks until Notify is called and
// returns the error passed to Notify — confirm against baseReqTask.
type reqTask interface {
	Ctx() context.Context
	Type() commonpb.MsgType
	Execute(ctx context.Context) error
	WaitToFinish() error
	Notify(err error)
}

An example task follows. In this example, a CreateCollectionRequest (a proto) is wrapped as a CreateCollectionReqTask. The wrapper needs to implement the task interface.

// CreateCollectionReqTask wraps a CreateCollectionRequest as a reqTask.
// NOTE(review): baseReqTask presumably supplies the shared plumbing (context,
// completion notification) — confirm.
type CreateCollectionReqTask struct {
	baseReqTask
	Req *milvuspb.CreateCollectionRequest
}

// Task interfaces
// These methods match reqTask's method set, so *CreateCollectionReqTask
// implements reqTask. Bodies are omitted in this guide.
func (t *CreateCollectionReqTask) Ctx() context.Context
func (t *CreateCollectionReqTask) Type() commonpb.MsgType
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error
func (t *CreateCollectionReqTask) WaitToFinish() error
func (t *CreateCollectionReqTask) Notify(err error)

In most cases, a data definition task needs to

  • update the system's metadata (via metaTable),
  • send a DD message into the related DML MsgStream, so that the Data Node and Query Node can consume it.

6.6 Meta Table

6.6.1 Meta
  • Proxy Meta
// ProxyMeta is serialized as proxyMetaBlob under the etcd key "proxy/$proxyId".
message ProxyMeta {
  uint64 id = 1;
  common.Address address = 2;
  repeated string result_channel_names = 3;
}
  • Collection Meta
// PartitionInfo is serialized as partitionInfoBlob under the etcd key
// "partition/$collectionId/$partitionId"; it lists the partition's segment IDs.
message PartitionInfo {
  string partition_name = 1;
  int64 partitionID = 2;
  repeated int64 segmentIDs = 3;
}

// IndexInfo is serialized as IndexInfoBlob under the etcd key
// "index/$collectionId/$indexId".
message IndexInfo {
  string index_name = 1;
  int64 indexID = 2;
  repeated common.KeyValuePair index_params = 3;
}

// FieldIndexInfo pairs a field ID with an index ID (see
// CollectionInfo.field_indexes).
// NOTE(review): "filedID" looks like a misspelling of "fieldID"; it is left
// as-is because renaming a proto field changes the generated Go name — confirm
// against the actual .proto.
message FieldIndexInfo{
  int64 filedID = 1;
  int64 indexID = 2;
}

// CollectionInfo is serialized as collectionInfoBlob under the etcd key
// "collection/$collectionId". It holds the schema, creation time, partition
// IDs, per-field index bindings, and virtual/physical channel names.
message CollectionInfo {
  int64 ID = 1;
  schema.CollectionSchema schema = 2;
  uint64 create_time = 3;
  repeated int64 partitionIDs = 4;
  repeated FieldIndexInfo field_indexes = 5;
  repeated string virtual_channel_names = 6;
  repeated string physical_channel_names = 7;
}
  • Segment Meta
// SegmentIndexInfo is serialized under the etcd key
// "segment-index/$collectionId/$indexId/$partitionId/$segmentId"; it records
// the index and build ID for one field of one segment.
message SegmentIndexInfo {
  int64 segmentID = 1;
  int64 fieldID = 2;
  int64 indexID = 3;
  int64 buildID = 4;
  bool enable_index = 5;
}
6.6.2 KV pairs in etcdKV
"proxy/$proxyId" string -> proxyMetaBlob string
"collection/$collectionId" string -> collectionInfoBlob string
"partition/$collectionId/$partitionId" string -> partitionInfoBlob string
"index/$collectionId/$indexId" string -> IndexInfoBlob string
"segment-index/$collectionId/$indexId/$partitionId/$segmentId" -> segmentIndexInfoBlob string

Note that tenantId, proxyId, collectionId, partitionId, indexId, segmentId are unique strings converted from int64.

proxyMetaBlob, collectionInfoBlob, partitionInfoBlob, IndexInfoBlob, and segmentIndexInfoBlob are serialized protos.

6.6.3 Meta Table
// metaTable caches RootCoord meta in memory and persists it through the KV
// clients below, using the etcd key layout listed in section 6.6.2.
type metaTable struct {
	txn             kv.TxnKV                                                        // client of a reliable txnkv service, e.g. etcd client
	snapshot        kv.SnapShotKV                                                   // client of a reliable snapshotkv service, e.g. etcd client
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> collection meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection_name -> collection_id
	collAlias2ID    map[string]typeutil.UniqueID                                    // collection_alias -> collection_id
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // segment_id -> (presumably) index_id -> meta; etcd key: segment-index/$collectionId/$indexId/$partitionId/$segmentId
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // index_id -> meta; etcd key: index/$collectionId/$indexId

	// NOTE(review): which fields each lock guards is not shown here — confirm.
	proxyLock sync.RWMutex
	ddLock    sync.RWMutex
}

// NewMetaTable creates a metaTable backed by the given snapshot KV.
// NOTE(review): metaTable also has a txn (kv.TxnKV) field; how it is supplied
// is not shown by this signature — confirm.
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error)

// Collections. The mutating methods (and AddPartition/DeletePartition) take
// ddOpStr, a callback from a timestamp to a string (presumably the serialized
// DD operation stored with the meta change — confirm), and return a timestamp.
// Read methods that take ts return the meta as of ts, or the latest meta when
// ts is zero.
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error)
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]typeutil.UniqueID, error)
func (mt *metaTable) ListCollectionVirtualChannels() []string
func (mt *metaTable) ListCollectionPhysicalChannels() []string
// Partitions. DeletePartition also returns an ID (presumably the dropped
// partition's ID — confirm).
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error)
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error)
func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (pb.PartitionInfo, error)
// Segments and indexes.
func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error)
func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error)
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error)
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error)
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo) ([]typeutil.UniqueID, schemapb.FieldSchema, error)
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error)
func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error)
func (mt *metaTable) AddFlushedSegment(segID typeutil.UniqueID) error
  • metaTable maintains meta both in memory and in etcdKV, and keeps the two consistent. All of its member functions may be called concurrently.

  • For HasCollection, GetCollectionByID, GetCollectionByName, and ListCollections: if the ts argument is non-zero, metaTable returns the meta as of timestamp ts; if ts is zero, metaTable returns the latest meta.

6.7 System Time Synchronization

// timetickSync collects the channel time ticks reported through
// UpdateTimeTick and sends channel time ticks; it also records DDL timestamps
// (ddlTsSet) and their minimum (ddlMinTs).
// NOTE(review): proxyTimeTick/sendChan are presumably keyed by proxy (session)
// ID — confirm.
type timetickSync struct {
	core          *Core
	lock          sync.Mutex
	proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
	sendChan      chan map[typeutil.UniqueID]*channelTimeTickMsg

	// record ddl timetick info
	ddlLock  sync.RWMutex
	ddlMinTs typeutil.Timestamp
	ddlTsSet map[typeutil.Timestamp]struct{}
}

// newTimeTickSync creates the time tick synchronizer for core.
func newTimeTickSync(core *Core) *timetickSync

// UpdateTimeTick records a channel time tick message reported by a proxy.
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error
// DelProxy stops tracking the proxy described by sess.
func (t *timetickSync) DelProxy(sess *sessionutil.Session)
// GetProxy registers the given proxy sessions (presumably — confirm).
func (t *timetickSync) GetProxy(sess []*sessionutil.Session)
// StartWatch starts the loop that sends collected time ticks.
func (t *timetickSync) StartWatch()
// SendChannelTimeTick sends time tick ts on channel chanName.
func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error
// GetProxyNum returns the number of tracked proxies.
func (t *timetickSync) GetProxyNum() int
// GetChanNum returns the number of channels.
func (t *timetickSync) GetChanNum() int