mirror of https://github.com/milvus-io/milvus.git
543 lines
15 KiB
Markdown
543 lines
15 KiB
Markdown
|
|
|
|
## Appendix A. Basic Components
|
|
|
|
#### A.1 System Component
|
|
|
|
Milvus has 9 different components, and can be abstracted into basic Component.
|
|
|
|
```go
|
|
type Component interface {
|
|
Init() error
|
|
Start() error
|
|
Stop() error
|
|
GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error)
|
|
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
|
|
}
|
|
```
|
|
|
|
* *GetComponentStates*
|
|
|
|
```go
|
|
|
|
type StateCode = int
|
|
|
|
const (
|
|
INITIALIZING StateCode = 0
|
|
HEALTHY StateCode = 1
|
|
ABNORMAL StateCode = 2
|
|
)
|
|
|
|
type ComponentInfo struct {
|
|
NodeID UniqueID
|
|
Role string
|
|
StateCode StateCode
|
|
ExtraInfo []*commonpb.KeyValuePair
|
|
}
|
|
|
|
type ComponentStates struct {
|
|
State *ComponentInfo
|
|
SubcomponentStates []*ComponentInfo
|
|
Status *commonpb.Status
|
|
}
|
|
|
|
```
|
|
|
|
If a component needs to process timetick message to align timetick, it needs to implement TimeTickProvider interface.
|
|
|
|
|
|
```go
|
|
type TimeTickProvider interface {
|
|
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
|
|
}
|
|
```
|
|
|
|
|
|
#### A.2 Session
|
|
###### ServerID
|
|
|
|
The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/services/ServerID". The initial value is 0. When a service is registered, it is incremented by 1 and returned to the next registered service.
|
|
|
|
###### Registeration
|
|
|
|
- Registration is achieved through etcd's lease mechanism.
|
|
|
|
- The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key.
|
|
|
|
- key: metaRootPath + "/services" + "/ServerName-ServerID"
|
|
|
|
- value: json format
|
|
{
|
|
"ServerID":ServerID //ServerID
|
|
"ServerName": ServerName // ServerName
|
|
"Address": ip:port // Address of service, including ip and port
|
|
"LeaseID": LeaseID // The ID of etcd lease
|
|
}
|
|
|
|
- By obtaining the address, you can establish a connection with other services
|
|
|
|
###### Discovery
|
|
|
|
- All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key
|
|
- Registeration time can be compared with ServerID for ServerID will increase according to time.
|
|
|
|
|
|
|
|
###### Interface
|
|
|
|
```go
|
|
default_ttl = 10
|
|
default_retry_times = 3
|
|
|
|
type Session struct {
|
|
ServerID int64
|
|
ServerName string
|
|
Address string
|
|
LeaseID clientv3.LeaseID
|
|
}
|
|
|
|
// GetServerID gets id from etcd with key: metaRootPath + "/services/ServerID"
|
|
// Each server get ServerID and add one to ServerID
|
|
GetServerID(){}
|
|
|
|
// RegisterService registers the service to etcd so that other services
|
|
// can find that the service is online and issue subsequent operations
|
|
// RegisterService will save a key-value in etcd
|
|
// key: metaRootPath + "/services" + "/ServerName-ServerID"
|
|
// value: json format
|
|
// {
|
|
// "ServerID": ServerID
|
|
// "ServerName": ServerName-ServerID // ServerName
|
|
// "Address": ip:port // Address of service, including ip and port
|
|
// "LeaseID": LeaseID // The ID of etcd lease
|
|
// }
|
|
// MetaRootPath is configurable in the config file.
|
|
RegisterService(etcdKV etcdKV, serverName string, address string)(<-chan *clientv3.LeaseKeepAliveResponse, error)
|
|
|
|
|
|
|
|
// ProcessKeepAliveResponse processes the response of etcd keepAlive interface
|
|
// If keepAlive fails for unexpected error, it will retry for default_retry_times times
|
|
ProcessKeepAliveResponse(ctx context, ch <-chan *clientv3.LeaseKeepAliveResponse)
|
|
|
|
|
|
|
|
// GetAllSessions gets all the services registered in etcd.
|
|
// This gets all the key with prefix metaRootPath + "/services/" + prefix
|
|
// For general, "datanode" to get all datanodes
|
|
GetSessions(prefix string) ([]session, error)
|
|
```
|
|
|
|
|
|
#### A.3 Global Parameter Table
|
|
|
|
``` go
|
|
type BaseTable struct {
|
|
params *memkv.MemoryKV
|
|
}
|
|
|
|
func (gp *BaseTable) Init()
|
|
func (gp *BaseTable) LoadFromKVPair(kvPairs []*commonpb.KeyValuePair) error
|
|
func (gp *BaseTable) Load(key string) (string, error)
|
|
func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []string, error)
|
|
func (gp *BaseTable) LoadYaml(fileName string) error
|
|
func (gp *BaseTable) LoadYaml(fileName string) error
|
|
func (gp *BaseTable) LoadYaml(fileName string) error
|
|
func (gp *BaseTable) ParseFloat(key string) float64
|
|
func (gp *BaseTable) ParseInt64(key string) int64
|
|
func (gp *BaseTable) ParseInt32(key string) int32
|
|
func (gp *BaseTable) ParseInt(key string) int
|
|
func (gp *BaseTable) WriteNodeIDList() []UniqueID
|
|
func (gp *BaseTable) DataNodeIDList() []UniqueID
|
|
func (gp *BaseTable) ProxyIDList() []UniqueID
|
|
func (gp *BaseTable) QueryNodeIDList() []UniqueID
|
|
```
|
|
|
|
|
|
* *LoadYaml(filePath string)* turns a YAML file into multiple key-value pairs. For example, given the following YAML
|
|
|
|
```yaml
|
|
etcd:
|
|
address: localhost
|
|
port: 2379
|
|
rootpath: milvus/etcd
|
|
```
|
|
|
|
*BaseTable.LoadYaml* will insert three key-value pairs into *params*
|
|
|
|
```go
|
|
"etcd.address" -> "localhost"
|
|
"etcd.port" -> "2379"
|
|
"etcd.rootpath" -> "milvus/etcd"
|
|
```
|
|
|
|
|
|
|
|
#### A.4 Time Ticked Flow Graph
|
|
|
|
//TODO remove?
|
|
###### A.4.1 Flow Graph States
|
|
|
|
```go
|
|
type flowGraphStates struct {
|
|
startTick Timestamp
|
|
numActiveTasks map[string]int32
|
|
numCompletedTasks map[string]int64
|
|
}
|
|
```
|
|
|
|
###### A.4.2 Message
|
|
|
|
```go
|
|
type Msg interface {
|
|
TimeTick() Timestamp
|
|
}
|
|
```
|
|
|
|
###### A.4.3 Node
|
|
|
|
```go
|
|
type Node interface {
|
|
Name() string
|
|
MaxQueueLength() int32
|
|
MaxParallelism() int32
|
|
Operate(ctx context.Context, in []Msg) ([]Msg, context.Context)
|
|
IsInputNode() bool
|
|
Close()
|
|
}
|
|
```
|
|
|
|
```go
|
|
type BaseNode struct {
|
|
maxQueueLength int32
|
|
maxParallelism int32
|
|
}
|
|
```
|
|
|
|
###### A.4.4 Flow Graph
|
|
|
|
```go
|
|
type nodeCtx struct {
|
|
node Node
|
|
inputChannels []chan Msg
|
|
inputMessages []Msg
|
|
downstream []*nodeCtx
|
|
downstreamInputChanIdx map[string]int
|
|
|
|
NumActiveTasks int64
|
|
NumCompletedTasks int64
|
|
}
|
|
|
|
func (nodeCtx *nodeCtx) Start(ctx context.Context) error
|
|
```
|
|
|
|
*Start()* will enter a loop. In each iteration, it tries to collect input messges from *inputChan*, then prepare node's input. When input is ready, it will trigger *node.Operate*. When *node.Operate* returns, it sends the returned *Msg* to *outputChans*, which connects to the downstreams' *inputChans*.
|
|
|
|
```go
|
|
type TimeTickedFlowGraph struct {
|
|
ctx context.Context
|
|
nodeCtx map[NodeName]*nodeCtx
|
|
}
|
|
|
|
func (*pipeline TimeTickedFlowGraph) AddNode(node Node)
|
|
func (*pipeline TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string)
|
|
func (*pipeline TimeTickedFlowGraph) Start() error
|
|
func (*pipeline TimeTickedFlowGraph) Close() error
|
|
|
|
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph
|
|
```
|
|
|
|
#### A.5 Allocator
|
|
|
|
```go
|
|
type Allocator struct {
|
|
Ctx context.Context
|
|
CancelFunc context.CancelFunc
|
|
|
|
wg sync.WaitGroup
|
|
|
|
Reqs chan Request
|
|
ToDoReqs []Request
|
|
CanDoReqs []Request
|
|
SyncReqs []Request
|
|
|
|
TChan TickerChan
|
|
ForceSyncChan chan Request
|
|
|
|
SyncFunc func() bool
|
|
ProcessFunc func(req Request) error
|
|
|
|
CheckSyncFunc func(timeout bool) bool
|
|
PickCanDoFunc func()
|
|
}
|
|
func (ta *Allocator) Start() error
|
|
func (ta *Allocator) Init() error
|
|
func (ta *Allocator) Close() error
|
|
func (ta *Allocator) CleanCache() error
|
|
|
|
```
|
|
|
|
|
|
#### A.6 ID Allocator
|
|
|
|
```go
|
|
type IDAllocator struct {
|
|
Allocator
|
|
|
|
masterAddress string
|
|
master types.MasterService
|
|
|
|
countPerRPC uint32
|
|
|
|
idStart UniqueID
|
|
idEnd UniqueID
|
|
|
|
PeerID UniqueID
|
|
}
|
|
|
|
func (ia *IDAllocator) Start() error
|
|
func (ia *IDAllocator) connectMaster() error
|
|
func (ia *IDAllocator) syncID() bool
|
|
func (ia *IDAllocator) checkSyncFunc(timeout bool) bool
|
|
func (ia *IDAllocator) pickCanDoFunc()
|
|
func (ia *IDAllocator) processFunc(req Request) error
|
|
func (ia *IDAllocator) AllocOne() (UniqueID, error)
|
|
func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error)
|
|
|
|
func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error)
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
#### A.6 Timestamp Allocator
|
|
|
|
###### A.6.1 Timestamp
|
|
|
|
Let's take a brief review of Hybrid Logical Clock (HLC). HLC uses 64bits timestamps which are composed of a 46-bits physical component (thought of as and always close to local wall time) and a 18-bits logical component (used to distinguish between events with the same physical component).
|
|
|
|
<img src="./figs/hlc.png" width=400>
|
|
|
|
HLC's logical part is advanced on each request. The phsical part can be increased in two cases:
|
|
|
|
A. when the local wall time is greater than HLC's physical part,
|
|
|
|
B. or the logical part overflows.
|
|
|
|
In either cases, the physical part will be updated, and the logical part will be set to 0.
|
|
|
|
Keep the physical part close to local wall time may face non-monotonic problems such as updates to POSIX time that could turn time backward. HLC avoids such problems, since if 'local wall time < HLC's physical part' holds, only case B is satisfied, thus montonicity is guaranteed.
|
|
|
|
Milvus does not support transaction, but it should gurantee the deterministic execution of the multi-way WAL. The timestamp attached to each request should
|
|
|
|
- have its physical part close to wall time (has an acceptable bounded error, a.k.a. uncertainty interval in transaction senarios),
|
|
- and be globally unique.
|
|
|
|
HLC leverages on physical clocks at nodes that are synchronized using the NTP. NTP usually maintain time to within tens of milliseconds over local networks in datacenter. Asymmetric routes and network congestion occasionally cause errors of hundreds of milliseconds. Both the normal time error and the spike are acceptable for Milvus use cases.
|
|
|
|
The interface of Timestamp is as follows.
|
|
|
|
```
|
|
type timestamp struct {
|
|
physical uint64 // 18-63 bits
|
|
logical uint64 // 0-17 bits
|
|
}
|
|
|
|
type Timestamp uint64
|
|
```
|
|
|
|
|
|
|
|
###### A.6.2 Timestamp Oracle
|
|
|
|
```go
|
|
type timestampOracle struct {
|
|
key string
|
|
txnkv kv.TxnBase
|
|
|
|
saveInterval time.Duration
|
|
maxResetTSGap func() time.Duration
|
|
|
|
TSO unsafe.Pointer
|
|
lastSavedTime atomic.Value
|
|
}
|
|
|
|
func (t *timestampOracle) InitTimestamp() error
|
|
func (t *timestampOracle) ResetUserTimestamp(tso uint64) error
|
|
func (t *timestampOracle) UpdateTimestamp() error
|
|
func (t *timestampOracle) ResetTimestamp()
|
|
```
|
|
|
|
|
|
|
|
###### A.6.3 Timestamp Allocator
|
|
|
|
```go
|
|
type TimestampAllocator struct {
|
|
Allocator
|
|
|
|
masterAddress string
|
|
masterClient types.MasterService
|
|
|
|
countPerRPC uint32
|
|
lastTsBegin Timestamp
|
|
lastTsEnd Timestamp
|
|
PeerID UniqueID
|
|
}
|
|
|
|
func (ta *TimestampAllocator) Start() error
|
|
func (ta *TimestampAllocator) AllocOne() (UniqueID, error)
|
|
func (ta *TimestampAllocator) Alloc(count uint32) (UniqueID, UniqueID, error)
|
|
func (ta *TimestampAllocator) ClearCache()
|
|
|
|
func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error)
|
|
```
|
|
|
|
|
|
|
|
* Batch Allocation of Timestamps
|
|
|
|
* Expiration of Timestamps
|
|
|
|
|
|
|
|
|
|
|
|
#### A.7 KV
|
|
|
|
###### A.7.1 KV Base
|
|
|
|
```go
|
|
type BaseKV interface {
|
|
Load(key string) (string, error)
|
|
MultiLoad(keys []string) ([]string, error)
|
|
LoadWithPrefix(key string) ([]string, []string, error)
|
|
Save(key, value string) error
|
|
MultiSave(kvs map[string]string) error
|
|
Remove(key string) error
|
|
MultiRemove(keys []string) error
|
|
|
|
Close()
|
|
}
|
|
```
|
|
|
|
###### A.7.2 Txn Base
|
|
|
|
```go
|
|
type TxnKV interface {
|
|
BaseKV
|
|
|
|
MultiSaveAndRemove(saves map[string]string, removals []string) error
|
|
MultiRemoveWithPrefix(keys []string) error
|
|
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
|
|
}
|
|
```
|
|
|
|
|
|
|
|
###### A.7.3 Etcd KV
|
|
|
|
```go
|
|
type EtcdKV struct {
|
|
client *clientv3.Client
|
|
rootPath string
|
|
}
|
|
|
|
func (kv *EtcdKV) Close()
|
|
func (kv *EtcdKV) GetPath(key string) string
|
|
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error)
|
|
func (kv *EtcdKV) Load(key string) (string, error)
|
|
func (kv *EtcdKV) GetCount(key string) (int64, error)
|
|
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error)
|
|
func (kv *EtcdKV) Save(key, value string) error
|
|
func (kv *EtcdKV) MultiSave(kvs map[string]string) error
|
|
func (kv *EtcdKV) RemoveWithPrefix(prefix string) error
|
|
func (kv *EtcdKV) Remove(key string) error
|
|
func (kv *EtcdKV) MultiRemove(keys []string) error
|
|
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error
|
|
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan
|
|
func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan
|
|
|
|
func NewEtcdKV(etcdAddr string, rootPath string) *EtcdKV
|
|
```
|
|
|
|
EtcdKV implements all *TxnKV* interfaces.
|
|
|
|
###### A.7.4 Memory KV
|
|
|
|
```go
|
|
type MemoryKV struct {
|
|
sync.RWMutex
|
|
tree *btree.BTree
|
|
}
|
|
|
|
func (s memoryKVItem) Less(than btree.Item) bool
|
|
func (kv *MemoryKV) Load(key string) (string, error)
|
|
func (kv *MemoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error)
|
|
func (kv *MemoryKV) Save(key, value string) error
|
|
func (kv *MemoryKV) Remove(key string) error
|
|
func (kv *MemoryKV) MultiLoad(keys []string) ([]string, error)
|
|
func (kv *MemoryKV) MultiSave(kvs map[string]string) error
|
|
func (kv *MemoryKV) MultiRemove(keys []string) error
|
|
func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []string) error
|
|
func (kv *MemoryKV) LoadWithPrefix(key string) ([]string, []string, error)
|
|
func (kv *MemoryKV) Close()
|
|
func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error
|
|
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
|
|
```
|
|
|
|
MemoryKV implements all *TxnKV* interfaces.
|
|
|
|
###### A.7.5 MinIO KV
|
|
|
|
```go
|
|
type MinIOKV struct {
|
|
ctx context.Context
|
|
minioClient *minio.Client
|
|
bucketName string
|
|
}
|
|
|
|
func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error)
|
|
func (kv *MinIOKV) Load(key string) (string, error)
|
|
func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error)
|
|
func (kv *MinIOKV) Save(key, value string) error
|
|
func (kv *MinIOKV) MultiSave(kvs map[string]string) error
|
|
func (kv *MinIOKV) RemoveWithPrefix(key string) error
|
|
func (kv *MinIOKV) Remove(key string) error
|
|
func (kv *MinIOKV) MultiRemove(keys []string) error
|
|
func (kv *MinIOKV) Close()
|
|
```
|
|
|
|
MinIOKV implements all *KV* interfaces.
|
|
|
|
###### A.7.6 RocksdbKV KV
|
|
|
|
```go
|
|
type RocksdbKV struct {
|
|
opts *gorocksdb.Options
|
|
db *gorocksdb.DB
|
|
writeOptions *gorocksdb.WriteOptions
|
|
readOptions *gorocksdb.ReadOptions
|
|
name string
|
|
}
|
|
|
|
func (kv *RocksdbKV) Close()
|
|
func (kv *RocksdbKV) GetName() string
|
|
func (kv *RocksdbKV) Load(key string) (string, error)
|
|
func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error)
|
|
func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error)
|
|
func (kv *RocksdbKV) Save(key, value string) error
|
|
func (kv *RocksdbKV) MultiSave(kvs map[string]string) error
|
|
func (kv *RocksdbKV) RemoveWithPrefix(key string) error
|
|
func (kv *RocksdbKV) Remove(key string) error
|
|
func (kv *RocksdbKV) MultiRemove(keys []string) error
|
|
func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string) error
|
|
func (kv *RocksdbKV) MultiRemoveWithPrefix(keys []string) error
|
|
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
|
|
```
|
|
|
|
RocksdbKV implements all *TxnKV* interfaces.h
|
|
|
|
|