Add cgroups to get container memory and check index memory in segment loader (#10363)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/10483/head
bigsheeper 2021-10-22 20:49:10 +08:00 committed by GitHub
parent 99ca6e4ba2
commit 5c58924420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 426 additions and 242 deletions

2
go.mod
View File

@ -9,10 +9,10 @@ require (
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/bits-and-blooms/bloom/v3 v3.0.1
github.com/containerd/cgroups v1.0.2
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/frankban/quicktest v1.10.2 // indirect
github.com/go-basic/ipv4 v1.0.0
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.1

13
go.sum
View File

@ -105,6 +105,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5 h1:xD/lrqdvwsc+O2bjSSi3YqY73Ke3LAiSCx49aCesA0E=
github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo=
@ -112,6 +113,8 @@ github.com/cockroachdb/errors v1.2.4 h1:Lap807SXTH5tri2TivECb/4abUkMZC9zRoLarvcK
github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/containerd/cgroups v1.0.2 h1:mZBclaSgNDfPWtfhj2xJY28LZ9nYIgzB0pwSURPl6JM=
github.com/containerd/cgroups v1.0.2/go.mod h1:qpbpJ1jmlqsR9f2IyaLPsdkCdnt0rbDVqIDlhuu5tRY=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
@ -121,6 +124,7 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@ -135,6 +139,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
@ -152,6 +158,7 @@ github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNy
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk=
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@ -175,6 +182,7 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@ -392,6 +400,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0=
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
@ -455,6 +465,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
@ -509,6 +521,7 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24sz
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=

View File

@ -44,7 +44,7 @@ type indexLoader struct {
rootCoord types.RootCoord
indexCoord types.IndexCoord
kv kv.BaseKV // minio kv
kv kv.DataKV // minio kv
}
func (loader *indexLoader) loadIndex(segment *Segment, fieldID FieldID) error {
@ -146,6 +146,25 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa
return index, indexParams, indexName, nil
}
func (loader *indexLoader) estimateIndexBinlogSize(segment *Segment, fieldID FieldID) (int64, error) {
indexSize := int64(0)
indexPaths := segment.getIndexPaths(fieldID)
for _, p := range indexPaths {
logSize, err := storage.EstimateMemorySize(loader.kv, p)
if err != nil {
return 0, err
}
indexSize += logSize
}
log.Debug("estimate segment index size",
zap.Any("collectionID", segment.collectionID),
zap.Any("segmentID", segment.ID()),
zap.Any("fieldID", fieldID),
zap.Any("indexPaths", indexPaths),
)
return indexSize, nil
}
func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, fieldID UniqueID) error {
if loader.indexCoord == nil || loader.rootCoord == nil {
return errors.New("null index coordinator client or root coordinator client, collectionID = " +

View File

@ -13,21 +13,23 @@ package querynode
import (
"context"
"errors"
"fmt"
"os"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) {
usedMem, err := getUsedMemory()
if err != nil {
return nil, err
}
totalMem, err := getTotalMemory()
if err != nil {
return nil, err
}
nodeInfos := metricsinfo.QueryNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID),
@ -35,8 +37,8 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
IP: node.session.Address,
CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
CPUCoreUsage: metricsinfo.GetCPUUsage(),
Memory: uint64(getTotalMemory()),
MemoryUsage: uint64(getUsedMemory(node.historical.replica, node.streaming.replica)),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: metricsinfo.GetDiskCount(),
DiskUsage: metricsinfo.GetDiskUsage(),
},
@ -82,57 +84,16 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
}, nil
}
func getUsedMemory(historicalReplica, streamingReplica ReplicaInterface) int64 {
historicalSegmentsMemSize := historicalReplica.getSegmentsMemSize()
streamingSegmentsMemSize := streamingReplica.getSegmentsMemSize()
return historicalSegmentsMemSize + streamingSegmentsMemSize
}
func getTotalMemory() int64 {
return Params.CacheSize * 1024 * 1024 * 1024
}
func checkSegmentMemory(segmentLoadInfos []*querypb.SegmentLoadInfo, historicalReplica, streamingReplica ReplicaInterface) error {
usedRAMInMB := getUsedMemory(historicalReplica, streamingReplica) / 1024.0 / 1024.0
totalRAMInMB := getTotalMemory() / 1024.0 / 1024.0
segmentTotalSize := int64(0)
for _, segInfo := range segmentLoadInfos {
collectionID := segInfo.CollectionID
segmentID := segInfo.SegmentID
col, err := historicalReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
if err != nil {
return err
}
segmentSize := int64(sizePerRecord) * segInfo.NumOfRows
segmentTotalSize += segmentSize / 1024.0 / 1024.0
// TODO: get threshold factor from param table
thresholdMemSize := float64(totalRAMInMB) * 0.7
log.Debug("memory stats when load segment",
zap.Any("collectionIDs", collectionID),
zap.Any("segmentID", segmentID),
zap.Any("numOfRows", segInfo.NumOfRows),
zap.Any("totalRAM(MB)", totalRAMInMB),
zap.Any("usedRAM(MB)", usedRAMInMB),
zap.Any("segmentTotalSize(MB)", segmentTotalSize),
zap.Any("thresholdMemSize(MB)", thresholdMemSize),
)
if usedRAMInMB+segmentTotalSize > int64(thresholdMemSize) {
return errors.New(fmt.Sprintln("load segment failed, OOM if load, "+
"collectionID = ", collectionID, ", ",
"usedRAM(MB) = ", usedRAMInMB, ", ",
"segmentTotalSize(MB) = ", segmentTotalSize, ", ",
"thresholdMemSize(MB) = ", thresholdMemSize))
}
func getUsedMemory() (uint64, error) {
if Params.InContainer {
return metricsinfo.GetContainerMemUsed()
}
return nil
return metricsinfo.GetUsedMemoryCount(), nil
}
func getTotalMemory() (uint64, error) {
if Params.InContainer {
return metricsinfo.GetContainerMemLimit()
}
return metricsinfo.GetMemoryCount(), nil
}

View File

@ -19,9 +19,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func TestGetSystemInfoMetrics(t *testing.T) {
@ -40,67 +38,3 @@ func TestGetSystemInfoMetrics(t *testing.T) {
assert.NoError(t, err)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
}
func TestCheckSegmentMemory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genSegmentLoadInfo := func() *querypb.SegmentLoadInfo {
return &querypb.SegmentLoadInfo{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
NumOfRows: 1,
}
}
t.Run("valid test", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = checkSegmentMemory([]*querypb.SegmentLoadInfo{genSegmentLoadInfo()}, node.historical.replica, node.streaming.replica)
assert.NoError(t, err)
})
t.Run("test no collection", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
node.historical.replica.freeAll()
err = checkSegmentMemory([]*querypb.SegmentLoadInfo{genSegmentLoadInfo()}, node.historical.replica, node.streaming.replica)
assert.Error(t, err)
})
t.Run("test OOM", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
totalRAM := Params.CacheSize * 1024 * 1024 * 1024
col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
assert.NoError(t, err)
info := genSegmentLoadInfo()
info.NumOfRows = totalRAM / int64(sizePerRecord)
err = checkSegmentMemory([]*querypb.SegmentLoadInfo{info}, node.historical.replica, node.streaming.replica)
assert.Error(t, err)
})
t.Run("test EstimateSizePerRecord failed", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
for _, param := range col.schema.Fields[0].TypeParams {
if param.Key == "dim" {
param.Value = "%&^%&"
}
}
info := genSegmentLoadInfo()
err = checkSegmentMemory([]*querypb.SegmentLoadInfo{info}, node.historical.replica, node.streaming.replica)
assert.Error(t, err)
})
}

