mirror of https://github.com/milvus-io/milvus.git
enhance: Optimize save colelction target latency (#38345)
issue: #38237 this PR only use better compression level for proto msg which is larger than 1MB, and use a lighter compression level for smaller proto msg, which could get a better latency in most case. this PR could reduce the latency from 22.7s to 4.7s with 10000 collctions and each collections has 1000 segments. before this PR: BenchmarkTargetManager-8 1 22781536357 ns/op 566407275088 B/op 11188282 allocs/op after this PR: BenchmarkTargetManager-8 1 4729566944 ns/op 36713248864 B/op 10963615 allocs/op Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/38362/head
parent
7ea9c983d2
commit
950203aba0
|
@ -279,8 +279,14 @@ func (s Catalog) SaveCollectionTargets(ctx context.Context, targets ...*querypb.
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// only compress data when size is larger than 1MB
|
||||
compressLevel := zstd.SpeedFastest
|
||||
if len(v) > 1024*1024 {
|
||||
compressLevel = zstd.SpeedBetterCompression
|
||||
}
|
||||
var compressed bytes.Buffer
|
||||
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
|
||||
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(compressLevel))
|
||||
kvs[k] = compressed.String()
|
||||
}
|
||||
|
||||
|
|
|
@ -675,6 +675,53 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() {
|
|||
assert.Len(suite.T(), currentTarget[0].Segments, 2)
|
||||
}
|
||||
|
||||
func BenchmarkTargetManager(b *testing.B) {
|
||||
paramtable.Init()
|
||||
config := GenerateEtcdConfig()
|
||||
cli, _ := etcd.GetEtcdClient(
|
||||
config.UseEmbedEtcd.GetAsBool(),
|
||||
config.EtcdUseSSL.GetAsBool(),
|
||||
config.Endpoints.GetAsStrings(),
|
||||
config.EtcdTLSCert.GetValue(),
|
||||
config.EtcdTLSKey.GetValue(),
|
||||
config.EtcdTLSCACert.GetValue(),
|
||||
config.EtcdTLSMinVersion.GetValue())
|
||||
|
||||
kv := etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
|
||||
|
||||
catalog := querycoord.NewCatalog(kv)
|
||||
idAllocator := RandomIncrementIDAllocator()
|
||||
meta := NewMeta(idAllocator, catalog, session.NewNodeManager())
|
||||
mgr := NewTargetManager(nil, meta)
|
||||
|
||||
segmentNum := 1000
|
||||
segments := make(map[int64]*datapb.SegmentInfo)
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
segments[int64(i)] = &datapb.SegmentInfo{
|
||||
ID: int64(i),
|
||||
InsertChannel: "channel-1",
|
||||
}
|
||||
}
|
||||
|
||||
channels := map[string]*DmChannel{
|
||||
"channel-1": {
|
||||
VchannelInfo: &datapb.VchannelInfo{
|
||||
CollectionID: int64(1),
|
||||
ChannelName: "channel-1",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
collectionNum := 10000
|
||||
for i := 0; i < collectionNum; i++ {
|
||||
mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mgr.SaveCurrentTarget(context.TODO(), catalog)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetManager(t *testing.T) {
|
||||
suite.Run(t, new(TargetManagerSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue