2022-04-20 08:15:41 +00:00
|
|
|
package proxy
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
2022-05-17 14:35:57 +00:00
|
|
|
"fmt"
|
2022-04-20 08:15:41 +00:00
|
|
|
|
|
|
|
qnClient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
|
|
|
|
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/types"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// getQueryNodePolicy is the signature of a function that, given a context and
// a string, yields a types.QueryNode or an error. roundRobinPolicy calls it
// with a shard leader's address; defaultGetQueryNodePolicy is one
// implementation.
type getQueryNodePolicy func(context.Context, string) (types.QueryNode, error)
|
|
|
|
|
2022-05-17 14:35:57 +00:00
|
|
|
// pickShardPolicy is the signature of a strategy that runs query against the
// given shard leaders, using policy to obtain the QueryNode handed to query.
// roundRobinPolicy has this signature.
type pickShardPolicy func(ctx context.Context, policy getQueryNodePolicy, query func(UniqueID, types.QueryNode) error, leaders []queryNode) error
|
2022-04-20 08:15:41 +00:00
|
|
|
|
|
|
|
// TODO add another policy to enable the use of cache
|
|
|
|
// defaultGetQueryNodePolicy creates QueryNode client for every address everytime
|
|
|
|
func defaultGetQueryNodePolicy(ctx context.Context, address string) (types.QueryNode, error) {
|
|
|
|
qn, err := qnClient.NewClient(ctx, address)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := qn.Init(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := qn.Start(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return qn, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
// errBegin is the initial error value of the retry loop in roundRobinPolicy:
// being non-nil it lets the loop start, and comparing against it tells the
// first attempt apart from a retry. roundRobinPolicy also returns it when it
// is given no leaders at all.
errBegin = errors.New("begin error")
|
|
|
|
// errInvalidShardLeaders is a sentinel error for invalid shard leaders. It is
// not referenced in the visible part of this file — presumably it is used
// elsewhere in the package; confirm before changing it.
// NOTE(review): the message starts with a capital letter, which is contrary
// to Go convention for error strings; left as is because callers or tests may
// match on the text.
errInvalidShardLeaders = errors.New("Invalid shard leader")
|
|
|
|
)
|
|
|
|
|
2022-05-17 14:35:57 +00:00
|
|
|
type queryNode struct {
|
|
|
|
nodeID UniqueID
|
|
|
|
address string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q queryNode) String() string {
|
|
|
|
return fmt.Sprintf("<NodeID: %d>", q.nodeID)
|
|
|
|
}
|
|
|
|
|
|
|
|
func updateShardsWithRoundRobin(shardsLeaders map[string][]queryNode) map[string][]queryNode {
|
|
|
|
|
|
|
|
for channelID, leaders := range shardsLeaders {
|
|
|
|
if len(leaders) <= 1 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
shardsLeaders[channelID] = append(leaders[1:], leaders[0])
|
|
|
|
}
|
|
|
|
|
|
|
|
return shardsLeaders
|
|
|
|
}
|
|
|
|
|
|
|
|
func roundRobinPolicy(ctx context.Context, getQueryNodePolicy getQueryNodePolicy, query func(UniqueID, types.QueryNode) error, leaders []queryNode) error {
|
2022-04-20 08:15:41 +00:00
|
|
|
var (
|
|
|
|
err = errBegin
|
|
|
|
current = 0
|
|
|
|
qn types.QueryNode
|
|
|
|
)
|
2022-05-17 14:35:57 +00:00
|
|
|
replicaNum := len(leaders)
|
2022-04-20 08:15:41 +00:00
|
|
|
|
|
|
|
for err != nil && current < replicaNum {
|
2022-05-17 14:35:57 +00:00
|
|
|
currentID := leaders[current].nodeID
|
2022-04-20 08:15:41 +00:00
|
|
|
if err != errBegin {
|
2022-04-27 08:55:46 +00:00
|
|
|
log.Warn("retry with another QueryNode",
|
|
|
|
zap.Int("retries numbers", current),
|
2022-05-17 14:35:57 +00:00
|
|
|
zap.Int64("nodeID", currentID))
|
2022-04-20 08:15:41 +00:00
|
|
|
}
|
|
|
|
|
2022-05-17 14:35:57 +00:00
|
|
|
qn, err = getQueryNodePolicy(ctx, leaders[current].address)
|
2022-04-20 08:15:41 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("fail to get valid QueryNode", zap.Int64("nodeID", currentID),
|
|
|
|
zap.Error(err))
|
|
|
|
current++
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
defer qn.Stop()
|
|
|
|
err = query(currentID, qn)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("fail to Query with shard leader",
|
|
|
|
zap.Int64("nodeID", currentID),
|
|
|
|
zap.Error(err))
|
|
|
|
}
|
|
|
|
current++
|
|
|
|
}
|
|
|
|
|
|
|
|
if current == replicaNum && err != nil {
|
2022-05-17 14:35:57 +00:00
|
|
|
log.Warn("no shard leaders available",
|
|
|
|
zap.String("leaders", fmt.Sprintf("%v", leaders)), zap.Error(err))
|
2022-04-26 03:27:53 +00:00
|
|
|
// needs to return the error from query
|
|
|
|
return err
|
2022-04-20 08:15:41 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|