Add MinioConfig in GlobalParams for all components (#15099)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/15106/head
Cai Yudong 2022-01-10 17:29:34 +08:00 committed by GitHub
parent 4369e08f2a
commit b6a48817b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 177 additions and 480 deletions

View File

@ -204,9 +204,9 @@ func Test_garbageCollector_scan(t *testing.T) {
// initialize unit test sso env
func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []string, stats []string, delta []string, other []string, err error) {
Params.Init()
cli, err = minio.New(Params.DataCoordCfg.MinioAddress, &minio.Options{
Creds: credentials.NewStaticV4(Params.DataCoordCfg.MinioAccessKeyID, Params.DataCoordCfg.MinioSecretAccessKey, ""),
Secure: Params.DataCoordCfg.MinioUseSSL,
cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
Secure: Params.MinioCfg.UseSSL,
})
if err != nil {
return nil, nil, nil, nil, nil, err

View File

@ -343,21 +343,21 @@ func (s *Server) initGarbageCollection() error {
var cli *minio.Client
var err error
if Params.DataCoordCfg.EnableGarbageCollection {
cli, err = minio.New(Params.DataCoordCfg.MinioAddress, &minio.Options{
Creds: credentials.NewStaticV4(Params.DataCoordCfg.MinioAccessKeyID, Params.DataCoordCfg.MinioSecretAccessKey, ""),
Secure: Params.DataCoordCfg.MinioUseSSL,
cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
Secure: Params.MinioCfg.UseSSL,
})
if err != nil {
return err
}
checkBucketFn := func() error {
has, err := cli.BucketExists(context.TODO(), Params.DataCoordCfg.MinioBucketName)
has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
if err != nil {
return err
}
if !has {
err = cli.MakeBucket(context.TODO(), Params.DataCoordCfg.MinioBucketName, minio.MakeBucketOptions{})
err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
if err != nil {
return err
}
@ -377,8 +377,8 @@ func (s *Server) initGarbageCollection() error {
s.garbageCollector = newGarbageCollector(s.meta, GcOption{
cli: cli,
enabled: Params.DataCoordCfg.EnableGarbageCollection,
bucketName: Params.DataCoordCfg.MinioBucketName,
rootPath: Params.DataCoordCfg.MinioRootPath,
bucketName: Params.MinioCfg.BucketName,
rootPath: Params.MinioCfg.RootPath,
checkInterval: Params.DataCoordCfg.GCInterval,
missingTolerance: Params.DataCoordCfg.GCMissingTolerance,

View File

@ -461,12 +461,12 @@ func (node *DataNode) Start() error {
}
option := &miniokv.Option{
Address: Params.DataNodeCfg.MinioAddress,
AccessKeyID: Params.DataNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.DataNodeCfg.MinioSecretAccessKey,
UseSSL: Params.DataNodeCfg.MinioUseSSL,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.DataNodeCfg.MinioBucketName,
}
kv, err := miniokv.NewMinIOKV(node.ctx, option)

View File

@ -133,12 +133,12 @@ var _ Replica = &SegmentReplica{}
func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*SegmentReplica, error) {
// MinIO
option := &miniokv.Option{
Address: Params.DataNodeCfg.MinioAddress,
AccessKeyID: Params.DataNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.DataNodeCfg.MinioSecretAccessKey,
UseSSL: Params.DataNodeCfg.MinioUseSSL,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.DataNodeCfg.MinioBucketName,
}
minIOKV, err := miniokv.NewMinIOKV(ctx, option)

View File

@ -217,11 +217,11 @@ func (i *IndexCoord) Init() error {
}
option := &miniokv.Option{
Address: Params.IndexCoordCfg.MinIOAddress,
AccessKeyID: Params.IndexCoordCfg.MinIOAccessKeyID,
SecretAccessKeyID: Params.IndexCoordCfg.MinIOSecretAccessKey,
UseSSL: Params.IndexCoordCfg.MinIOUseSSL,
BucketName: Params.IndexCoordCfg.MinioBucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}

View File

@ -56,7 +56,7 @@ func getSystemInfoMetrics(
ID: coord.session.ServerID,
},
SystemConfigurations: metricsinfo.IndexCoordConfiguration{
MinioBucketName: Params.IndexCoordCfg.MinioBucketName,
MinioBucketName: Params.MinioCfg.BucketName,
},
},
ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0),

View File

@ -176,11 +176,11 @@ func (i *IndexNode) Init() error {
i.etcdKV = etcdKV
option := &miniokv.Option{
Address: Params.IndexNodeCfg.MinIOAddress,
AccessKeyID: Params.IndexNodeCfg.MinIOAccessKeyID,
SecretAccessKeyID: Params.IndexNodeCfg.MinIOSecretAccessKey,
UseSSL: Params.IndexNodeCfg.MinIOUseSSL,
BucketName: Params.IndexNodeCfg.MinioBucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := miniokv.NewMinIOKV(i.loopCtx, option)

View File

@ -353,7 +353,7 @@ func getMockSystemInfoMetrics(
Type: typeutil.IndexNodeRole,
},
SystemConfigurations: metricsinfo.IndexNodeConfiguration{
MinioBucketName: Params.IndexNodeCfg.MinioBucketName,
MinioBucketName: Params.MinioCfg.BucketName,
SimdType: Params.IndexNodeCfg.SimdType,
},

View File

@ -51,7 +51,7 @@ func getSystemInfoMetrics(
ID: node.session.ServerID,
},
SystemConfigurations: metricsinfo.IndexNodeConfiguration{
MinioBucketName: Params.IndexNodeCfg.MinioBucketName,
MinioBucketName: Params.MinioCfg.BucketName,
SimdType: Params.IndexNodeCfg.SimdType,
},

View File

@ -131,12 +131,12 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
}
option := &minioKV.Option{
Address: Params.QueryCoordCfg.MinioEndPoint,
AccessKeyID: Params.QueryCoordCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryCoordCfg.MinioSecretAccessKey,
UseSSL: Params.QueryCoordCfg.MinioUseSSLStr,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.QueryCoordCfg.MinioBucketName,
}
c.dataKV, err = minioKV.NewMinIOKV(ctx, option)

View File

@ -335,11 +335,11 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
}
option := &minioKV.Option{
Address: Params.QueryCoordCfg.MinioEndPoint,
AccessKeyID: Params.QueryCoordCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryCoordCfg.MinioSecretAccessKey,
UseSSL: Params.QueryCoordCfg.MinioUseSSLStr,
BucketName: Params.QueryCoordCfg.MinioBucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}

View File

@ -312,11 +312,11 @@ func newDataCoordMock(ctx context.Context) (*dataCoordMock, error) {
// create minio client
option := &minioKV.Option{
Address: Params.QueryCoordCfg.MinioEndPoint,
AccessKeyID: Params.QueryCoordCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryCoordCfg.MinioSecretAccessKey,
UseSSL: Params.QueryCoordCfg.MinioUseSSLStr,
BucketName: Params.QueryCoordCfg.MinioBucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(ctx, option)

View File

@ -53,12 +53,12 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
}
option := &minioKV.Option{
Address: Params.QueryCoordCfg.MinioEndPoint,
AccessKeyID: Params.QueryCoordCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryCoordCfg.MinioSecretAccessKey,
UseSSL: Params.QueryCoordCfg.MinioUseSSLStr,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.QueryCoordCfg.MinioBucketName,
}
cluster.dataKV, err = minioKV.NewMinIOKV(baseCtx, option)

View File

@ -247,12 +247,12 @@ func (loader *indexLoader) setIndexInfo(segment *Segment, info *indexInfo) {
// newIndexLoader returns a new indexLoader
func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.QueryNodeCfg.MinioBucketName,
}
client, err := minioKV.NewMinIOKV(ctx, option)

View File

@ -815,13 +815,12 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
}
// create minio client
bucketName := Params.QueryNodeCfg.MinioBucketName
option := &minioKV.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
BucketName: bucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)

View File

@ -254,11 +254,11 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
}
option := &minioKV.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
BucketName: Params.QueryNodeCfg.MinioBucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
@ -357,13 +357,12 @@ func genSimpleCollectionMeta() *etcdpb.CollectionMeta {
// ---------- unittest util functions ----------
// functions of third-party
func genMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) {
bucketName := Params.QueryNodeCfg.MinioBucketName
option := &minioKV.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
BucketName: bucketName,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(ctx, option)

View File

@ -67,12 +67,12 @@ func newQueryService(ctx context.Context,
localChunkManager := storage.NewLocalChunkManager(path)
option := &miniokv.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.QueryNodeCfg.MinioBucketName,
}
client, err := miniokv.NewMinIOKV(ctx, option)

View File

@ -672,12 +672,12 @@ func newSegmentLoader(ctx context.Context,
etcdKV *etcdkv.EtcdKV,
factory msgstream.Factory) *segmentLoader {
option := &minioKV.Option{
Address: Params.QueryNodeCfg.MinioEndPoint,
AccessKeyID: Params.QueryNodeCfg.MinioAccessKeyID,
SecretAccessKeyID: Params.QueryNodeCfg.MinioSecretAccessKey,
UseSSL: Params.QueryNodeCfg.MinioUseSSLStr,
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
BucketName: Params.QueryNodeCfg.MinioBucketName,
}
client, err := minioKV.NewMinIOKV(ctx, option)

View File

@ -55,6 +55,7 @@ type GlobalParamTable struct {
PulsarCfg pulsarConfig
RocksmqCfg rocksmqConfig
MinioCfg minioConfig
//CommonCfg commonConfig
//KnowhereCfg knowhereConfig
//MsgChannelCfg msgChannelConfig
@ -82,6 +83,7 @@ func (p *GlobalParamTable) Init() {
p.PulsarCfg.init(&p.BaseParams)
p.RocksmqCfg.init(&p.BaseParams)
p.MinioCfg.init(&p.BaseParams)
//p.CommonCfg.init(&p.BaseParams)
//p.KnowhereCfg.init(&p.BaseParams)
//p.MsgChannelCfg.init(&p.BaseParams)
@ -102,7 +104,6 @@ func (p *GlobalParamTable) SetLogConfig(role string) {
p.BaseParams.SetLogConfig()
}
// TODO: considering remove it: comment a large block of code is not a good practice, old code can be found with git
///////////////////////////////////////////////////////////////////////////////
// --- pulsar ---
type pulsarConfig struct {
@ -163,6 +164,78 @@ func (p *rocksmqConfig) initPath() {
p.Path = path
}
///////////////////////////////////////////////////////////////////////////////
// --- minio ---
type minioConfig struct {
BaseParams *BaseParamTable
Address string
AccessKeyID string
SecretAccessKey string
UseSSL bool
BucketName string
RootPath string
}
func (p *minioConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initAddress()
p.initAccessKeyID()
p.initSecretAccessKey()
p.initUseSSL()
p.initBucketName()
p.initRootPath()
}
func (p *minioConfig) initAddress() {
endpoint, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.Address = endpoint
}
func (p *minioConfig) initAccessKeyID() {
keyID, err := p.BaseParams.Load("_MinioAccessKeyID")
if err != nil {
panic(err)
}
p.AccessKeyID = keyID
}
func (p *minioConfig) initSecretAccessKey() {
key, err := p.BaseParams.Load("_MinioSecretAccessKey")
if err != nil {
panic(err)
}
p.SecretAccessKey = key
}
func (p *minioConfig) initUseSSL() {
usessl, err := p.BaseParams.Load("_MinioUseSSL")
if err != nil {
panic(err)
}
p.UseSSL, _ = strconv.ParseBool(usessl)
}
func (p *minioConfig) initBucketName() {
bucketName, err := p.BaseParams.Load("_MinioBucketName")
if err != nil {
panic(err)
}
p.BucketName = bucketName
}
func (p *minioConfig) initRootPath() {
rootPath, err := p.BaseParams.Load("minio.rootPath")
if err != nil {
panic(err)
}
p.RootPath = rootPath
}
///////////////////////////////////////////////////////////////////////////////
// --- common ---
//type commonConfig struct {
@ -664,13 +737,6 @@ type queryCoordConfig struct {
SearchChannelPrefix string
SearchResultChannelPrefix string
//--- Minio ---
MinioEndPoint string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSLStr bool
MinioBucketName string
CreatedTime time.Time
UpdatedTime time.Time
@ -697,13 +763,6 @@ func (p *queryCoordConfig) init(bp *BaseParamTable) {
p.initStatsChannelName()
p.initTimeTickChannelName()
//--- Minio ----
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
//---- Handoff ---
p.initAutoHandoff()
@ -762,50 +821,6 @@ func (p *queryCoordConfig) initTimeTickChannelName() {
p.TimeTickChannelName = strings.Join(s, "-")
}
func (p *queryCoordConfig) initMinioEndPoint() {
url, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioEndPoint = url
}
func (p *queryCoordConfig) initMinioAccessKeyID() {
id, err := p.BaseParams.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = id
}
func (p *queryCoordConfig) initMinioSecretAccessKey() {
key, err := p.BaseParams.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *queryCoordConfig) initMinioUseSSLStr() {
ssl, err := p.BaseParams.Load("minio.useSSL")
if err != nil {
panic(err)
}
sslBoolean, err := strconv.ParseBool(ssl)
if err != nil {
panic(err)
}
p.MinioUseSSLStr = sslBoolean
}
func (p *queryCoordConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("minio.bucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
func (p *queryCoordConfig) initAutoHandoff() {
handoff, err := p.BaseParams.Load("queryCoord.autoHandoff")
if err != nil {
@ -892,13 +907,6 @@ type queryNodeConfig struct {
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
// minio
MinioEndPoint string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSLStr bool
MinioBucketName string
// search
SearchChannelNames []string
SearchResultChannelNames []string
@ -937,13 +945,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initCacheSize()
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initGracefulTime()
p.initFlowGraphMaxQueueLength()
@ -999,51 +1000,6 @@ func (p *queryNodeConfig) initCacheSize() {
p.CacheSize = value
}
// ---------------------------------------------------------- minio
func (p *queryNodeConfig) initMinioEndPoint() {
url, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioEndPoint = url
}
func (p *queryNodeConfig) initMinioAccessKeyID() {
id, err := p.BaseParams.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = id
}
func (p *queryNodeConfig) initMinioSecretAccessKey() {
key, err := p.BaseParams.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *queryNodeConfig) initMinioUseSSLStr() {
ssl, err := p.BaseParams.Load("minio.useSSL")
if err != nil {
panic(err)
}
sslBoolean, err := strconv.ParseBool(ssl)
if err != nil {
panic(err)
}
p.MinioUseSSLStr = sslBoolean
}
func (p *queryNodeConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("minio.bucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
// advanced params
// stats
func (p *queryNodeConfig) initStatsPublishInterval() {
@ -1151,14 +1107,6 @@ type dataCoordConfig struct {
// --- ETCD ---
ChannelWatchSubPath string
// --- MinIO ---
MinioAddress string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSL bool
MinioBucketName string
MinioRootPath string
// --- SEGMENTS ---
SegmentMaxSize float64
SegmentSealProportion float64
@ -1204,13 +1152,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initEnableCompaction()
p.initMinioAddress()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSL()
p.initMinioBucketName()
p.initMinioRootPath()
p.initRetentionDuration()
p.initEnableAutoCompaction()
@ -1304,55 +1245,6 @@ func (p *dataCoordConfig) initGCDropTolerance() {
p.GCDropTolerance = time.Duration(p.BaseParams.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second
}
// --- MinIO ---
func (p *dataCoordConfig) initMinioAddress() {
endpoint, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioAddress = endpoint
}
func (p *dataCoordConfig) initMinioAccessKeyID() {
keyID, err := p.BaseParams.Load("_MinioAccessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = keyID
}
func (p *dataCoordConfig) initMinioSecretAccessKey() {
key, err := p.BaseParams.Load("_MinioSecretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *dataCoordConfig) initMinioUseSSL() {
usessl, err := p.BaseParams.Load("_MinioUseSSL")
if err != nil {
panic(err)
}
p.MinioUseSSL, _ = strconv.ParseBool(usessl)
}
func (p *dataCoordConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("_MinioBucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
func (p *dataCoordConfig) initMinioRootPath() {
rootPath, err := p.BaseParams.Load("minio.rootPath")
if err != nil {
panic(err)
}
p.MinioRootPath = rootPath
}
func (p *dataCoordConfig) initRetentionDuration() {
p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}
@ -1398,13 +1290,6 @@ type dataNodeConfig struct {
// etcd
ChannelWatchSubPath string
// MinIO
MinioAddress string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSL bool
MinioBucketName string
CreatedTime time.Time
UpdatedTime time.Time
}
@ -1426,12 +1311,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) {
p.initChannelWatchPath()
p.initMinioAddress()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSL()
p.initMinioBucketName()
p.initDmlChannelName()
p.initDeltaChannelName()
}
@ -1459,7 +1338,7 @@ func (p *dataNodeConfig) initFlushInsertBufferSize() {
}
func (p *dataNodeConfig) initInsertBinlogRootPath() {
// GOOSE TODO: rootPath change to TenentID
// GOOSE TODO: rootPath change to TenentID
rootPath, err := p.BaseParams.Load("minio.rootPath")
if err != nil {
panic(err)
@ -1513,47 +1392,6 @@ func (p *dataNodeConfig) initChannelWatchPath() {
p.ChannelWatchSubPath = "channelwatch"
}
// --- MinIO ---
func (p *dataNodeConfig) initMinioAddress() {
endpoint, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioAddress = endpoint
}
func (p *dataNodeConfig) initMinioAccessKeyID() {
keyID, err := p.BaseParams.Load("_MinioAccessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = keyID
}
func (p *dataNodeConfig) initMinioSecretAccessKey() {
key, err := p.BaseParams.Load("_MinioSecretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *dataNodeConfig) initMinioUseSSL() {
usessl, err := p.BaseParams.Load("_MinioUseSSL")
if err != nil {
panic(err)
}
p.MinioUseSSL, _ = strconv.ParseBool(usessl)
}
func (p *dataNodeConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("_MinioBucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
func (p *dataNodeConfig) initDmlChannelName() {
config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml")
if err != nil {
@ -1582,12 +1420,6 @@ type indexCoordConfig struct {
IndexStorageRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
MinIOUseSSL bool
MinioBucketName string
CreatedTime time.Time
UpdatedTime time.Time
}
@ -1595,62 +1427,9 @@ type indexCoordConfig struct {
func (p *indexCoordConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initMinIOAddress()
p.initMinIOAccessKeyID()
p.initMinIOSecretAccessKey()
p.initMinIOUseSSL()
p.initMinioBucketName()
p.initIndexStorageRootPath()
}
// initMinIOAddress initializes init the minio address of configuration items.
func (p *indexCoordConfig) initMinIOAddress() {
ret, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinIOAddress = ret
}
// initMinIOAccessKeyID initializes the minio access key of configuration items.
func (p *indexCoordConfig) initMinIOAccessKeyID() {
ret, err := p.BaseParams.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
p.MinIOAccessKeyID = ret
}
// initMinIOSecretAccessKey initializes the minio secret access key.
func (p *indexCoordConfig) initMinIOSecretAccessKey() {
ret, err := p.BaseParams.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
p.MinIOSecretAccessKey = ret
}
// initMinIOUseSSL initializes the minio use SSL of configuration items.
func (p *indexCoordConfig) initMinIOUseSSL() {
ret, err := p.BaseParams.Load("minio.useSSL")
if err != nil {
panic(err)
}
p.MinIOUseSSL, err = strconv.ParseBool(ret)
if err != nil {
panic(err)
}
}
// initMinioBucketName initializes the minio bucket name of configuration items.
func (p *indexCoordConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("minio.bucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
// initIndexStorageRootPath initializes the root path of index files.
func (p *indexCoordConfig) initIndexStorageRootPath() {
rootPath, err := p.BaseParams.Load("minio.rootPath")
@ -1674,12 +1453,6 @@ type indexNodeConfig struct {
IndexStorageRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
MinIOUseSSL bool
MinioBucketName string
SimdType string
CreatedTime time.Time
@ -1689,11 +1462,6 @@ type indexNodeConfig struct {
func (p *indexNodeConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initMinIOAddress()
p.initMinIOAccessKeyID()
p.initMinIOSecretAccessKey()
p.initMinIOUseSSL()
p.initMinioBucketName()
p.initIndexStorageRootPath()
p.initKnowhereSimdType()
}
@ -1703,41 +1471,6 @@ func (p *indexNodeConfig) InitAlias(alias string) {
p.Alias = alias
}
func (p *indexNodeConfig) initMinIOAddress() {
ret, err := p.BaseParams.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinIOAddress = ret
}
func (p *indexNodeConfig) initMinIOAccessKeyID() {
ret, err := p.BaseParams.Load("_MinioAccessKeyID")
if err != nil {
panic(err)
}
p.MinIOAccessKeyID = ret
}
func (p *indexNodeConfig) initMinIOSecretAccessKey() {
ret, err := p.BaseParams.Load("_MinioSecretAccessKey")
if err != nil {
panic(err)
}
p.MinIOSecretAccessKey = ret
}
func (p *indexNodeConfig) initMinIOUseSSL() {
ret, err := p.BaseParams.Load("_MinioUseSSL")
if err != nil {
panic(err)
}
p.MinIOUseSSL, err = strconv.ParseBool(ret)
if err != nil {
panic(err)
}
}
func (p *indexNodeConfig) initIndexStorageRootPath() {
rootPath, err := p.BaseParams.Load("minio.rootPath")
if err != nil {
@ -1746,14 +1479,6 @@ func (p *indexNodeConfig) initIndexStorageRootPath() {
p.IndexStorageRootPath = path.Join(rootPath, "index_files")
}
func (p *indexNodeConfig) initMinioBucketName() {
bucketName, err := p.BaseParams.Load("_MinioBucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
func (p *indexNodeConfig) initKnowhereSimdType() {
simdType := p.BaseParams.LoadWithDefault("knowhere.simdType", "auto")
p.SimdType = simdType

View File

@ -48,6 +48,25 @@ func TestGlobalParamTable(t *testing.T) {
t.Logf("rocksmq path = %s", Params.Path)
})
t.Run("test minioConfig", func(t *testing.T) {
Params := GlobalParams.MinioCfg
addr := Params.Address
equal := addr == "localhost:9000" || addr == "minio:9000"
assert.Equal(t, equal, true)
t.Logf("minio address = %s", Params.Address)
assert.Equal(t, Params.AccessKeyID, "minioadmin")
assert.Equal(t, Params.SecretAccessKey, "minioadmin")
assert.Equal(t, Params.UseSSL, false)
t.Logf("Minio BucketName = %s", Params.BucketName)
t.Logf("Minio rootpath = %s", Params.RootPath)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := GlobalParams.RootCoordCfg
@ -183,19 +202,6 @@ func TestGlobalParamTable(t *testing.T) {
Params.initCacheSize()
assert.Equal(t, int64(32), Params.CacheSize)
endPoint := Params.MinioEndPoint
equal := endPoint == "localhost:9000" || endPoint == "minio:9000"
assert.Equal(t, equal, true)
accessKeyID := Params.MinioAccessKeyID
assert.Equal(t, accessKeyID, "minioadmin")
secretAccessKey := Params.MinioSecretAccessKey
assert.Equal(t, secretAccessKey, "minioadmin")
useSSL := Params.MinioUseSSLStr
assert.Equal(t, useSSL, false)
interval := Params.StatsPublishInterval
assert.Equal(t, 1000, interval)
@ -278,18 +284,6 @@ func TestGlobalParamTable(t *testing.T) {
assert.Equal(t, name, "by-dev-dataNode-2")
log.Println("MsgChannelSubName:", name)
id1 := Params.MinioAccessKeyID
log.Println("MinioAccessKeyID:", id1)
key := Params.MinioSecretAccessKey
log.Println("MinioSecretAccessKey:", key)
useSSL := Params.MinioUseSSL
log.Println("MinioUseSSL:", useSSL)
name = Params.MinioBucketName
log.Println("MinioBucketName:", name)
Params.CreatedTime = time.Now()
log.Println("CreatedTime: ", Params.CreatedTime)
@ -308,16 +302,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Logf("Port: %v", Params.Port)
t.Logf("MinIOAddress: %v", Params.MinIOAddress)
t.Logf("MinIOAccessKeyID: %v", Params.MinIOAccessKeyID)
t.Logf("MinIOSecretAccessKey: %v", Params.MinIOSecretAccessKey)
t.Logf("MinIOUseSSL: %v", Params.MinIOUseSSL)
t.Logf("MinioBucketName: %v", Params.MinioBucketName)
Params.CreatedTime = time.Now()
t.Logf("CreatedTime: %v", Params.CreatedTime)
@ -340,16 +324,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Logf("Alias: %v", Params.Alias)
t.Logf("MinIOAddress: %v", Params.MinIOAddress)
t.Logf("MinIOAccessKeyID: %v", Params.MinIOAccessKeyID)
t.Logf("MinIOSecretAccessKey: %v", Params.MinIOSecretAccessKey)
t.Logf("MinIOUseSSL: %v", Params.MinIOUseSSL)
t.Logf("MinioBucketName: %v", Params.MinioBucketName)
t.Logf("SimdType: %v", Params.SimdType)
Params.CreatedTime = time.Now()