View File

@ -21,6 +21,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
@ -37,7 +38,9 @@ type ParamTable struct {
QueryNodeIP string
QueryNodePort int64
QueryNodeID UniqueID
CacheSize int64
// TODO: remove cacheSize
CacheSize int64 // deprecated
InContainer bool
// channel prefix
ClusterChannelPrefix string
@ -110,6 +113,7 @@ func (p *ParamTable) Init() {
}
p.initCacheSize()
p.initInContainer()
p.initMinioEndPoint()
p.initMinioAccessKeyID()
@ -166,6 +170,15 @@ func (p *ParamTable) initCacheSize() {
p.CacheSize = value
}
func (p *ParamTable) initInContainer() {
var err error
p.InContainer, err = metricsinfo.InContainer()
if err != nil {
panic(err)
}
log.Debug("init InContainer", zap.Any("is query node running inside a container? :", p.InContainer))
}
// ---------------------------------------------------------- minio
func (p *ParamTable) initMinioEndPoint() {
url, err := p.Load("_MinioAddress")

View File

@ -42,7 +42,7 @@ type segmentLoader struct {
dataCoord types.DataCoord
minioKV kv.BaseKV // minio minioKV
minioKV kv.DataKV // minio minioKV
etcdKV *etcdkv.EtcdKV
indexLoader *indexLoader
@ -54,24 +54,18 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest) error
return nil
}
newSegments := make([]*Segment, 0)
newSegments := make(map[UniqueID]*Segment)
segmentGC := func() {
for _, s := range newSegments {
deleteSegment(s)
}
}
setSegments := func() error {
for _, s := range newSegments {
err := loader.historicalReplica.setSegment(s)
if err != nil {
segmentGC()
return err
}
}
return nil
}
// start to load
segmentFieldBinLogs := make(map[UniqueID][]*datapb.FieldBinlog)
segmentIndexedFieldIDs := make(map[UniqueID][]FieldID)
segmentSizes := make(map[UniqueID]int64)
// prepare and estimate segments size
for _, info := range req.Infos {
segmentID := info.SegmentID
partitionID := info.PartitionID
@ -79,62 +73,72 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest) error
collection, err := loader.historicalReplica.getCollectionByID(collectionID)
if err != nil {
log.Warn(err.Error())
segmentGC()
return err
}
segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, true)
err = loader.loadSegmentInternal(collectionID, segment, info)
newSegments[segmentID] = segment
fieldBinlog, indexedFieldID, err := loader.getFieldAndIndexInfo(segment, info)
if err != nil {
deleteSegment(segment)
log.Warn(err.Error())
segmentGC()
return err
}
newSegments = append(newSegments, segment)
segmentSize, err := loader.estimateSegmentSize(segment, fieldBinlog, indexedFieldID)
if err != nil {
segmentGC()
return err
}
segmentFieldBinLogs[segmentID] = fieldBinlog
segmentIndexedFieldIDs[segmentID] = indexedFieldID
segmentSizes[segmentID] = segmentSize
}
return setSegments()
// check memory limit
err := loader.checkSegmentSize(req.Infos[0].CollectionID, segmentSizes)
if err != nil {
segmentGC()
return err
}
// start to load
for _, info := range req.Infos {
segmentID := info.SegmentID
if newSegments[segmentID] == nil || segmentFieldBinLogs[segmentID] == nil || segmentIndexedFieldIDs[segmentID] == nil {
segmentGC()
return errors.New(fmt.Sprintln("unexpected error, cannot find load infos, this error should not happen, collectionID = ", req.Infos[0].CollectionID))
}
err = loader.loadSegmentInternal(newSegments[segmentID],
segmentFieldBinLogs[segmentID],
segmentIndexedFieldIDs[segmentID],
info)
if err != nil {
segmentGC()
return err
}
}
// set segments
for _, s := range newSegments {
err := loader.historicalReplica.setSegment(s)
if err != nil {
segmentGC()
return err
}
}
return nil
}
func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment *Segment, segmentLoadInfo *querypb.SegmentLoadInfo) error {
vectorFieldIDs, err := loader.historicalReplica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return err
}
if len(vectorFieldIDs) <= 0 {
return fmt.Errorf("no vector field in collection %d", collectionID)
}
// add VectorFieldInfo for vector fields
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
if funcutil.SliceContain(vectorFieldIDs, fieldBinlog.FieldID) {
vectorFieldInfo := newVectorFieldInfo(fieldBinlog)
segment.setVectorFieldInfo(fieldBinlog.FieldID, vectorFieldInfo)
}
}
indexedFieldIDs := make([]FieldID, 0)
for _, vecFieldID := range vectorFieldIDs {
err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
if err != nil {
log.Warn(err.Error())
continue
}
indexedFieldIDs = append(indexedFieldIDs, vecFieldID)
}
// we don't need to load raw data for indexed vector field
fieldBinlogs := loader.filterFieldBinlogs(segmentLoadInfo.BinlogPaths, indexedFieldIDs)
func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
fieldBinLogs []*datapb.FieldBinlog,
indexFieldIDs []FieldID,
segmentLoadInfo *querypb.SegmentLoadInfo) error {
log.Debug("loading insert...")
err = loader.loadSegmentFieldsData(segment, fieldBinlogs)
err := loader.loadSegmentFieldsData(segment, fieldBinLogs)
if err != nil {
return err
}
pkIDField, err := loader.historicalReplica.getPKFieldIDByCollectionID(collectionID)
pkIDField, err := loader.historicalReplica.getPKFieldIDByCollectionID(segment.collectionID)
if err != nil {
return err
}
@ -155,7 +159,7 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
return err
}
for _, id := range indexedFieldIDs {
for _, id := range indexFieldIDs {
log.Debug("loading index...")
err = loader.indexLoader.loadIndex(segment, id)
if err != nil {
@ -166,26 +170,6 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
return nil
}
//func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.GetSegmentStatesResponse, error) {
// ctx := context.TODO()
// if loader.dataCoord == nil {
// return nil, errors.New("null data service client")
// }
//
// segmentStatesRequest := &datapb.GetSegmentStatesRequest{
// SegmentIDs: []int64{segmentID},
// }
// statesResponse, err := loader.dataCoord.GetSegmentStates(ctx, segmentStatesRequest)
// if err != nil || statesResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
// return nil, err
// }
// if len(statesResponse.States) != 1 {
// return nil, errors.New("segment states' len should be 1")
// }
//
// return statesResponse, nil
//}
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) []string {
result := make([]string, 0)
for _, fieldBinlog := range fieldBinlogs {
@ -371,6 +355,112 @@ func JoinIDPath(ids ...UniqueID) string {
return path.Join(idStr...)
}
func (loader *segmentLoader) getFieldAndIndexInfo(segment *Segment,
segmentLoadInfo *querypb.SegmentLoadInfo) ([]*datapb.FieldBinlog, []FieldID, error) {
collectionID := segment.collectionID
vectorFieldIDs, err := loader.historicalReplica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return nil, nil, err
}
if len(vectorFieldIDs) <= 0 {
return nil, nil, fmt.Errorf("no vector field in collection %d", collectionID)
}
// add VectorFieldInfo for vector fields
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
if funcutil.SliceContain(vectorFieldIDs, fieldBinlog.FieldID) {
vectorFieldInfo := newVectorFieldInfo(fieldBinlog)
segment.setVectorFieldInfo(fieldBinlog.FieldID, vectorFieldInfo)
}
}
indexedFieldIDs := make([]FieldID, 0)
for _, vecFieldID := range vectorFieldIDs {
err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
if err != nil {
log.Warn(err.Error())
continue
}
indexedFieldIDs = append(indexedFieldIDs, vecFieldID)
}
// we don't need to load raw data for indexed vector field
fieldBinlogs := loader.filterFieldBinlogs(segmentLoadInfo.BinlogPaths, indexedFieldIDs)
return fieldBinlogs, indexedFieldIDs, nil
}
func (loader *segmentLoader) estimateSegmentSize(segment *Segment,
fieldBinLogs []*datapb.FieldBinlog,
indexFieldIDs []FieldID) (int64, error) {
segmentSize := int64(0)
// get fields data size, if len(indexFieldIDs) == 0, vector field would be involved in fieldBinLogs
for _, fb := range fieldBinLogs {
log.Debug("estimate segment fields size",
zap.Any("collectionID", segment.collectionID),
zap.Any("segmentID", segment.ID()),
zap.Any("fieldID", fb.FieldID),
zap.Any("paths", fb.Binlogs),
)
for _, binlogPath := range fb.Binlogs {
logSize, err := storage.EstimateMemorySize(loader.minioKV, binlogPath)
if err != nil {
return 0, err
}
segmentSize += logSize
}
}
// get index size
for _, fieldID := range indexFieldIDs {
indexSize, err := loader.indexLoader.estimateIndexBinlogSize(segment, fieldID)
if err != nil {
return 0, err
}
segmentSize += indexSize
}
return segmentSize, nil
}
func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentSizes map[UniqueID]int64) error {
const thresholdFactor = 0.9
usedMem, err := getUsedMemory()
if err != nil {
return err
}
totalMem, err := getTotalMemory()
if err != nil {
return err
}
segmentTotalSize := int64(0)
for _, size := range segmentSizes {
segmentTotalSize += size
}
for segmentID, size := range segmentSizes {
log.Debug("memory stats when load segment",
zap.Any("collectionIDs", collectionID),
zap.Any("segmentID", segmentID),
zap.Any("totalMem", totalMem),
zap.Any("usedMem", usedMem),
zap.Any("segmentTotalSize", segmentTotalSize),
zap.Any("currentSegmentSize", size),
zap.Any("thresholdFactor", thresholdFactor),
)
if int64(usedMem)+segmentTotalSize+size > int64(float64(totalMem)*thresholdFactor) {
return errors.New(fmt.Sprintln("load segment failed, OOM if load, "+
"collectionID = ", collectionID, ", ",
"usedMem = ", usedMem, ", ",
"segmentTotalSize = ", segmentTotalSize, ", ",
"currentSegmentSize = ", size, ", ",
"totalMem = ", totalMem, ", ",
"thresholdFactor = ", thresholdFactor,
))
}
}
return nil
}
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,

