mirror of https://github.com/milvus-io/milvus.git
Prepare http admin API (#19676)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com> Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>pull/19747/head
parent
ae373d450f
commit
4df3552f74
|
@ -29,6 +29,8 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/api/commonpb"
|
||||
"github.com/milvus-io/milvus/api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/management"
|
||||
|
||||
"github.com/milvus-io/milvus/cmd/components"
|
||||
"github.com/milvus-io/milvus/internal/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/datanode"
|
||||
|
@ -515,7 +517,8 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
|
|||
http.HandleFunc(healthz.HealthzRouterPath, multiRoleHealthzHandler)
|
||||
}
|
||||
|
||||
metrics.ServeHTTP(Registry)
|
||||
metrics.Register(Registry)
|
||||
management.ServeHTTP()
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
|
|
|
@ -31,7 +31,6 @@ package log
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -66,11 +65,6 @@ func init() {
|
|||
r := utils.NewRateLimiter(1.0, 60.0)
|
||||
_globalR.Store(r)
|
||||
|
||||
updateLoglLevel := func(w http.ResponseWriter, req *http.Request) {
|
||||
_globalP.Load().(*ZapProperties).Level.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
http.HandleFunc("/log/level", updateLoglLevel)
|
||||
}
|
||||
|
||||
// InitLogger initializes a zap logger.
|
||||
|
@ -248,3 +242,7 @@ func Sync() error {
|
|||
})
|
||||
return reterr
|
||||
}
|
||||
|
||||
func Level() zap.AtomicLevel {
|
||||
return _globalP.Load().(*ZapProperties).Level
|
||||
}
|
||||
|
|
|
@ -34,11 +34,7 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -123,47 +119,6 @@ func TestLevelGetterAndSetter(t *testing.T) {
|
|||
assert.Equal(t, zap.ErrorLevel, GetLevel())
|
||||
}
|
||||
|
||||
func TestUpdateLogLevelThroughHttp(t *testing.T) {
|
||||
httpServer := httptest.NewServer(nil)
|
||||
defer httpServer.Close()
|
||||
|
||||
SetLevel(zap.DebugLevel)
|
||||
assert.Equal(t, zap.DebugLevel, GetLevel())
|
||||
|
||||
// replace global logger, log change will not be affected.
|
||||
conf := &Config{Level: "info", File: FileLogConfig{}, DisableTimestamp: true}
|
||||
logger, p, _ := InitLogger(conf)
|
||||
ReplaceGlobals(logger, p)
|
||||
assert.Equal(t, zap.InfoLevel, GetLevel())
|
||||
|
||||
// change log level through http
|
||||
payload, err := json.Marshal(map[string]interface{}{"level": "error"})
|
||||
if err != nil {
|
||||
Fatal(err.Error())
|
||||
}
|
||||
|
||||
url := httpServer.URL + "/log/level"
|
||||
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payload))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if err != nil {
|
||||
Fatal(err.Error())
|
||||
}
|
||||
|
||||
client := httpServer.Client()
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
Fatal(err.Error())
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
Fatal(err.Error())
|
||||
}
|
||||
assert.Equal(t, "{\"level\":\"error\"}\n", string(body))
|
||||
assert.Equal(t, zap.ErrorLevel, GetLevel())
|
||||
}
|
||||
|
||||
func TestSampling(t *testing.T) {
|
||||
sample, drop := make(chan zapcore.SamplingDecision, 1), make(chan zapcore.SamplingDecision, 1)
|
||||
samplingConf := zap.SamplingConfig{
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package management
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultListenPort = "9091"
|
||||
ListenPortEnvKey = "METRICS_PORT"
|
||||
)
|
||||
|
||||
type HTTPHandler struct {
|
||||
Path string
|
||||
HandlerFunc http.HandlerFunc
|
||||
Handler http.Handler
|
||||
}
|
||||
|
||||
func registerDefaults() {
|
||||
Register(&HTTPHandler{
|
||||
Path: "/log/level",
|
||||
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
|
||||
log.Level().ServeHTTP(w, req)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func Register(h *HTTPHandler) {
|
||||
if h.HandlerFunc != nil {
|
||||
http.HandleFunc(h.Path, h.HandlerFunc)
|
||||
return
|
||||
}
|
||||
if h.Handler != nil {
|
||||
http.Handle(h.Path, h.Handler)
|
||||
}
|
||||
}
|
||||
|
||||
func ServeHTTP() {
|
||||
registerDefaults()
|
||||
go func() {
|
||||
bindAddr := getHTTPAddr()
|
||||
log.Info("management listen", zap.String("addr", bindAddr))
|
||||
if err := http.ListenAndServe(bindAddr, nil); err != nil {
|
||||
log.Error("handle metrics failed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func getHTTPAddr() string {
|
||||
port := os.Getenv(ListenPortEnvKey)
|
||||
_, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
return fmt.Sprintf(":%s", DefaultListenPort)
|
||||
}
|
||||
|
||||
return fmt.Sprintf(":%s", port)
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package management
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestGetHTTPAddr(t *testing.T) {
|
||||
assert.Equal(t, getHTTPAddr(), ":"+DefaultListenPort)
|
||||
testPort := "9092"
|
||||
t.Setenv(ListenPortEnvKey, testPort)
|
||||
assert.Equal(t, getHTTPAddr(), ":"+testPort)
|
||||
}
|
||||
|
||||
func TestDefaultLogHandler(t *testing.T) {
|
||||
httpServer := httptest.NewServer(nil)
|
||||
defer httpServer.Close()
|
||||
registerDefaults()
|
||||
|
||||
log.SetLevel(zap.DebugLevel)
|
||||
assert.Equal(t, zap.DebugLevel, log.GetLevel())
|
||||
|
||||
// replace global logger, log change will not be affected.
|
||||
conf := &log.Config{Level: "info", File: log.FileLogConfig{}, DisableTimestamp: true}
|
||||
logger, p, _ := log.InitLogger(conf)
|
||||
log.ReplaceGlobals(logger, p)
|
||||
assert.Equal(t, zap.InfoLevel, log.GetLevel())
|
||||
|
||||
// change log level through http
|
||||
payload, err := json.Marshal(map[string]interface{}{"level": "error"})
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
url := httpServer.URL + "/log/level"
|
||||
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payload))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
client := httpServer.Client()
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
assert.Equal(t, "{\"level\":\"error\"}\n", string(body))
|
||||
assert.Equal(t, zap.ErrorLevel, log.GetLevel())
|
||||
}
|
|
@ -17,24 +17,16 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
// nolint:gosec
|
||||
_ "net/http/pprof"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/management"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultListenPort = "9091"
|
||||
ListenPortEnvKey = "METRICS_PORT"
|
||||
|
||||
milvusNamespace = "milvus"
|
||||
|
||||
AbandonLabel = "abandon"
|
||||
|
@ -83,25 +75,14 @@ var (
|
|||
buckets = prometheus.ExponentialBuckets(1, 2, 18)
|
||||
)
|
||||
|
||||
//ServeHTTP serves prometheus http service
|
||||
func ServeHTTP(r *prometheus.Registry) {
|
||||
http.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
|
||||
http.Handle("/metrics_default", promhttp.Handler())
|
||||
go func() {
|
||||
bindAddr := getMetricsAddr()
|
||||
log.Debug("metrics listen", zap.Any("addr", bindAddr))
|
||||
if err := http.ListenAndServe(bindAddr, nil); err != nil {
|
||||
log.Error("handle metrics failed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func getMetricsAddr() string {
|
||||
port := os.Getenv(ListenPortEnvKey)
|
||||
_, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
return fmt.Sprintf(":%s", DefaultListenPort)
|
||||
}
|
||||
|
||||
return fmt.Sprintf(":%s", port)
|
||||
//Register serves prometheus http service
|
||||
func Register(r *prometheus.Registry) {
|
||||
management.Register(&management.HTTPHandler{
|
||||
Path: "/metrics",
|
||||
Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
|
||||
})
|
||||
management.Register(&management.HTTPHandler{
|
||||
Path: "/metrics_default",
|
||||
Handler: promhttp.Handler(),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRegisterMetrics(t *testing.T) {
|
||||
|
@ -35,12 +34,5 @@ func TestRegisterMetrics(t *testing.T) {
|
|||
RegisterQueryNode(r)
|
||||
RegisterQueryCoord(r)
|
||||
RegisterEtcdMetrics(r)
|
||||
ServeHTTP(r)
|
||||
}
|
||||
|
||||
func TestGetMetricsAddr(t *testing.T) {
|
||||
assert.Equal(t, getMetricsAddr(), ":"+DefaultListenPort)
|
||||
testPort := "9092"
|
||||
t.Setenv(ListenPortEnvKey, testPort)
|
||||
assert.Equal(t, getMetricsAddr(), ":"+testPort)
|
||||
Register(r)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue