mirror of https://github.com/milvus-io/milvus.git
Support access s3 through AWS IAM role (#17292)
Signed-off-by: shaoyue.chen <shaoyue.chen@zilliz.com>pull/17357/head
parent
0986c29d7f
commit
76eaa3fc50
|
@ -60,6 +60,12 @@ minio:
|
|||
useSSL: false # Access to MinIO/S3 with SSL
|
||||
bucketName: "a-bucket" # Bucket name in MinIO/S3
|
||||
rootPath: files # The root path where the message is stored in MinIO/S3
|
||||
# Whether to use AWS IAM role to access S3 instead of access/secret keys
|
||||
# For more infomation, refer to https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use.html
|
||||
useIAM: false
|
||||
# Custom endpoint for fetch IAM role credentials.
|
||||
# Leave it empty if you want to use AWS default endpoint
|
||||
iamEndpoint: ""
|
||||
|
||||
# Milvus supports three MQ: rocksmq(based on RockDB), Pulsar and Kafka, which should be reserved in config what you use.
|
||||
# There is a note about enabling priority if we config multiple mq in this file
|
||||
|
|
|
@ -353,32 +353,26 @@ func (s *Server) initGarbageCollection() error {
|
|||
var cli *minio.Client
|
||||
var err error
|
||||
if Params.DataCoordCfg.EnableGarbageCollection {
|
||||
var creds *credentials.Credentials
|
||||
if Params.MinioCfg.UseIAM {
|
||||
creds = credentials.NewIAM(Params.MinioCfg.IAMEndpoint)
|
||||
} else {
|
||||
creds = credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, "")
|
||||
}
|
||||
// TODO: We call minio.New in different places with same procedures to call several functions.
|
||||
// We should abstract this to a focade function to avoid applying changes to only one place.
|
||||
cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
|
||||
Creds: creds,
|
||||
Secure: Params.MinioCfg.UseSSL,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
checkBucketFn := func() error {
|
||||
has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !has {
|
||||
err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("failed to create minio client: %v", err)
|
||||
}
|
||||
// retry times shall be two, just to prevent
|
||||
// 1. bucket not exists
|
||||
// 2. bucket is created by other componnent
|
||||
// 3. datacoord try to create but failed with bucket already exists error
|
||||
err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
|
||||
err = retry.Do(s.ctx, getCheckBucketFn(cli), retry.Attempts(2))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -397,6 +391,23 @@ func (s *Server) initGarbageCollection() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// here we use variable for test convenience
|
||||
var getCheckBucketFn = func(cli *minio.Client) func() error {
|
||||
return func() error {
|
||||
has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !has {
|
||||
err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) initServiceDiscovery() error {
|
||||
sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
|
||||
if err != nil {
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/minio/minio-go/v7"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -2868,3 +2869,34 @@ func Test_initServiceDiscovery(t *testing.T) {
|
|||
|
||||
closeTestServer(t, server)
|
||||
}
|
||||
|
||||
func Test_initGarbageCollection(t *testing.T) {
|
||||
server := newTestServer2(t, nil)
|
||||
Params.DataCoordCfg.EnableGarbageCollection = true
|
||||
|
||||
t.Run("err_minio_bad_address", func(t *testing.T) {
|
||||
Params.MinioCfg.Address = "host:9000:bad"
|
||||
err := server.initGarbageCollection()
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "failed to create minio client")
|
||||
})
|
||||
|
||||
// mock CheckBucketFn
|
||||
getCheckBucketFnBak := getCheckBucketFn
|
||||
getCheckBucketFn = func(cli *minio.Client) func() error {
|
||||
return func() error { return nil }
|
||||
}
|
||||
defer func() {
|
||||
getCheckBucketFn = getCheckBucketFnBak
|
||||
}()
|
||||
Params.MinioCfg.Address = "minio:9000"
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
err := server.initGarbageCollection()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("iam_ok", func(t *testing.T) {
|
||||
Params.MinioCfg.UseIAM = true
|
||||
err := server.initGarbageCollection()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -56,8 +56,14 @@ func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManag
|
|||
}
|
||||
|
||||
func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) {
|
||||
var creds *credentials.Credentials
|
||||
if c.useIAM {
|
||||
creds = credentials.NewIAM(c.iamEndpoint)
|
||||
} else {
|
||||
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
minIOClient, err := minio.New(c.address, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, ""),
|
||||
Creds: creds,
|
||||
Secure: c.useSSL,
|
||||
})
|
||||
// options nil or invalid formatted endpoint, don't need to retry
|
||||
|
|
|
@ -40,6 +40,8 @@ func newMinIOChunkManager(ctx context.Context, bucketName string) (*MinioChunkMa
|
|||
SecretAccessKeyID(secretAccessKey),
|
||||
UseSSL(useSSL),
|
||||
BucketName(bucketName),
|
||||
UseIAM(false),
|
||||
IAMEndpoint(""),
|
||||
CreateBucket(true),
|
||||
)
|
||||
return client, err
|
||||
|
|
|
@ -9,6 +9,8 @@ type config struct {
|
|||
useSSL bool
|
||||
createBucket bool
|
||||
rootPath string
|
||||
useIAM bool
|
||||
iamEndpoint string
|
||||
}
|
||||
|
||||
func newDefaultConfig() *config {
|
||||
|
@ -58,3 +60,15 @@ func RootPath(rootPath string) Option {
|
|||
c.rootPath = rootPath
|
||||
}
|
||||
}
|
||||
|
||||
func UseIAM(useIAM bool) Option {
|
||||
return func(c *config) {
|
||||
c.useIAM = useIAM
|
||||
}
|
||||
}
|
||||
|
||||
func IAMEndpoint(iamEndpoint string) Option {
|
||||
return func(c *config) {
|
||||
c.iamEndpoint = iamEndpoint
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,8 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
|
|||
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey),
|
||||
storage.UseSSL(params.MinioCfg.UseSSL),
|
||||
storage.BucketName(params.MinioCfg.BucketName),
|
||||
storage.UseIAM(params.MinioCfg.UseIAM),
|
||||
storage.IAMEndpoint(params.MinioCfg.IAMEndpoint),
|
||||
storage.CreateBucket(true))
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,8 @@ const (
|
|||
DefaultMinioSecretAccessKey = "minioadmin"
|
||||
DefaultMinioUseSSL = "false"
|
||||
DefaultMinioBucketName = "a-bucket"
|
||||
DefaultMinioUseIAM = "false"
|
||||
DefaultMinioIAMEndpoint = ""
|
||||
DefaultEtcdEndpoints = "localhost:2379"
|
||||
DefaultInsertBufferSize = "16777216"
|
||||
DefaultEnvPrefix = "milvus"
|
||||
|
@ -499,6 +501,18 @@ func (gp *BaseTable) loadMinioConfig() {
|
|||
minioBucketName = gp.LoadWithDefault("minio.bucketName", DefaultMinioBucketName)
|
||||
}
|
||||
gp.Save("_MinioBucketName", minioBucketName)
|
||||
|
||||
minioUseIAM := os.Getenv("MINIO_USE_IAM")
|
||||
if minioUseIAM == "" {
|
||||
minioUseIAM = gp.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM)
|
||||
}
|
||||
gp.Save("_MinioUseIAM", minioUseIAM)
|
||||
|
||||
minioIAMEndpoint := os.Getenv("MINIO_IAM_ENDPOINT")
|
||||
if minioIAMEndpoint == "" {
|
||||
minioIAMEndpoint = gp.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint)
|
||||
}
|
||||
gp.Save("_MinioIAMEndpoint", minioIAMEndpoint)
|
||||
}
|
||||
|
||||
func (gp *BaseTable) loadDataNodeConfig() {
|
||||
|
|
|
@ -296,6 +296,8 @@ type MinioConfig struct {
|
|||
UseSSL bool
|
||||
BucketName string
|
||||
RootPath string
|
||||
UseIAM bool
|
||||
IAMEndpoint string
|
||||
}
|
||||
|
||||
func (p *MinioConfig) init(base *BaseTable) {
|
||||
|
@ -307,6 +309,8 @@ func (p *MinioConfig) init(base *BaseTable) {
|
|||
p.initUseSSL()
|
||||
p.initBucketName()
|
||||
p.initRootPath()
|
||||
p.initUseIAM()
|
||||
p.initIAMEndpoint()
|
||||
}
|
||||
|
||||
func (p *MinioConfig) initAddress() {
|
||||
|
@ -356,3 +360,13 @@ func (p *MinioConfig) initRootPath() {
|
|||
}
|
||||
p.RootPath = rootPath
|
||||
}
|
||||
|
||||
func (p *MinioConfig) initUseIAM() {
|
||||
useIAM := p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM)
|
||||
p.UseIAM, _ = strconv.ParseBool(useIAM)
|
||||
}
|
||||
|
||||
func (p *MinioConfig) initIAMEndpoint() {
|
||||
iamEndpoint := p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint)
|
||||
p.IAMEndpoint = iamEndpoint
|
||||
}
|
||||
|
|
|
@ -89,6 +89,10 @@ func TestServiceParam(t *testing.T) {
|
|||
|
||||
assert.Equal(t, Params.UseSSL, false)
|
||||
|
||||
assert.Equal(t, Params.UseIAM, false)
|
||||
|
||||
assert.Equal(t, Params.IAMEndpoint, "")
|
||||
|
||||
t.Logf("Minio BucketName = %s", Params.BucketName)
|
||||
|
||||
t.Logf("Minio rootpath = %s", Params.RootPath)
|
||||
|
|
Loading…
Reference in New Issue