Remove stats watcher (#7501)

issue: #7500
Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/7542/head
sunby 2021-09-07 16:11:07 +08:00 committed by GitHub
parent 668d10d1da
commit 62f4cb5e1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 0 additions and 371 deletions

View File

@ -1,103 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package etcdkv
import (
"context"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
type EtcdStatsWatcher struct {
mu sync.RWMutex
client *clientv3.Client
helper etcdStatsHelper
size int
startTime time.Time
}
type etcdStatsHelper struct {
eventAfterReceive func()
eventAfterStartWatch func()
}
func defaultHelper() etcdStatsHelper {
return etcdStatsHelper{
eventAfterReceive: func() {},
eventAfterStartWatch: func() {},
}
}
func NewEtcdStatsWatcher(client *clientv3.Client) *EtcdStatsWatcher {
return &EtcdStatsWatcher{
client: client,
helper: defaultHelper(),
}
}
func (w *EtcdStatsWatcher) StartBackgroundLoop(ctx context.Context) {
ch := w.client.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithCreatedNotify())
w.mu.Lock()
w.startTime = time.Now()
w.mu.Unlock()
w.helper.eventAfterStartWatch()
for {
select {
case <-ctx.Done():
log.Debug("etcd stats watcher shutdown")
return
case e := <-ch:
if e.Err() != nil {
log.Error("etcd stats watcher receive error response", zap.Error(e.Err()))
continue
}
if len(e.Events) == 0 {
continue
}
t := 0
for _, event := range e.Events {
if event.Kv.Version == 1 {
t += len(event.Kv.Key) + len(event.Kv.Value)
}
}
w.mu.Lock()
w.size += t
w.mu.Unlock()
w.helper.eventAfterReceive()
}
}
}
func (w *EtcdStatsWatcher) GetSize() int {
w.mu.RLock()
defer w.mu.RUnlock()
return w.size
}
/*
func (w *EtcdStatsWatcher) GetStartTime() time.Time {
w.mu.RLock()
defer w.mu.RUnlock()
return w.startTime
}
func (w *EtcdStatsWatcher) Reset() {
w.mu.Lock()
defer w.mu.Unlock()
w.size = 0
w.startTime = time.Now()
}
*/

View File

@ -1,88 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package etcdkv
import (
"context"
"math/rand"
"strings"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
)
func TestEtcdStatsWatcher(t *testing.T) {
rand.Seed(time.Now().UnixNano())
var p paramtable.BaseTable
p.Init()
endpoints, err := p.Load("_EtcdEndpoints")
assert.Nil(t, err)
etcdEndpoints := strings.Split(endpoints, ",")
cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints})
assert.Nil(t, err)
defer cli.Close()
w := NewEtcdStatsWatcher(cli)
startCh := make(chan struct{})
receiveCh := make(chan struct{})
w.helper.eventAfterStartWatch = func() {
var e struct{}
startCh <- e
}
w.helper.eventAfterReceive = func() {
var e struct{}
receiveCh <- e
}
go w.StartBackgroundLoop(context.TODO())
<-startCh
key := make([]byte, 1)
rand.Read(key)
_, err = cli.Put(context.TODO(), string(key), string([]byte{65, 65, 65}))
assert.Nil(t, err)
<-receiveCh
size := w.GetSize()
assert.EqualValues(t, 4, size)
}
func TestEtcdStatsWatcherDone(t *testing.T) {
var p paramtable.BaseTable
p.Init()
endpoints, err := p.Load("_EtcdEndpoints")
assert.Nil(t, err)
etcdEndpoints := strings.Split(endpoints, ",")
cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints})
assert.Nil(t, err)
defer cli.Close()
w := NewEtcdStatsWatcher(cli)
startCh := make(chan struct{})
receiveCh := make(chan struct{})
w.helper.eventAfterStartWatch = func() {
var e struct{}
startCh <- e
}
w.helper.eventAfterReceive = func() {
var e struct{}
receiveCh <- e
}
ctx, cancel := context.WithCancel(context.Background())
go w.StartBackgroundLoop(ctx)
<-startCh
cancel()
}