View File

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
@ -210,40 +211,40 @@ func TestSegmentLoader_invalid(t *testing.T) {
assert.Error(t, err)
})
t.Run("test no collection 2", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
assert.Error(t, err)
})
t.Run("test no vec field", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
schema := &schemapb.CollectionSchema{
Name: defaultCollectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
genConstantField(constFieldParam{
id: FieldID(100),
dataType: schemapb.DataType_Int8,
}),
},
}
err = historical.loader.historicalReplica.addCollection(defaultCollectionID, schema)
assert.NoError(t, err)
err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
assert.Error(t, err)
})
//t.Run("test no collection 2", func(t *testing.T) {
// historical, err := genSimpleHistorical(ctx)
// assert.NoError(t, err)
//
// err = historical.replica.removeCollection(defaultCollectionID)
// assert.NoError(t, err)
//
// err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
// assert.Error(t, err)
//})
//
//t.Run("test no vec field", func(t *testing.T) {
// historical, err := genSimpleHistorical(ctx)
// assert.NoError(t, err)
//
// err = historical.replica.removeCollection(defaultCollectionID)
// assert.NoError(t, err)
//
// schema := &schemapb.CollectionSchema{
// Name: defaultCollectionName,
// AutoID: true,
// Fields: []*schemapb.FieldSchema{
// genConstantField(constFieldParam{
// id: FieldID(100),
// dataType: schemapb.DataType_Int8,
// }),
// },
// }
// err = historical.loader.historicalReplica.addCollection(defaultCollectionID, schema)
// assert.NoError(t, err)
//
// err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
// assert.Error(t, err)
//})
t.Run("test no vec field 2", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
@ -285,3 +286,64 @@ func TestSegmentLoader_invalid(t *testing.T) {
assert.Error(t, err)
})
}
func TestSegmentLoader_checkSegmentSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.loader.checkSegmentSize(defaultSegmentID, map[UniqueID]int64{defaultSegmentID: 1024})
assert.NoError(t, err)
//totalMem, err := getTotalMemory()
//assert.NoError(t, err)
//err = historical.loader.checkSegmentSize(defaultSegmentID, map[UniqueID]int64{defaultSegmentID: int64(totalMem * 2)})
//assert.Error(t, err)
}
func TestSegmentLoader_estimateSegmentSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
seg, err := historical.replica.getSegmentByID(defaultSegmentID)
assert.NoError(t, err)
binlog := []*datapb.FieldBinlog{
{
FieldID: simpleConstField.id,
Binlogs: []string{"^&^%*&%&&(*^*&"},
},
}
_, err = historical.loader.estimateSegmentSize(seg, binlog, nil)
assert.Error(t, err)
binlog, err = saveSimpleBinLog(ctx)
assert.NoError(t, err)
_, err = historical.loader.estimateSegmentSize(seg, binlog, nil)
assert.NoError(t, err)
indexPath, err := generateIndex(defaultSegmentID)
assert.NoError(t, err)
err = seg.setIndexInfo(simpleVecField.id, &indexInfo{})
assert.NoError(t, err)
err = seg.setIndexPaths(simpleVecField.id, indexPath)
assert.NoError(t, err)
_, err = historical.loader.estimateSegmentSize(seg, binlog, []FieldID{simpleVecField.id})
assert.NoError(t, err)
err = seg.setIndexPaths(simpleVecField.id, []string{"&*^*(^*(&*%^&*^(&"})
assert.NoError(t, err)
_, err = historical.loader.estimateSegmentSize(seg, binlog, []FieldID{simpleVecField.id})
assert.Error(t, err)
}

