mirror of https://github.com/milvus-io/milvus.git
[skip ci] update doc of RootCoordinator (#5908)
* [skip ci]rename master serice to RootCoordinator Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * rename chap06_master.md to chap06_root_coordinator.md Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * root coord Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * create index Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * [skip ci]udpate doc Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * [skip ci] update doc Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * update doc Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * [skip ci] update doc Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * [skip ci] update doc Signed-off-by: yefu.chen <yefu.chen@zilliz.com>pull/5915/head
parent
cecaa40f92
commit
45dade553d
|
@ -1,87 +0,0 @@
|
|||
## Master service recovery on power failure
|
||||
|
||||
## 1. Basic idea
|
||||
1. `master service` reads meta from etcd when it starts
|
||||
2. `master service` needs to store the `position` of the msgstream into etcd every time it consumes the msgstream.
|
||||
3. `master service` reads the `position` of msgstream from etcd when it starts up, then seek to the specified `position` and re-consume the msgstream
|
||||
4. Ensure that all messages from the msgstream are processed in idempotent fashion, so that repeated consumption of the same message does not cause system inconsistencies
|
||||
5. `master service` registers itself in etcd and finds out if the dependent `data service` and `index service` are online via etcd
|
||||
|
||||
## 2. Specific tasks
|
||||
|
||||
### 2.1 Read meta from etcd
|
||||
1. `master service` needs to load meta from etcd when it starts, this part is already done
|
||||
|
||||
### 2.2 `dd requests` from grpc
|
||||
1. The `dd requests`, such as create_collection, create_partition, etc., from grpc are marked as done only if the related meta has been written into etcd.
|
||||
2. The `dd requests` should be sent to the `dd msgstream` when the operation is done.
|
||||
3. There is a possible fault here: the `dd request` has been written to etcd, but the `master service` crashes before the request is sent to the `dd msgstream`.
|
||||
4. For the scenarios mentioned in item 3, `master service` needs to check if all `dd requests` are sent to `dd msgstream` when it starts up.
|
||||
5. `master service`'s built-in scheduler ensures that all grpc requests are executed serially, so it only needs to check whether the most recent `dd requests` are sent to the `dd msgstream`, and resend them if not.
|
||||
6. Take `create_collection` as an example to illustrate the process
|
||||
- When `create collection` is written to etcd, 2 additional keys are updated, `dd_msg` and `dd_type`
|
||||
- `dd_msg` is the serialization of the dd message
|
||||
- `dd_type` is the message type of `dd_msg`, such as `create_collection`, `create_partition`, `drop_collection`, etc. It's used to deserialize `dd_msg`.
|
||||
- Update the meta of `create_collection`, `dd_msg` and `dd_type` at the same time in a transactional manner.
|
||||
- When `dd_msg` has been sent to `dd msgstream`, delete `dd_msg` and `dd_type` from etcd.
|
||||
- When the `master service` starts, it first checks whether `dd_msg` and `dd_type` exist in etcd; if so, it deserializes `dd_msg` according to `dd_type` and sends it to the `dd msgstream`; otherwise no processing is done
|
||||
- There is another possible failure: `dd_msg` has been sent to the `dd msgstream` but has not been deleted from etcd yet when the `master service` crashes. In this case, `dd_msg` would be sent to the `dd msgstream` repeatedly, so the receiver needs to account for this case.
|
||||
|
||||
### 2.3 `create index` requests from grpc
|
||||
1. In the processing of `create index`, `master service` calls `metaTable`'s `GetNotIndexedSegments` to get all segment ids that are not indexed
|
||||
2. After getting the segment ids, `master service` calls `index service` to create the index on these segment ids.
|
||||
3. In the current implementation, the `create index` requests will return after the segment ids are put into a go channel.
|
||||
4. The `master service` starts a background task that keeps reading the segment ids from the go channel, and then calls the `index service` to create the index.
|
||||
5. There is a fault here: the segment ids have been put into the go channel in the processing function of the grpc request, the grpc call returns, but the `master service` crashes before its background task reads them from the go channel. At this time, the client thinks that the index has been created, but the `master service` has not called `index service` to create the index.
|
||||
6. The solution for the fault mentioned in item 5
|
||||
- Remove the go channel and `master service`'s background task
|
||||
- In the request processing function of `create index`, the call will return only when all segment ids have been sent to `index service`
|
||||
- Some segment ids may be sent to `index service` repeatedly, and `index service` needs to handle such requests
|
||||
|
||||
### 2.4 New segment from `data service`
|
||||
1. Each time a new segment is created, the `data service` sends the segment id to the `master service` via msgstream
|
||||
2. `master service` needs to update the segment id to the collection meta and record the position of the msgstream in etcd
|
||||
3. Step 2 is transactional and the operation will be successful only if the collection meta in etcd is updated
|
||||
4. So the `master service` only needs to restore the msgstream to the position when recovering from a power failure
|
||||
|
||||
### 2.5 Flushed segment from `data node`
|
||||
1. Each time the `data node` finishes flushing a segment, it sends the segment id to the `master service` via msgstream.
|
||||
2. `master service` needs to fetch binlog from `data service` by id and send request to `index service` to create index on this segment
|
||||
3. When the `index service` is called successfully, it will return a build id, and then `master service` will update the build id to the `collection meta` and record the position of the msgstream in etcd.
|
||||
4. Step 3 is transactional and the operation will be successful only if the `collection meta` in etcd is updated
|
||||
5. So the `master service` only needs to restore the msgstream to the position when recovering from a power failure
|
||||
|
||||
### 2.6 Failed to call external grpc service
|
||||
1. `master service` needs grpc services from `data service` and `index service`; if a grpc call fails, it needs to reconnect.
|
||||
2. `master service` does not listen to the status of the `data service` and `index service` in real time
|
||||
|
||||
### 2.7 Add virtual channel assignment when creating collection
|
||||
1. Add a new field, "number of shards" in the `create collection` request, the "num of shards" tell the `master service` to create the number of virtual channel for this collection.
|
||||
2. In the current implementation, virtual channels and physical channels have a one-to-one relationship, and the total number of physical channels increases as the number of virtual channels increases; later, the total number of physical channels needs to be fixed, and multiple virtual channels share one physical channel
|
||||
3. The name of the virtual channel is globally unique, and the `collection meta` records the correspondence between the virtual channel and the physical channel
|
||||
|
||||
|
||||
### 2.8 Add processing of time synchronization signals from proxy node
|
||||
1. A virtual channel can be written to by multiple proxies, so the timestamps in the virtual channel do not increase monotonically
|
||||
2. All proxies report the timestamp of all the virtual channels to the `master service` periodically
|
||||
3. The `master service` collects the timestamps from the proxies on each virtual channel and gets the minimum one as the timestamp of that virtual channel, and then inserts the timestamp into the virtual channel
|
||||
4. The proxy reports the timestamp to the `master service` via grpc
|
||||
5. The proxy needs to register itself in etcd when it starts, `master service` will listen to the corresponding key to determine how many active proxies there are, and thus determine if all of them have sent timestamps to master
|
||||
6. If a proxy is not registered in etcd but sends a timestamp or any other grpc request to master, master will ignore the grpc request
|
||||
|
||||
### 2.9 Register service in etcd
|
||||
1. `master service` needs to register itself with etcd when it starts
|
||||
2. The registration should include ip address, port, its own id, global incremental timestamp
|
||||
|
||||
### 2.10 Remove the code related to proxy service
|
||||
1. The `proxy service` related code will be removed
|
||||
2. The job of time synchronization previously done by the `proxy service` is partially simplified and handed over to the master (subsection 2.8)
|
||||
|
||||
### 2.11 Query collection meta based on timeline
|
||||
1. Add a new field of `timestamp` to the grpc request of `describe collection`
|
||||
2. `master service` should provide a snapshot of the `collection meta`
|
||||
3. Return the `collection meta` at the point of timestamp mentioned in the request
|
||||
|
||||
### 2.12 Timestamp of `dd operations`
|
||||
1. `master service` is responsible for setting the timestamp of `dd operations`: create collection, create partition, drop collection, drop partition
|
||||
2. `master service` is responsible for sending timestamps to the `dd msgstream`: if there is a dd message, use the latest timestamp from that message; if not, get a timestamp from tso
|
|
@ -0,0 +1,86 @@
|
|||
## Root Coordinator recovery on power failure
|
||||
|
||||
## 1. Basic idea
|
||||
1. `RC` (Root Coordinator) reads meta from etcd when it starts
|
||||
2. `RC` needs to store the `position` of the msgstream into etcd every time it consumes the msgstream.
|
||||
3. `RC` reads the `position` of msgstream from etcd when it starts up, then seek to the specified `position` and re-consume the msgstream
|
||||
4. Ensure that all messages from the msgstream are processed in idempotent fashion, so that repeated consumption of the same message does not cause system inconsistencies
|
||||
5. `RC` registers itself in etcd and finds out if the dependent `DC(Data Coordinator)` and `IC(Index Coordinator)` are online via etcd
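The following is a minimal sketch of ideas 2 and 3 above, assuming a hypothetical msgstream client with `Consume`/`Seek` methods and a simple etcd-backed kv interface; the key name, types, and serialization are illustrative and not the actual Milvus implementation.

```go
package main

import (
	"context"
	"encoding/json"
)

// Illustrative stand-ins for the msgstream and etcd kv interfaces.
type MsgPosition struct {
	ChannelName string `json:"channel_name"`
	MsgID       []byte `json:"msg_id"`
}

type MsgStream interface {
	Consume() (*MsgPosition, []byte, error) // returns the position of the consumed message
	Seek(pos *MsgPosition) error
}

type KV interface {
	Save(key, value string) error
	Load(key string) (string, error)
}

const positionKey = "root_coord/msgstream_position" // hypothetical key

// consumeLoop handles one message at a time and records its position in etcd (idea 2).
// Handlers must be idempotent (idea 4), since the last message may be replayed after a crash.
func consumeLoop(ctx context.Context, stream MsgStream, kv KV, handle func([]byte) error) error {
	for ctx.Err() == nil {
		pos, payload, err := stream.Consume()
		if err != nil {
			return err
		}
		if err := handle(payload); err != nil {
			return err
		}
		blob, err := json.Marshal(pos)
		if err != nil {
			return err
		}
		if err := kv.Save(positionKey, string(blob)); err != nil {
			return err
		}
	}
	return ctx.Err()
}

// recoverPosition seeks the stream back to the last recorded position on startup (idea 3).
func recoverPosition(stream MsgStream, kv KV) error {
	val, err := kv.Load(positionKey)
	if err != nil || val == "" {
		return err // no position recorded yet, start from the beginning
	}
	var pos MsgPosition
	if err := json.Unmarshal([]byte(val), &pos); err != nil {
		return err
	}
	return stream.Seek(&pos)
}
```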
|
||||
|
||||
## 2. Specific tasks
|
||||
|
||||
### 2.1 Read meta from etcd
|
||||
1. `RC` needs to load meta from etcd when it starts; this part is already done
|
||||
|
||||
### 2.2 `dd requests` from grpc
|
||||
1. The `dd requests`, such as create_collection, create_partition, etc., from grpc are marked as done only if the related meta has been written into etcd.
|
||||
2. The `dd requests` should be sent to the `dd msgstream` when the operation is done.
|
||||
3. There is a possible fault here: the `dd request` has been written to etcd, but the `RC` crashes before the request is sent to the `dd msgstream`.
|
||||
4. For the scenarios mentioned in item 3, `RC` needs to check if all `dd requests` are sent to `dd msgstream` when it starts up.
|
||||
5. `RC`'s built-in scheduler ensures that all grpc requests are executed serially, so it only needs to check whether the most recent `dd requests` are sent to the `dd msgstream`, and resend them if not.
|
||||
6. Take `create_collection` as an example to illustrate the process
|
||||
- When `create collection` is written to etcd, 2 additional keys are updated, `dd_msg` and `dd_type`
|
||||
- `dd_msg` is the serialization of the dd message
|
||||
- `dd_type` is the message type of `dd_msg`, such as `create_collection`, `create_partition`, `drop_collection`, etc. It's used to deserialize `dd_msg`.
|
||||
- Update the meta of `create_collection`, `dd_msg` and `dd_type` at the same time in a transactional manner.
|
||||
- When `dd_msg` has been sent to `dd msgstream`, delete `dd_msg` and `dd_type` from etcd.
|
||||
- When the `RC` starts, it first checks whether `dd_msg` and `dd_type` exist in etcd; if so, it deserializes `dd_msg` according to `dd_type` and sends it to the `dd msgstream`; otherwise no processing is done
|
||||
- There is another possible failure: `dd_msg` has been sent to the `dd msgstream` but has not been deleted from etcd yet when the `RC` crashes. In this case, `dd_msg` would be sent to the `dd msgstream` repeatedly, so the receiver needs to account for this case.
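The following is a minimal sketch of the `dd_msg`/`dd_type` bookkeeping in item 6, written against the plain etcd `clientv3` API; the key names, meta payload, and `send` callback are placeholders rather than the actual Milvus code.

```go
package main

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	ddMsgKey  = "dd_msg" // placeholder keys
	ddTypeKey = "dd_type"
)

// commitCreateCollection persists the new collection meta together with the
// pending dd message and its type in a single etcd transaction.
func commitCreateCollection(ctx context.Context, cli *clientv3.Client, metaKey, metaBlob, ddMsgBlob string) error {
	_, err := cli.Txn(ctx).Then(
		clientv3.OpPut(metaKey, metaBlob),
		clientv3.OpPut(ddMsgKey, ddMsgBlob),
		clientv3.OpPut(ddTypeKey, "create_collection"),
	).Commit()
	return err
}

// ackDdMsgSent removes the pending markers once dd_msg has reached the dd msgstream.
func ackDdMsgSent(ctx context.Context, cli *clientv3.Client) error {
	_, err := cli.Txn(ctx).Then(
		clientv3.OpDelete(ddMsgKey),
		clientv3.OpDelete(ddTypeKey),
	).Commit()
	return err
}

// replayPendingDdMsg runs on startup: if dd_msg/dd_type are still present, the
// message may not have reached the msgstream, so it is sent again (receivers must
// tolerate duplicates) and the markers are cleared afterwards.
func replayPendingDdMsg(ctx context.Context, cli *clientv3.Client, send func(ddType, ddMsg string) error) error {
	msgResp, err := cli.Get(ctx, ddMsgKey)
	if err != nil || len(msgResp.Kvs) == 0 {
		return err // nothing pending, or an etcd error
	}
	typResp, err := cli.Get(ctx, ddTypeKey)
	if err != nil || len(typResp.Kvs) == 0 {
		return err
	}
	if err := send(string(typResp.Kvs[0].Value), string(msgResp.Kvs[0].Value)); err != nil {
		return err
	}
	return ackDdMsgSent(ctx, cli)
}
```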
|
||||
|
||||
### 2.3 `create index` requests from grpc
|
||||
1. In the processing of `create index`, `RC` calls `metaTable`'s `GetNotIndexedSegments` to get all segment ids that are not indexed
|
||||
2. After getting the segment ids, `RC` calls `IC` to create the index on these segment ids.
|
||||
3. In the current implementation, the `create index` requests will return after the segment ids are put into a go channel.
|
||||
4. The `RC` starts a background task that keeps reading the segment ids from the go channel, and then calls the `IC` to create the index.
|
||||
5. There is a fault here: the segment ids have been put into the go channel in the processing function of the grpc request, the grpc call returns, but the `RC` crashes before its background task reads them from the go channel. At this time, the client thinks that the index has been created, but the `RC` has not called `IC` to create the index.
|
||||
6. The solution for the fault mentioned in item 5
|
||||
- Remove the go channel and `RC`'s background task
|
||||
- In the request processing function of `create index`, the call will return only when all segment ids have been sent to `IC`
|
||||
- Some segment ids may be sent to `IC` repeatedly, and `IC` needs to handle such requests
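A minimal sketch of the fix in item 6 is shown below; `callBuildIndex` is a placeholder for the `IC` client call, and `IC` is expected to treat a duplicate build request for the same segment as a no-op.

```go
package main

import "context"

// createIndex returns only after every segment id has been handed to IC, so the
// client never sees success while segment ids are still sitting in a go channel.
func createIndex(ctx context.Context, segIDs []int64,
	callBuildIndex func(ctx context.Context, segID int64) error) error {
	for _, segID := range segIDs {
		// A resend after a crash may deliver the same segment id twice;
		// IC must handle such duplicate requests.
		if err := callBuildIndex(ctx, segID); err != nil {
			return err // the caller sees the failure instead of a silent loss
		}
	}
	return nil
}
```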
|
||||
|
||||
### 2.4 New segment from `DC`
|
||||
1. Each time a new segment is created, the `DC` sends the segment id to the `RC` via msgstream
|
||||
2. `RC` needs to add the segment id to the collection meta and record the position of the msgstream in etcd
|
||||
3. Step 2 is transactional and the operation will be successful only if the collection meta in etcd is updated
|
||||
4. So the `RC` only needs to restore the msgstream to the recorded position when recovering from a power failure
|
||||
|
||||
### 2.5 Flushed segment from `data node`
|
||||
1. Each time the `data node` finishes flushing a segment, it sends the segment id to the `RC` via msgstream.
|
||||
2. `RC` needs to fetch the binlog paths from `DC` by segment id and send a request to `IC` to create an index on this segment
|
||||
3. When the call to `IC` succeeds, it returns a build id; `RC` then records the build id in the `collection meta` and records the position of the msgstream in etcd.
|
||||
4. Step 3 is transactional and the operation will be successful only if the `collection meta` in etcd is updated
|
||||
5. So the `RC` only needs to restore the msgstream to the recorded position when recovering from a power failure
|
||||
|
||||
### 2.6 Failed to call external grpc service
|
||||
1. `RC` depends on grpc services from `DC` and `IC`; if a grpc call fails, it needs to reconnect.
|
||||
2. `RC` does not listen to the status of the `DC` and `IC` in real time
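A minimal sketch of the reconnect-on-failure behavior in item 1; `reconnect` and `call` are placeholders for the real `DC`/`IC` client plumbing, and the fixed one-second backoff is only illustrative.

```go
package main

import (
	"context"
	"time"
)

// callWithReconnect retries a failed grpc call after re-establishing the client connection.
func callWithReconnect(ctx context.Context, attempts int,
	reconnect func(ctx context.Context) error,
	call func(ctx context.Context) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = call(ctx); err == nil {
			return nil
		}
		// the call failed: rebuild the connection before trying again
		if rerr := reconnect(ctx); rerr != nil {
			err = rerr
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // simple fixed backoff
		}
	}
	return err
}
```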
|
||||
|
||||
### 2.7 Add virtual channel assignment when creating collection
|
||||
1. Add a new field, "number of shards" in the `create collection` request, the "num of shards" tell the `RC` to create the number of virtual channel for this collection.
|
||||
2. In the current implementation, virtual channels and physical channels have a one-to-one relationship, and the total number of physical channels increases as the number of virtual channels increases; later, the total number of physical channels needs to be fixed, and multiple virtual channels share one physical channel
|
||||
3. The name of the virtual channel is globally unique, and the `collection meta` records the correspondence between the virtual channel and the physical channel
|
||||
|
||||
|
||||
### 2.8 Add processing of time synchronization signals from proxy node
|
||||
1. A virtual channel can be written to by multiple proxies, so the timestamps in the virtual channel do not increase monotonically
|
||||
2. All proxies report the timestamp of all the virtual channels to the `RC` periodically
|
||||
3. The `RC` collects the timestamps from the proxies on each virtual channel and gets the minimum one as the timestamp of that virtual channel, and then inserts the timestamp into the virtual channel
|
||||
4. The proxy reports the timestamp to the `RC` via grpc
|
||||
5. The proxy needs to register itself in etcd when it starts; `RC` will listen to the corresponding key to determine how many active proxies there are, and thus determine whether all of them have sent timestamps to `RC`
|
||||
6. If a proxy is not registered in etcd but sends a timestamp or any other grpc request to `RC`, `RC` will ignore the grpc request
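A minimal sketch of items 3 and 5, using illustrative types: a time tick for a virtual channel is emitted only when every active proxy has reported on that channel, and the emitted value is the minimum of the reported timestamps.

```go
package main

type Timestamp = uint64

// channelMinTimestamp returns the minimum timestamp reported for one virtual
// channel and whether every active proxy has reported yet.
func channelMinTimestamp(activeProxies []int64, reported map[int64]Timestamp) (Timestamp, bool) {
	var min Timestamp
	for i, proxyID := range activeProxies {
		ts, ok := reported[proxyID]
		if !ok {
			return 0, false // some active proxy has not reported for this channel yet
		}
		if i == 0 || ts < min {
			min = ts
		}
	}
	return min, true
}
```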
|
||||
|
||||
### 2.9 Register service in etcd
|
||||
1. `RC` needs to register itself with etcd when it starts
|
||||
2. The registration should include ip address, port, its own id, global incremental timestamp
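A minimal sketch of such a registration using the etcd `clientv3` lease API; the key layout and payload fields are illustrative, not the actual Milvus session format.

```go
package main

import (
	"context"
	"encoding/json"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// serverEntry is an illustrative registration payload.
type serverEntry struct {
	Address   string `json:"address"`
	Port      int    `json:"port"`
	ServerID  int64  `json:"server_id"`
	Timestamp uint64 `json:"timestamp"` // global incremental timestamp at registration
}

// register writes the entry under a leased key so it disappears automatically if
// RC dies, and keeps the lease alive in the background.
func register(ctx context.Context, cli *clientv3.Client, key string, entry serverEntry, ttl int64) error {
	lease, err := cli.Grant(ctx, ttl)
	if err != nil {
		return err
	}
	blob, err := json.Marshal(entry)
	if err != nil {
		return err
	}
	if _, err := cli.Put(ctx, key, string(blob), clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	ch, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}
	go func() {
		for range ch { // drain keep-alive responses until the context is cancelled
		}
	}()
	return nil
}
```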
|
||||
|
||||
### 2.10 Remove the code related to proxy service
|
||||
1. The `proxy service` related code will be removed
|
||||
2. The job of time synchronization previously done by the `proxy service` is partially simplified and handed over to the `RC` (subsection 2.8)
|
||||
|
||||
### 2.11 Query collection meta based on timeline
|
||||
1. Add a new field of `timestamp` to the grpc request of `describe collection`
|
||||
2. `RC` should provide a snapshot of the `collection meta`
|
||||
3. Return the `collection meta` at the point of timestamp mentioned in the request
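A minimal sketch of one way to serve such a snapshot, assuming `RC` keeps each historical version of the collection meta tagged with the timestamp at which it became visible; the types and storage layout are illustrative only.

```go
package main

import "sort"

type Timestamp = uint64

// collectionMetaVersion is one historical version of the collection meta.
type collectionMetaVersion struct {
	Ts   Timestamp
	Meta []byte // serialized collection meta at this point in time
}

// metaAt returns the newest version whose timestamp is <= the requested one.
// versions must be sorted by ascending Ts.
func metaAt(versions []collectionMetaVersion, ts Timestamp) ([]byte, bool) {
	// index of the first version strictly newer than ts
	i := sort.Search(len(versions), func(i int) bool { return versions[i].Ts > ts })
	if i == 0 {
		return nil, false // the collection did not exist at this timestamp
	}
	return versions[i-1].Meta, true
}
```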
|
||||
|
||||
### 2.12 Timestamp of `dd operations`
|
||||
1. `RC` is responsible for setting the timestamp of `dd operations` (create collection, create partition, drop collection, drop partition) and for sending this timestamp into the `dml msgstream`
|
|
@ -1,818 +0,0 @@
|
|||
|
||||
|
||||
## 10. Master
|
||||
|
||||
<img src="./figs/master.jpeg" width=700>
|
||||
|
||||
#### 10.1 Master Interface
|
||||
|
||||
```go
|
||||
type MasterService interface {
|
||||
Component
|
||||
|
||||
//DDL request
|
||||
CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
|
||||
DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
|
||||
HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
|
||||
DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
|
||||
ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error)
|
||||
CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
|
||||
DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
|
||||
HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
|
||||
ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error)
|
||||
|
||||
//index builder service
|
||||
CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
|
||||
DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
|
||||
DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error)
|
||||
|
||||
//global timestamp allocator
|
||||
AllocTimestamp(ctx context.Context, req *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error)
|
||||
AllocID(ctx context.Context, req *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error)
|
||||
|
||||
//segment
|
||||
DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
|
||||
ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
|
||||
|
||||
GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
* *MsgBase*
|
||||
|
||||
```go
|
||||
type MsgBase struct {
|
||||
MsgType MsgType
|
||||
MsgID UniqueID
|
||||
Timestamp Timestamp
|
||||
SourceID UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *CreateCollection*
|
||||
|
||||
```go
|
||||
type CreateCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
Schema []byte
|
||||
}
|
||||
```
|
||||
|
||||
* *DropCollection*
|
||||
|
||||
```go
|
||||
type DropCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *HasCollection*
|
||||
|
||||
```go
|
||||
type HasCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeCollection*
|
||||
|
||||
```go
|
||||
type DescribeCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
type CollectionSchema struct {
|
||||
Name string
|
||||
Description string
|
||||
AutoID bool
|
||||
Fields []*FieldSchema
|
||||
}
|
||||
|
||||
type DescribeCollectionResponse struct {
|
||||
Status *commonpb.Status
|
||||
Schema *schemapb.CollectionSchema
|
||||
CollectionID int64
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowCollections*
|
||||
|
||||
```go
|
||||
type ShowCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
}
|
||||
|
||||
type ShowCollectionResponse struct {
|
||||
Status *commonpb.Status
|
||||
CollectionNames []string
|
||||
}
|
||||
```
|
||||
|
||||
* *CreatePartition*
|
||||
|
||||
```go
|
||||
type CreatePartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *DropPartition*
|
||||
|
||||
```go
|
||||
type DropPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *HasPartition*
|
||||
|
||||
```go
|
||||
type HasPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowPartitions*
|
||||
|
||||
```go
|
||||
type ShowPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
type ShowPartitionResponse struct {
|
||||
Status *commonpb.Status
|
||||
PartitionNames []string
|
||||
PartitionIDs []UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeSegment*
|
||||
|
||||
```go
|
||||
type DescribeSegmentRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
CollectionID UniqueID
|
||||
SegmentID UniqueID
|
||||
}
|
||||
|
||||
type DescribeSegmentResponse struct {
|
||||
Status *commonpb.Status
|
||||
IndexID UniqueID
|
||||
BuildID UniqueID
|
||||
EnableIndex bool
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowSegments*
|
||||
|
||||
```go
|
||||
type ShowSegmentsRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
CollectionID UniqueID
|
||||
PartitionID UniqueID
|
||||
}
|
||||
|
||||
type ShowSegmentsResponse struct {
|
||||
Status *commonpb.Status
|
||||
SegmentIDs []UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *CreateIndex*
|
||||
|
||||
```go
|
||||
type CreateIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
ExtraParams []*commonpb.KeyValuePair
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeIndex*
|
||||
|
||||
```go
|
||||
type DescribeIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
IndexName string
|
||||
}
|
||||
|
||||
type IndexDescription struct {
|
||||
IndexName string
|
||||
IndexID UniqueID
|
||||
Params []*commonpb.KeyValuePair
|
||||
}
|
||||
|
||||
type DescribeIndexResponse struct {
|
||||
Status *commonpb.Status
|
||||
IndexDescriptions []*IndexDescription
|
||||
}
|
||||
```
|
||||
|
||||
* *DropIndex*
|
||||
|
||||
```go
|
||||
type DropIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
IndexName string
|
||||
}
|
||||
```
|
||||
|
||||
* *AllocTimestamp*
|
||||
|
||||
```go
|
||||
|
||||
type BaseRequest struct {
|
||||
Done chan error
|
||||
Valid bool
|
||||
}
|
||||
|
||||
type TSORequest struct {
|
||||
BaseRequest
|
||||
timestamp Timestamp
|
||||
count uint32
|
||||
}
|
||||
```
|
||||
|
||||
* *AllocID*
|
||||
|
||||
```go
|
||||
type BaseRequest struct {
|
||||
Done chan error
|
||||
Valid bool
|
||||
}
|
||||
|
||||
type IDRequest struct {
|
||||
BaseRequest
|
||||
id UniqueID
|
||||
count uint32
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 10.2 Dd (Data definitions) Channel
|
||||
|
||||
* *CreateCollectionMsg*
|
||||
|
||||
```go
|
||||
|
||||
type CreateCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
Schema []byte
|
||||
}
|
||||
|
||||
type CreateCollectionMsg struct {
|
||||
BaseMsg
|
||||
CreateCollectionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *DropCollectionMsg*
|
||||
|
||||
```go
|
||||
type DropCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
}
|
||||
|
||||
type DropCollectionMsg struct {
|
||||
BaseMsg
|
||||
DropCollectionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *CreatePartitionMsg*
|
||||
|
||||
```go
|
||||
type CreatePartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
|
||||
type CreatePartitionMsg struct {
|
||||
BaseMsg
|
||||
CreatePartitionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *DropPartitionMsg*
|
||||
|
||||
```go
|
||||
type DropPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
DbID int64
|
||||
CollectionID int64
|
||||
PartitionID int64
|
||||
}
|
||||
|
||||
type DropPartitionMsg struct {
|
||||
BaseMsg
|
||||
DropPartitionRequest
|
||||
}
|
||||
```
|
||||
|
||||
#### 10.2 Master Instance
|
||||
|
||||
```go
|
||||
type Master struct {
|
||||
MetaTable *metaTable
|
||||
//id allocator
|
||||
idAllocator *allocator.GlobalIDAllocator
|
||||
//tso allocator
|
||||
tsoAllocator *tso.GlobalTSOAllocator
|
||||
|
||||
//inner members
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
etcdCli *clientv3.Client
|
||||
kvBase *etcdkv.EtcdKV
|
||||
metaKV *etcdkv.EtcdKV
|
||||
|
||||
//setMsgStreams, receive time tick from proxy service time tick channel
|
||||
ProxyTimeTickChan chan typeutil.Timestamp
|
||||
|
||||
//setMsgStreams, send time tick into dd channel and time tick channel
|
||||
SendTimeTick func(t typeutil.Timestamp) error
|
||||
|
||||
//setMsgStreams, send create collection into dd channel
|
||||
DdCreateCollectionReq func(req *internalpb.CreateCollectionRequest) error
|
||||
|
||||
//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
|
||||
DdDropCollectionReq func(req *internalpb.DropCollectionRequest) error
|
||||
|
||||
//setMsgStreams, send create partition into dd channel
|
||||
DdCreatePartitionReq func(req *internalpb.CreatePartitionRequest) error
|
||||
|
||||
//setMsgStreams, send drop partition into dd channel
|
||||
DdDropPartitionReq func(req *internalpb.DropPartitionRequest) error
|
||||
|
||||
//setMsgStreams segment channel, receive segment info from data service, if master create segment
|
||||
DataServiceSegmentChan chan *datapb.SegmentInfo
|
||||
|
||||
//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
|
||||
DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID
|
||||
|
||||
//get binlog file path from data service,
|
||||
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
|
||||
|
||||
//call index builder's client to build index, return build id
|
||||
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
|
||||
DropIndexReq func(indexID typeutil.UniqueID) error
|
||||
|
||||
//proxy service interface, notify proxy service to drop collection
|
||||
InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
|
||||
|
||||
//query service interface, notify query service to release collection
|
||||
ReleaseCollection func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
|
||||
|
||||
// put create index task into this chan
|
||||
indexTaskQueue chan *CreateIndexTask
|
||||
|
||||
//dd request scheduler
|
||||
ddReqQueue chan reqTask //dd request will be push into this chan
|
||||
lastDdTimeStamp typeutil.Timestamp
|
||||
|
||||
//time tick loop
|
||||
lastTimeTick typeutil.Timestamp
|
||||
|
||||
//states code
|
||||
stateCode atomic.Value
|
||||
|
||||
//call once
|
||||
initOnce sync.Once
|
||||
startOnce sync.Once
|
||||
//isInit atomic.Value
|
||||
|
||||
msFactory ms.Factory
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
#### 10.3 Data definition Request Scheduler
|
||||
|
||||
###### 10.2.1 Task
|
||||
|
||||
Master receives data definition requests via grpc. Each request (described by a proto) will be wrapped as a task for further scheduling. The task interface is
|
||||
|
||||
```go
|
||||
type reqTask interface {
|
||||
Ctx() context.Context
|
||||
Type() commonpb.MsgType
|
||||
Ts() (typeutil.Timestamp, error)
|
||||
IgnoreTimeStamp() bool
|
||||
Execute(ctx context.Context) error
|
||||
WaitToFinish() error
|
||||
Notify(err error)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a createCollectionTask. The wrapper needs to implement the task interfaces.
|
||||
|
||||
``` go
|
||||
type CreateCollectionReqTask struct {
|
||||
baseReqTask
|
||||
Req *milvuspb.CreateCollectionRequest
|
||||
}
|
||||
|
||||
// Task interfaces
|
||||
func (task *createCollectionTask) Ctx() context.Context
|
||||
func (task *createCollectionTask) Type() ReqType
|
||||
func (task *createCollectionTask) Ts() Timestamp
|
||||
func (task *createCollectionTask) IgnoreTimeStamp() bool
|
||||
func (task *createCollectionTask) Execute() error
|
||||
func (task *createCollectionTask) WaitToFinish() error
|
||||
func (task *createCollectionTask) Notify() error
|
||||
```
|
||||
|
||||
|
||||
|
||||
// TODO remove?
|
||||
###### 10.2.3 Scheduler
|
||||
|
||||
```go
|
||||
type ddRequestScheduler struct {
|
||||
reqQueue chan *task
|
||||
ddStream *MsgStream
|
||||
}
|
||||
|
||||
func (rs *ddRequestScheduler) Enqueue(task *task) error
|
||||
func (rs *ddRequestScheduler) schedule() *task // implement scheduling policy
|
||||
```
|
||||
|
||||
In most cases, a data definition task needs to
|
||||
|
||||
* update system's meta data (via *metaTable*),
|
||||
* and synchronize the data definition request to other related system components so that the request can take effect system wide.
|
||||
|
||||
Master
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//TODO remove?
|
||||
#### 10.4 Meta Table
|
||||
|
||||
###### 10.4.1 Meta
|
||||
|
||||
* Tenant Meta
|
||||
|
||||
```protobuf
|
||||
message TenantMeta {
|
||||
uint64 id = 1;
|
||||
uint64 num_query_nodes = 2;
|
||||
repeated string insert_channel_names = 3;
|
||||
string query_channel_name = 4;
|
||||
}
|
||||
```
|
||||
|
||||
* Proxy Meta
|
||||
|
||||
``` protobuf
|
||||
message ProxyMeta {
|
||||
uint64 id = 1;
|
||||
common.Address address = 2;
|
||||
repeated string result_channel_names = 3;
|
||||
}
|
||||
```
|
||||
|
||||
* Collection Meta
|
||||
|
||||
```protobuf
|
||||
message CollectionMeta {
|
||||
uint64 id=1;
|
||||
schema.CollectionSchema schema=2;
|
||||
uint64 create_time=3;
|
||||
repeated uint64 segment_ids=4;
|
||||
repeated string partition_tags=5;
|
||||
}
|
||||
```
|
||||
|
||||
* Segment Meta
|
||||
|
||||
```protobuf
|
||||
message SegmentMeta {
|
||||
uint64 segment_id=1;
|
||||
uint64 collection_id =2;
|
||||
string partition_tag=3;
|
||||
int32 channel_start=4;
|
||||
int32 channel_end=5;
|
||||
uint64 open_time=6;
|
||||
uint64 close_time=7;
|
||||
int64 num_rows=8;
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
###### 10.4.2 KV pairs in EtcdKV
|
||||
|
||||
```go
|
||||
"tenant/$tenantId" string -> tenantMetaBlob string
|
||||
"proxy/$proxyId" string -> proxyMetaBlob string
|
||||
"collection/$collectionId" string -> collectionMetaBlob string
|
||||
"segment/$segmentId" string -> segmentMetaBlob string
|
||||
```
|
||||
|
||||
Note that *tenantId*, *proxyId*, *collectionId*, *segmentId* are unique strings converted from int64.
|
||||
|
||||
*tenantMeta*, *proxyMeta*, *collectionMeta*, *segmentMeta* are serialized protos.
|
||||
|
||||
|
||||
|
||||
###### 10.4.3 Meta Table
|
||||
|
||||
```go
|
||||
type metaTable struct {
|
||||
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
|
||||
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
|
||||
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
|
||||
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta,
|
||||
collName2ID map[string]typeutil.UniqueID // collection name to collection id
|
||||
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // partition id -> partition meta
|
||||
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta
|
||||
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // index id ->index meta
|
||||
segID2CollID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> collection id
|
||||
partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID // partition id -> collection id
|
||||
|
||||
tenantLock sync.RWMutex
|
||||
proxyLock sync.RWMutex
|
||||
ddLock sync.RWMutex
|
||||
}
|
||||
|
||||
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error
|
||||
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error
|
||||
func (mt *metaTable) HasCollection(collID typeutil.UniqueID) bool
|
||||
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID) (*pb.CollectionInfo, error)
|
||||
func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionInfo, error)
|
||||
func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error)
|
||||
func (mt *metaTable) ListCollections() ([]string, error)
|
||||
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID) error
|
||||
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool
|
||||
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string) (typeutil.UniqueID, error)
|
||||
func (mt *metaTable) GetPartitionByID(partitionID typeutil.UniqueID) (pb.PartitionInfo, error)
|
||||
func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error
|
||||
func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error
|
||||
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error)
|
||||
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error)
|
||||
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
|
||||
func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
|
||||
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
|
||||
func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
|
||||
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo) ([]typeutil.UniqueID, schemapb.FieldSchema, error)
|
||||
func (mt *metaTable) GetIndexByName(collName string, fieldName string, indexName string) ([]pb.IndexInfo, error)
|
||||
func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error)
|
||||
|
||||
func NewMetaTable(kv kv.TxnBase) (*metaTable, error)
|
||||
```
|
||||
|
||||
*metaTable* maintains meta both in memory and *etcdKV*. It keeps the meta consistent on both sides. All its member functions may be called concurrently.
|
||||
|
||||
* *AddSegment(seg \*SegmentMeta)* first updates *CollectionMeta* by adding the segment id, then adds a new SegmentMeta to *kv*. All the modifications are done transactionally.
|
||||
|
||||
|
||||
|
||||
#### 10.5 System Time Synchronization
|
||||
|
||||
|
||||
|
||||
###### 10.5.1 Time Tick Barrier
|
||||
|
||||
//TODO
|
||||
* Soft Time Tick Barrier
|
||||
|
||||
|
||||
<img src="./figs/soft_time_tick_barrier.png" width=500>
|
||||
|
||||
```go
|
||||
type softTimeTickBarrier struct {
|
||||
peer2LastTt map[UniqueID]Timestamp
|
||||
minTtInterval Timestamp
|
||||
lastTt int64
|
||||
outTt chan Timestamp
|
||||
ttStream ms.MsgStream
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp,error)
|
||||
func (ttBarrier *softTimeTickBarrier) Start()
|
||||
func (ttBarrier *softTimeTickBarrier) Close()
|
||||
|
||||
func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier
|
||||
```
|
||||
|
||||
|
||||
|
||||
* Hard Time Tick Barrier
|
||||
|
||||
<img src="./figs/hard_time_tick_barrier.png" width=420>
|
||||
|
||||
```go
|
||||
type hardTimeTickBarrier struct {
|
||||
peer2Tt map[UniqueID]Timestamp
|
||||
outTt chan Timestamp
|
||||
ttStream ms.MsgStream
|
||||
ctx context.Context
|
||||
wg sync.WaitGroup
|
||||
loopCtx context.Context
|
||||
loopCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp,error)
|
||||
func (ttBarrier *hardTimeTickBarrier) Start()
|
||||
func (ttBarrier *hardTimeTickBarrier) Close()
|
||||
|
||||
func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier
|
||||
```
|
||||
|
||||
|
||||
|
||||
// TODO
|
||||
###### 10.5.1 Time Synchronization Message Producer
|
||||
|
||||
<img src="./figs/time_sync_msg_producer.png" width=700>
|
||||
|
||||
|
||||
```go
|
||||
type TimeTickBarrier interface {
|
||||
GetTimeTick() (Timestamp,error)
|
||||
Start()
|
||||
Close()
|
||||
}
|
||||
|
||||
type timeSyncMsgProducer struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
ttBarrier TimeTickBarrier
|
||||
watchers []TimeTickWatcher
|
||||
}
|
||||
|
||||
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtStreams(proxyTt *MsgStream, proxyIds []UniqueId)
|
||||
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtStreams(WriteNodeTt *MsgStream, writeNodeIds []UniqueId)
|
||||
|
||||
func (syncMsgProducer *timeSyncMsgProducer) SetDmSyncStream(dmSyncStream *MsgStream)
|
||||
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSyncStream *MsgStream)
|
||||
|
||||
func (syncMsgProducer *timeSyncMsgProducer) Start() error
|
||||
func (syncMsgProducer *timeSyncMsgProducer) Close() error
|
||||
|
||||
func newTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error)
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 10.6 System Statistics
|
||||
|
||||
###### 10.6.1 Query Node Statistics
|
||||
|
||||
```protobuf
|
||||
message SegmentStats {
|
||||
int64 segment_id = 1;
|
||||
int64 memory_size = 2;
|
||||
int64 num_rows = 3;
|
||||
bool recently_modified = 4;
|
||||
}
|
||||
|
||||
message QueryNodeStats {
|
||||
int64 id = 1;
|
||||
uint64 timestamp = 2;
|
||||
repeated SegmentStats seg_stats = 3;
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 10.7 Segment Management
|
||||
|
||||
|
||||
|
||||
//TODO
|
||||
```go
|
||||
type assignment struct {
|
||||
MemSize int64
|
||||
AssignTime time.Time
|
||||
}
|
||||
|
||||
type segmentStatus struct {
|
||||
assignments []*assignment
|
||||
}
|
||||
|
||||
type collectionStatus struct {
|
||||
openedSegment []UniqueID
|
||||
}
|
||||
|
||||
type SegmentManagement struct {
|
||||
segStatus map[UniqueID]*segmentStatus
|
||||
collStatus map[UniqueID]*collectionStatus
|
||||
}
|
||||
|
||||
func NewSegmentManagement(ctx context.Context) *SegmentManagement
|
||||
```
|
||||
|
||||
|
||||
|
||||
//TODO
|
||||
###### 10.7.1 Assign Segment ID to Inserted Rows
|
||||
|
||||
Master receives an *AssignSegIDRequest*, which contains a list of *SegIDRequest(count, channelName, collectionName, partitionName)*, from Proxy. Segment Manager will assign the opened segments, or open a new segment if there is not enough space, and Segment Manager will record the allocated space, which can be reallocated after an expiration duration.
|
||||
|
||||
```go
|
||||
func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error)
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 10.8 System Config
|
||||
|
||||
```protobuf
|
||||
// examples of keys:
|
||||
// "/pulsar/ip"
|
||||
// "/pulsar/port"
|
||||
// examples of key_prefixes:
|
||||
// "/proxy"
|
||||
// "/msg_stream/insert"
|
||||
|
||||
message SysConfigRequest {
|
||||
MsgType msg_type = 1;
|
||||
int64 reqID = 2;
|
||||
int64 proxyID = 3;
|
||||
uint64 timestamp = 4;
|
||||
repeated string keys = 5;
|
||||
repeated string key_prefixes = 6;
|
||||
}
|
||||
|
||||
message SysConfigResponse {
|
||||
common.Status status = 1;
|
||||
repeated string keys = 2;
|
||||
repeated string values = 3;
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
```go
|
||||
type SysConfig struct {
|
||||
kv *kv.EtcdKV
|
||||
}
|
||||
|
||||
func (conf *SysConfig) InitFromFile(filePath string) (error)
|
||||
func (conf *SysConfig) GetByPrefix(keyPrefix string) (keys []string, values []string, err error)
|
||||
func (conf *SysConfig) Get(keys []string) ([]string, error)
|
||||
```
|
||||
|
||||
|
||||
|
||||
configuration examples in etcd:
|
||||
|
||||
```
|
||||
key: root_path/config/master/address
|
||||
value: "localhost"
|
||||
|
||||
key: root_path/config/proxy/timezone
|
||||
value: "UTC+8"
|
||||
```
|
||||
|
|
@ -0,0 +1,699 @@
|
|||
|
||||
|
||||
## 10. Root Coordinator
|
||||
|
||||
<img src="./figs/root_coord.png">
|
||||
|
||||
#### 10.1 Root Coordinator Interface
|
||||
|
||||
```go
|
||||
type RootCoord interface {
|
||||
Component
|
||||
TimeTickProvider
|
||||
|
||||
//DDL request
|
||||
CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
|
||||
DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
|
||||
HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
|
||||
DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
|
||||
ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error)
|
||||
CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
|
||||
DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
|
||||
HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
|
||||
ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error)
|
||||
|
||||
//index builder service
|
||||
CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
|
||||
DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
|
||||
DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error)
|
||||
|
||||
//global timestamp allocator
|
||||
AllocTimestamp(ctx context.Context, req *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error)
|
||||
AllocID(ctx context.Context, req *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error)
|
||||
UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error)
|
||||
|
||||
//segment
|
||||
DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
|
||||
ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
|
||||
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
* *MsgBase*
|
||||
|
||||
```go
|
||||
type MsgBase struct {
|
||||
MsgType MsgType
|
||||
MsgID UniqueID
|
||||
Timestamp Timestamp
|
||||
SourceID UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *CreateCollection*
|
||||
|
||||
<img src="./figs/root_coord_create_collection.png">
|
||||
|
||||
```go
|
||||
type CreateCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
Schema []byte
|
||||
ShardsNum int32
|
||||
}
|
||||
```
|
||||
|
||||
* *DropCollection*
|
||||
|
||||
```go
|
||||
type DropCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *HasCollection*
|
||||
|
||||
```go
|
||||
type HasCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
TimeStamp Timestamp
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeCollection*
|
||||
|
||||
```go
|
||||
type DescribeCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
CollectionID UniqueID
|
||||
TimeStamp Timestamp
|
||||
}
|
||||
|
||||
type CollectionSchema struct {
|
||||
Name string
|
||||
Description string
|
||||
AutoID bool
|
||||
Fields []*FieldSchema
|
||||
}
|
||||
|
||||
type DescribeCollectionResponse struct {
|
||||
Status *commonpb.Status
|
||||
Schema *schemapb.CollectionSchema
|
||||
CollectionID UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowCollections*
|
||||
|
||||
```go
|
||||
type ShowCollectionsRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
Timestamp Timestamp
|
||||
Type ShowCollectionsType
|
||||
}
|
||||
|
||||
type ShowCollectionResponse struct {
|
||||
Status *commonpb.Status
|
||||
CollectionNames []string
|
||||
CollectionIds []UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *CreatePartition*
|
||||
|
||||
```go
|
||||
type CreatePartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *DropPartition*
|
||||
|
||||
```go
|
||||
type DropPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *HasPartition*
|
||||
|
||||
```go
|
||||
type HasPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowPartitions*
|
||||
|
||||
```go
|
||||
type ShowPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
type ShowPartitionResponse struct {
|
||||
Status *commonpb.Status
|
||||
PartitionNames []string
|
||||
PartitionIDs []UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeSegment*
|
||||
|
||||
```go
|
||||
type DescribeSegmentRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
CollectionID UniqueID
|
||||
SegmentID UniqueID
|
||||
}
|
||||
|
||||
type DescribeSegmentResponse struct {
|
||||
Status *commonpb.Status
|
||||
IndexID UniqueID
|
||||
BuildID UniqueID
|
||||
EnableIndex bool
|
||||
}
|
||||
```
|
||||
|
||||
* *ShowSegments*
|
||||
|
||||
```go
|
||||
type ShowSegmentsRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
CollectionID UniqueID
|
||||
PartitionID UniqueID
|
||||
}
|
||||
|
||||
type ShowSegmentsResponse struct {
|
||||
Status *commonpb.Status
|
||||
SegmentIDs []UniqueID
|
||||
}
|
||||
```
|
||||
|
||||
* *ReleaseDQLMessageStream*
|
||||
```go
|
||||
type ReleaseDQLMessageStreamRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbID UniqueID
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
* *CreateIndex*
|
||||
<img src="./figs/root_coord_create_index.png">
|
||||
|
||||
```go
|
||||
type CreateIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
ExtraParams []*commonpb.KeyValuePair
|
||||
}
|
||||
```
|
||||
|
||||
* *DescribeIndex*
|
||||
|
||||
```go
|
||||
type DescribeIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
IndexName string
|
||||
}
|
||||
|
||||
type IndexDescription struct {
|
||||
IndexName string
|
||||
IndexID UniqueID
|
||||
Params []*commonpb.KeyValuePair
|
||||
FieldName string
|
||||
}
|
||||
|
||||
type DescribeIndexResponse struct {
|
||||
Status *commonpb.Status
|
||||
IndexDescriptions []*IndexDescription
|
||||
}
|
||||
```
|
||||
|
||||
* *DropIndex*
|
||||
|
||||
```go
|
||||
type DropIndexRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
FieldName string
|
||||
IndexName string
|
||||
}
|
||||
```
|
||||
|
||||
* *AllocTimestamp*
|
||||
|
||||
```go
|
||||
type AllocTimestampRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
Count uint32
|
||||
}
|
||||
|
||||
type AllocTimestampResponse struct {
|
||||
Status *commonpb.Status
|
||||
Timestamp UniqueID
|
||||
Count uint32
|
||||
}
|
||||
```
|
||||
|
||||
* *AllocID*
|
||||
|
||||
```go
|
||||
type AllocIDRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
Count uint32
|
||||
}
|
||||
|
||||
type AllocIDResponse struct {
|
||||
Status *commonpb.Status
|
||||
ID UniqueID
|
||||
Count uint32
|
||||
}
|
||||
```
|
||||
|
||||
* *UpdateChannelTimeTick*
|
||||
|
||||
```go
|
||||
type ChannelTimeTickMsg struct {
|
||||
Base *commonpb.MsgBase
|
||||
ChannelNames []string
|
||||
Timestamps []Timestamp
|
||||
DefaultTimestamp Timestamp
|
||||
}
|
||||
```
|
||||
|
||||
#### 10.2 Dd (Data definitions) Message
|
||||
|
||||
`RC` would put `Dd Message` into the `DML MsgStreams`
|
||||
|
||||
* *BaseMsg*
|
||||
|
||||
```go
|
||||
type BaseMsg struct {
|
||||
Ctx context.Context
|
||||
BeginTimestamp Timestamp
|
||||
EndTimestamp Timestamp
|
||||
HashValues []uint32
|
||||
MsgPosition *MsgPosition
|
||||
}
|
||||
```
|
||||
|
||||
* *CreateCollectionMsg*
|
||||
|
||||
```go
|
||||
type CreateCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
DbID UniqueID
|
||||
CollectionID UniqueID
|
||||
Schema []byte
|
||||
VirtualChannelNames []string
|
||||
PhysicalChannelNames []string
|
||||
}
|
||||
|
||||
type CreateCollectionMsg struct {
|
||||
BaseMsg
|
||||
internalpb.CreateCollectionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *DropCollectionMsg*
|
||||
|
||||
```go
|
||||
type DropCollectionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
DbID UniqueID
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
type DropCollectionMsg struct {
|
||||
BaseMsg
|
||||
DropCollectionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *CreatePartitionMsg*
|
||||
|
||||
```go
|
||||
type CreatePartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
DbID UniqueID
|
||||
CollectionID UniqueID
|
||||
PartitionID UniqueID
|
||||
}
|
||||
|
||||
type CreatePartitionMsg struct {
|
||||
BaseMsg
|
||||
CreatePartitionRequest
|
||||
}
|
||||
```
|
||||
|
||||
* *DropPartitionMsg*
|
||||
|
||||
```go
|
||||
type DropPartitionRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
DbName string
|
||||
CollectionName string
|
||||
PartitionName string
|
||||
DbID UniqueID
|
||||
CollectionID UniqueID
|
||||
PartitionID UniqueID
|
||||
}
|
||||
|
||||
type DropPartitionMsg struct {
|
||||
BaseMsg
|
||||
DropPartitionRequest
|
||||
}
|
||||
```
|
||||
|
||||
#### 10.3 Create Index automatically
|
||||
`RC` would notify `IC` (Index Coord) to build the index automatically when a segment has been flushed.
|
||||
<img src="./figs/root_coord_create_index_automatically.png">
|
||||
|
||||
#### 10.4 RootCoord Instance
|
||||
|
||||
```go
type Core struct {
	MetaTable *metaTable

	//id allocator
	IDAllocator       func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
	IDAllocatorUpdate func() error

	//tso allocator
	TSOAllocator       func(count uint32) (typeutil.Timestamp, error)
	TSOAllocatorUpdate func() error

	//inner members
	ctx     context.Context
	cancel  context.CancelFunc
	etcdCli *clientv3.Client
	kvBase  *etcdkv.EtcdKV

	//setMsgStreams, send time tick into dd channel and time tick channel
	SendTimeTick func(t typeutil.Timestamp) error

	//setMsgStreams, send create collection into dd channel
	SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) error

	//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
	SendDdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error

	//setMsgStreams, send create partition into dd channel
	SendDdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error

	//setMsgStreams, send drop partition into dd channel
	SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error

	// if rootcoord creates a segment, datacoord will put a segment msg into this channel
	DataCoordSegmentChan <-chan *ms.MsgPack

	// when a segment flush completes, the data node will put a segment msg into this channel
	DataNodeFlushedSegmentChan <-chan *ms.MsgPack

	//get binlog file paths from data service
	CallGetBinlogFilePathsService func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
	CallGetNumRowsService         func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)

	//call index builder's client to build index, return build id
	CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
	CallDropIndexService  func(ctx context.Context, indexID typeutil.UniqueID) error

	NewProxyClient func(sess *sessionutil.Session) (types.ProxyNode, error)

	//query service interface, notify query service to release collection
	CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error

	//dd request scheduler
	ddReqQueue chan reqTask //dd requests will be pushed into this chan

	//dml channels
	dmlChannels *dmlChannels

	//ProxyNode manager
	proxyNodeManager *proxyNodeManager

	// proxy clients
	proxyClientManager *proxyClientManager

	// channel timetick
	chanTimeTick *timetickSync

	//time tick loop
	lastTimeTick typeutil.Timestamp

	//state code
	stateCode atomic.Value

	//call once
	initOnce  sync.Once
	startOnce sync.Once
	//isInit atomic.Value

	session     *sessionutil.Session
	sessCloseCh <-chan bool

	msFactory ms.Factory
}
```
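Because the outgoing service calls above are plain function fields, RootCoord can be exercised without live DataCoord or IndexCoord instances. The sketch below is a hedged illustration of that design choice, assuming the same package as the `Core` struct so the milvus-internal types are in scope; the canned return values are illustrative only.

```go
// Hypothetical test helper: stub the outgoing service calls with canned answers.
func newTestCore() *Core {
	c := &Core{}
	c.CallGetBinlogFilePathsService = func(segID, fieldID typeutil.UniqueID) ([]string, error) {
		// pretend DataCoord returned a single binlog path for this segment/field
		return []string{"/tmp/binlog/seg/field"}, nil
	}
	c.CallBuildIndexService = func(ctx context.Context, binlog []string,
		field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
		// pretend IndexCoord accepted the job and returned a build id
		return 1001, nil
	}
	return c
}
```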
#### 10.5 Data definition Request Scheduler

###### 10.5.1 Task

RootCoord receives data definition requests via grpc. Each request (described by a proto) will be wrapped as a task for further scheduling. The task interface is
```go
type reqTask interface {
	Ctx() context.Context
	Type() commonpb.MsgType
	Execute(ctx context.Context) error
	WaitToFinish() error
	Notify(err error)
}
```
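The `baseReqTask` embedded in the example below is not shown in this document; one plausible way to implement the blocking `WaitToFinish`/`Notify` pair is a one-shot error channel, sketched here as an assumption rather than the actual Milvus code.

```go
import "context"

// Hypothetical sketch of baseReqTask: Notify delivers the execution result from
// the scheduler, WaitToFinish blocks the grpc handler until that result arrives.
type baseReqTask struct {
	ctx  context.Context
	done chan error // buffered so Notify never blocks the scheduler goroutine
}

func newBaseReqTask(ctx context.Context) baseReqTask {
	return baseReqTask{ctx: ctx, done: make(chan error, 1)}
}

func (b *baseReqTask) Ctx() context.Context { return b.ctx }
func (b *baseReqTask) Notify(err error)     { b.done <- err }

func (b *baseReqTask) WaitToFinish() error {
	select {
	case err := <-b.done:
		return err
	case <-b.ctx.Done():
		return b.ctx.Err()
	}
}
```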
A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a CreateCollectionReqTask. The wrapper needs to implement the task interface.
```go
type CreateCollectionReqTask struct {
	baseReqTask
	Req *milvuspb.CreateCollectionRequest
}

// Task interfaces
func (t *CreateCollectionReqTask) Ctx() context.Context
func (t *CreateCollectionReqTask) Type() commonpb.MsgType
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error
func (t *CreateCollectionReqTask) WaitToFinish() error
func (t *CreateCollectionReqTask) Notify(err error)
```
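How such a task might reach the built-in scheduler is sketched below. The handler name and signature are simplified (the real RPC returns a status proto), and the sketch reuses the hypothetical `newBaseReqTask` from the earlier example; only `ddReqQueue` and the task types come from this document.

```go
// Hypothetical sketch: wrap the grpc request into a task, enqueue it on the
// serial dd scheduler, then block until the scheduler reports the result.
func (c *Core) createCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) error {
	t := &CreateCollectionReqTask{
		baseReqTask: newBaseReqTask(ctx),
		Req:         in,
	}
	select {
	case c.ddReqQueue <- t: // handed over to the dd request scheduler
	case <-ctx.Done():
		return ctx.Err()
	}
	return t.WaitToFinish() // unblocked by Notify after Execute has run
}
```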
In most cases, a data definition task needs to

* update the system's metadata (via *metaTable*),
* send a `DD Message` into the related `DML MsgStream`, so that `Data Node` and `Query Node` can consume it (see the sketch after this list).
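As a hedged illustration of that ordering (write the meta first, then publish the message), the core of such an `Execute` can be reduced to two callbacks. The helper below is a standalone sketch, not RootCoord's actual code; `addMeta` would close over a *metaTable* update such as `AddCollection`, and `sendDd` over the matching `SendDd...Req` sender.

```go
import "context"

// Hypothetical sketch of the two steps a dd task performs in Execute.
func runDdTask(
	ctx context.Context,
	addMeta func() error,                   // persist the new meta in etcd via metaTable
	sendDd func(ctx context.Context) error, // publish the DD message on the DML channels
) error {
	// 1. update the system meta first
	if err := addMeta(); err != nil {
		return err
	}
	// 2. broadcast the DD message so Data Nodes and Query Nodes observe the change
	return sendDd(ctx)
}
```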
#### 10.6 Meta Table

###### 10.6.1 Meta

* Tenant Meta
```protobuf
message TenantMeta {
  uint64 id = 1;
  uint64 num_query_nodes = 2;
  repeated string insert_channel_names = 3;
  string query_channel_name = 4;
}
```
* Proxy Meta
```protobuf
message ProxyMeta {
  uint64 id = 1;
  common.Address address = 2;
  repeated string result_channel_names = 3;
}
```
* Collection Meta
```protobuf
message PartitionInfo {
  string partition_name = 1;
  int64 partitionID = 2;
  repeated int64 segmentIDs = 3;
}

message IndexInfo {
  string index_name = 1;
  int64 indexID = 2;
  repeated common.KeyValuePair index_params = 3;
}

message FieldIndexInfo {
  int64 filedID = 1;
  int64 indexID = 2;
}

message CollectionInfo {
  int64 ID = 1;
  schema.CollectionSchema schema = 2;
  uint64 create_time = 3;
  repeated int64 partitionIDs = 4;
  repeated FieldIndexInfo field_indexes = 5;
  repeated string virtual_channel_names = 6;
  repeated string physical_channel_names = 7;
}
```
* Segment Meta
```protobuf
message SegmentIndexInfo {
  int64 segmentID = 1;
  int64 fieldID = 2;
  int64 indexID = 3;
  int64 buildID = 4;
  bool enable_index = 5;
}
```
###### 10.6.2 KV pairs in EtcdKV
```go
"tenant/$tenantId" string -> tenantMetaBlob string
"proxy/$proxyId" string -> proxyMetaBlob string
"collection/$collectionId" string -> collectionInfoBlob string
"partition/$collectionId/$partitionId" string -> partitionInfoBlob string
"index/$collectionId/$indexId" string -> indexInfoBlob string
"segment-index/$collectionId/$indexId/$partitionId/$segmentId" string -> segmentIndexInfoBlob string
```
Note that *tenantId*, *proxyId*, *collectionId*, *partitionId*, *indexId*, and *segmentId* are unique strings converted from int64.

*tenantMetaBlob*, *proxyMetaBlob*, *collectionInfoBlob*, *partitionInfoBlob*, *indexInfoBlob*, and *segmentIndexInfoBlob* are serialized protos.
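A hedged sketch of how one such key/value pair could be produced, assuming the standard `golang/protobuf` `proto.Marshal` for serialization; the actual kv-writing code is not shown in this document.

```go
import (
	"strconv"

	"github.com/golang/protobuf/proto"
)

// metaKV builds a key such as "collection/$collectionId" and the serialized
// proto blob stored under it, following the mapping listed above.
func metaKV(prefix string, id int64, msg proto.Message) (string, string, error) {
	key := prefix + "/" + strconv.FormatInt(id, 10) // id as a unique string converted from int64
	blob, err := proto.Marshal(msg)                 // e.g. collectionInfoBlob
	if err != nil {
		return "", "", err
	}
	return key, string(blob), nil
}
```

For example, `metaKV("collection", 437, collInfo)` would yield the key `"collection/437"` and the serialized *collectionInfoBlob* as the value (the id and variable name here are purely illustrative).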
###### 10.6.3 Meta Table
```go
type metaTable struct {
	client             kv.SnapShotKV
	tenantID2Meta      map[typeutil.UniqueID]pb.TenantMeta
	proxyID2Meta       map[typeutil.UniqueID]pb.ProxyMeta
	collID2Meta        map[typeutil.UniqueID]pb.CollectionInfo
	collName2ID        map[string]typeutil.UniqueID
	partitionID2Meta   map[typeutil.UniqueID]pb.PartitionInfo
	segID2IndexMeta    map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo
	indexID2Meta       map[typeutil.UniqueID]pb.IndexInfo
	segID2CollID       map[typeutil.UniqueID]typeutil.UniqueID
	segID2PartitionID  map[typeutil.UniqueID]typeutil.UniqueID
	flushedSegID       map[typeutil.UniqueID]bool
	partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID
	vChan2Chan         map[string]string

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error)

func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error)
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error)
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error)
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]typeutil.UniqueID, error)
func (mt *metaTable) ListCollectionVirtualChannels() []string
func (mt *metaTable) ListCollectionPhysicalChannels() []string
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error)
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error)
func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (pb.PartitionInfo, error)
func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error)
func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error)
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error)
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error)
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo) ([]typeutil.UniqueID, schemapb.FieldSchema, error)
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error)
func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error)
func (mt *metaTable) AddFlushedSegment(segID typeutil.UniqueID) error
```
* *metaTable* maintains meta both in memory and in *etcdKV*, and keeps the two consistent. All of its member functions may be called concurrently.

* For *HasCollection*, *GetCollectionByID*, *GetCollectionByName*, and *ListCollections*, if the `ts` argument is non-zero, *metaTable* returns the meta as of timestamp `ts`; if `ts` is zero, it returns the latest meta (see the usage sketch below).
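A hedged usage sketch of those snapshot semantics, assuming a *metaTable* instance `mt` and a collection named "demo" (both hypothetical):

```go
// Hypothetical usage: ts == 0 reads the latest meta, a non-zero ts reads the
// meta snapshot as of that timestamp.
func lookupCollection(mt *metaTable, oldTs typeutil.Timestamp) error {
	latest, err := mt.GetCollectionByName("demo", 0) // latest view
	if err != nil {
		return err
	}
	historical, err := mt.GetCollectionByName("demo", oldTs) // view at timestamp oldTs
	if err != nil {
		return err
	}
	_, _ = latest, historical
	return nil
}
```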
#### 10.7 System Time Synchronization

<img src="./figs/root_coord_time_sync.png">
```go
type timetickSync struct {
	core          *Core
	lock          sync.Mutex
	proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg
	sendChan      chan map[typeutil.UniqueID]*channelTimeTickMsg
}

func newTimeTickSync(core *Core) *timetickSync

func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error
func (t *timetickSync) AddProxyNode(sess *sessionutil.Session)
func (t *timetickSync) DelProxyNode(sess *sessionutil.Session)
func (t *timetickSync) GetProxyNodes(sess []*sessionutil.Session)
func (t *timetickSync) StartWatch()
func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error
func (t *timetickSync) GetProxyNodeNum()
func (t *timetickSync) GetChanNum() int
```
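The struct above keeps the last timetick reported by each proxy. One plausible reading of the figure is that, once every registered proxy has reported a timetick for a channel, the minimum of those timestamps is what gets forwarded to that channel via `SendChannelTimeTick`. The helper below only illustrates that minimum computation, with plain integer ids and timestamps standing in for the real types; it is an assumption, not the actual aggregation code.

```go
// Hypothetical sketch: compute the smallest timetick reported for one channel.
// Keys are proxy ids, values are the proxies' last reported timestamps.
func minChannelTs(reported map[int64]uint64) (uint64, bool) {
	if len(reported) == 0 {
		return 0, false // nothing reported yet, nothing safe to forward
	}
	var minTs uint64
	first := true
	for _, ts := range reported {
		if first || ts < minTs {
			minTs = ts
			first = false
		}
	}
	return minTs, true
}
```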