View File

@ -1,113 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package miniokv
import (
"context"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/minio/minio-go/v7"
"go.uber.org/zap"
)
type MinioStatsWatcher struct {
mu sync.RWMutex
client *minio.Client
objCreateSize int64
startTime time.Time
bucketName string
helper MinioStatsWatcherHelper
}
type MinioStatsWatcherHelper struct {
eventAfterStartWatch func()
eventAfterNotify func()
}
func defaultStatsHelper() MinioStatsWatcherHelper {
return MinioStatsWatcherHelper{
eventAfterStartWatch: func() {},
eventAfterNotify: func() {},
}
}
func NewMinioStatsWatcher(client *minio.Client, bucketName string) *MinioStatsWatcher {
return &MinioStatsWatcher{
client: client,
bucketName: bucketName,
helper: defaultStatsHelper(),
}
}
func NewMinioStatsWatcherWithHelper(client *minio.Client, bucketName string, helper MinioStatsWatcherHelper) *MinioStatsWatcher {
stats := NewMinioStatsWatcher(client, bucketName)
stats.helper = helper
return stats
}
func (s *MinioStatsWatcher) StartBackground(ctx context.Context) {
s.mu.Lock()
s.startTime = time.Now()
s.mu.Unlock()
ch := s.client.ListenBucketNotification(ctx, s.bucketName, "", "", []string{"s3:ObjectCreated:*"})
s.helper.eventAfterStartWatch()
for {
select {
case <-ctx.Done():
log.Debug("minio stats shutdown")
return
case info := <-ch:
if info.Err != nil {
log.Error("minio receive wrong notification", zap.Error(info.Err))
continue
}
var size int64
for _, record := range info.Records {
size += record.S3.Object.Size
}
s.mu.Lock()
s.objCreateSize += size
s.mu.Unlock()
s.helper.eventAfterNotify()
}
}
}
func (s *MinioStatsWatcher) GetObjectCreateSize() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.objCreateSize
}
func (s *MinioStatsWatcher) GetStartTime() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return s.startTime
}
func (s *MinioStatsWatcher) String() string {
s.mu.RLock()
defer s.mu.RUnlock()
duration := time.Since(s.startTime).Seconds()
return fmt.Sprintf("object create %d bytes in %f seconds, avg: %f", s.objCreateSize, duration, float64(s.objCreateSize)/duration)
}
func (s *MinioStatsWatcher) Reset() {
s.mu.Lock()
defer s.mu.Unlock()
s.objCreateSize = 0
s.startTime = time.Now()
}

View File

@ -1,67 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package miniokv
/*
import (
"context"
"strconv"
"testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
)
func TestMinioStats(t *testing.T) {
var p paramtable.BaseTable
p.Init()
endPoint, _ := p.Load("_MinioAddress")
accessKeyID, _ := p.Load("minio.accessKeyID")
secretAccessKey, _ := p.Load("minio.secretAccessKey")
useSSLStr, _ := p.Load("minio.useSSL")
useSSL, _ := strconv.ParseBool(useSSLStr)
option := &Option{
Address: endPoint,
AccessKeyID: accessKeyID,
SecretAccessKeyID: secretAccessKey,
UseSSL: useSSL,
BucketName: "teststats",
CreateBucket: true,
}
cli, err := NewMinIOKV(context.TODO(), option)
assert.Nil(t, err)
startCh := make(chan struct{})
receiveCh := make(chan struct{})
w := NewMinioStatsWatcher(cli.minioClient, "teststats")
w.helper.eventAfterStartWatch = func() {
var e struct{}
startCh <- e
}
w.helper.eventAfterNotify = func() {
var e struct{}
receiveCh <- e
}
go w.StartBackground(context.TODO())
<-startCh
err = cli.Save("a", string([]byte{65, 65, 65, 65, 65}))
assert.Nil(t, err)
<-receiveCh
size := w.GetObjectCreateSize()
assert.EqualValues(t, 5, size)
}
*/