View File

@ -376,12 +376,6 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}
}
err = checkSegmentMemory(l.req.Infos, l.node.historical.replica, l.node.streaming.replica)
if err != nil {
log.Warn(err.Error())
return err
}
err = l.node.historical.loader.loadSegment(l.req)
if err != nil {
log.Warn(err.Error())

View File

@ -0,0 +1,61 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package metricsinfo
import (
"errors"
"strings"
"github.com/containerd/cgroups"
)
// IfServiceInContainer checks if the service is running inside a container
func InContainer() (bool, error) {
paths, err := cgroups.ParseCgroupFile("/proc/1/cgroup")
if err != nil {
return false, err
}
devicePath := strings.TrimPrefix(paths[string(cgroups.Devices)], "/")
return devicePath != "", nil
}
// GetContainerMemLimit returns memory limit and error
func GetContainerMemLimit() (uint64, error) {
control, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
if err != nil {
return 0, err
}
stats, err := control.Stat(cgroups.IgnoreNotExist)
if err != nil {
return 0, err
}
if stats.Memory == nil || stats.Memory.Usage == nil {
return 0, errors.New("cannot find memory usage info from cGroups")
}
return stats.Memory.Usage.Limit, nil
}
// GetContainerMemUsed returns memory usage and error
func GetContainerMemUsed() (uint64, error) {
control, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
if err != nil {
return 0, err
}
stats, err := control.Stat(cgroups.IgnoreNotExist)
if err != nil {
return 0, err
}
if stats.Memory == nil || stats.Memory.Usage == nil {
return 0, errors.New("cannot find memory usage info from cGroups")
}
return stats.Memory.Usage.Usage, nil
}

View File

@ -0,0 +1,37 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package metricsinfo
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInContainer(t *testing.T) {
_, err := InContainer()
assert.NoError(t, err)
}
func TestGetContainerMemLimit(t *testing.T) {
limit, err := GetContainerMemLimit()
assert.NoError(t, err)
assert.True(t, limit > 0)
t.Log("limit memory:", limit)
}
func TestGetContainerMemUsed(t *testing.T) {
used, err := GetContainerMemUsed()
assert.NoError(t, err)
assert.True(t, used > 0)
t.Log("used memory:", used)
}