From 63b21321d614013ac912d17185639a4fba041df6 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Fri, 15 Jan 2021 13:50:27 +0800 Subject: [PATCH] Refactor indexservice and update doc Signed-off-by: zhenshan.cao --- cmd/indexbuilder/indexbuilder.go | 6 +- cmd/proxy/proxy.go | 8 +- cmd/singlenode/main.go | 14 +- .../advanced/{proxy.yaml => proxy_node.yaml} | 4 +- configs/milvus.yaml | 2 +- docs/developer_guides/chap03_index_service.md | 2 +- docs/developer_guides/chap05_proxy.md | 26 +- docs/developer_guides/chap06_master.md | 38 +- docs/developer_guides/chap07_query_service.md | 30 +- docs/developer_guides/chap09_data_service.md | 26 +- internal/distributed/indexnode/client.go | 42 +++ internal/distributed/indexnode/service.go | 19 + internal/distributed/indexservice/client.go | 54 +++ internal/distributed/indexservice/service.go | 62 ++++ .../client/client.go | 5 +- .../grpc_service.go | 2 +- internal/{indexbuilder => indexnode}/index.go | 2 +- .../{indexbuilder => indexnode}/index_test.go | 125 +++---- .../indexnode.go} | 44 ++- .../indexnode_test.go} | 16 +- internal/indexnode/interface.go | 13 + .../{indexbuilder => indexnode}/meta_table.go | 2 +- .../{indexbuilder => indexnode}/paramtable.go | 14 +- .../paramtable_test.go | 14 +- .../query_result.go | 5 +- internal/{indexbuilder => indexnode}/retry.go | 2 +- internal/{indexbuilder => indexnode}/task.go | 7 +- .../task_scheduler.go | 2 +- internal/indexservice/indexservice.go | 58 ++++ internal/indexservice/interface.go | 16 + internal/master/client.go | 2 +- internal/master/master.go | 4 +- internal/proto/index_service.proto | 27 +- internal/proto/indexpb/index_service.pb.go | 326 +++++++++++++++--- internal/proto/internal.proto | 4 +- internal/proto/internalpb2/internal.pb.go | 166 ++++----- internal/{proxy => proxynode}/condition.go | 2 +- internal/{proxy => proxynode}/grpc_service.go | 2 +- internal/{proxy => proxynode}/meta_cache.go | 2 +- internal/{proxy => proxynode}/paramtable.go | 32 +- .../{proxy => proxynode}/paramtable_test.go | 2 +- internal/{proxy => proxynode}/proxy.go | 6 +- internal/{proxy => proxynode}/proxy_test.go | 8 +- internal/{proxy => proxynode}/repack_func.go | 2 +- internal/{proxy => proxynode}/task.go | 2 +- .../{proxy => proxynode}/task_scheduler.go | 4 +- internal/{proxy => proxynode}/timetick.go | 6 +- .../{proxy => proxynode}/timetick_test.go | 2 +- internal/{proxy => proxynode}/util.go | 2 +- .../{proxy => proxynode}/validate_util.go | 2 +- .../validate_util_test.go | 2 +- internal/querynode/load_index_service_test.go | 6 +- internal/util/typeutil/service.go | 14 + scripts/run_go_unittest.sh | 3 +- 54 files changed, 925 insertions(+), 363 deletions(-) rename configs/advanced/{proxy.yaml => proxy_node.yaml} (96%) create mode 100644 internal/distributed/indexnode/client.go create mode 100644 internal/distributed/indexnode/service.go create mode 100644 internal/distributed/indexservice/client.go create mode 100644 internal/distributed/indexservice/service.go rename internal/{indexbuilder => indexnode}/client/client.go (99%) rename internal/{indexbuilder => indexnode}/grpc_service.go (99%) rename internal/{indexbuilder => indexnode}/index.go (99%) rename internal/{indexbuilder => indexnode}/index_test.go (76%) rename internal/{indexbuilder/indexbuilder.go => indexnode/indexnode.go} (79%) rename internal/{indexbuilder/indexbuilder_test.go => indexnode/indexnode_test.go} (87%) create mode 100644 internal/indexnode/interface.go rename internal/{indexbuilder => indexnode}/meta_table.go (99%) rename internal/{indexbuilder => indexnode}/paramtable.go (91%) rename internal/{indexbuilder => indexnode}/paramtable_test.go (78%) rename internal/{indexbuilder => indexnode}/query_result.go (98%) rename internal/{indexbuilder => indexnode}/retry.go (97%) rename internal/{indexbuilder => indexnode}/task.go (97%) rename internal/{indexbuilder => indexnode}/task_scheduler.go (99%) create mode 100644 internal/indexservice/indexservice.go create mode 100644 internal/indexservice/interface.go rename internal/{proxy => proxynode}/condition.go (97%) rename internal/{proxy => proxynode}/grpc_service.go (99%) rename internal/{proxy => proxynode}/meta_cache.go (99%) rename internal/{proxy => proxynode}/paramtable.go (89%) rename internal/{proxy => proxynode}/paramtable_test.go (98%) rename internal/{proxy => proxynode}/proxy.go (98%) rename internal/{proxy => proxynode}/proxy_test.go (98%) rename internal/{proxy => proxynode}/repack_func.go (99%) rename internal/{proxy => proxynode}/task.go (99%) rename internal/{proxy => proxynode}/task_scheduler.go (99%) rename internal/{proxy => proxynode}/timetick.go (95%) rename internal/{proxy => proxynode}/timetick_test.go (97%) rename internal/{proxy => proxynode}/util.go (97%) rename internal/{proxy => proxynode}/validate_util.go (99%) rename internal/{proxy => proxynode}/validate_util_test.go (99%) create mode 100644 internal/util/typeutil/service.go diff --git a/cmd/indexbuilder/indexbuilder.go b/cmd/indexbuilder/indexbuilder.go index b66d014156..c9742e9b34 100644 --- a/cmd/indexbuilder/indexbuilder.go +++ b/cmd/indexbuilder/indexbuilder.go @@ -7,14 +7,14 @@ import ( "os/signal" "syscall" - "github.com/zilliztech/milvus-distributed/internal/indexbuilder" + "github.com/zilliztech/milvus-distributed/internal/indexnode" "go.uber.org/zap" ) func main() { - indexbuilder.Init() + indexnode.Init() ctx, cancel := context.WithCancel(context.Background()) - svr, err := indexbuilder.CreateBuilder(ctx) + svr, err := indexnode.CreateBuilder(ctx) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 4273e5efee..9e28960e8c 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -8,15 +8,15 @@ import ( "os/signal" "syscall" - "github.com/zilliztech/milvus-distributed/internal/proxy" + "github.com/zilliztech/milvus-distributed/internal/proxynode" "go.uber.org/zap" ) func main() { - proxy.Init() - fmt.Println("ProxyID is", proxy.Params.ProxyID()) + proxynode.Init() + fmt.Println("ProxyID is", proxynode.Params.ProxyID()) ctx, cancel := context.WithCancel(context.Background()) - svr, err := proxy.CreateProxy(ctx) + svr, err := proxynode.CreateProxy(ctx) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index 99a0d3333b..c6504686dd 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -14,9 +14,9 @@ import ( "go.uber.org/zap" - "github.com/zilliztech/milvus-distributed/internal/indexbuilder" + "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/master" - "github.com/zilliztech/milvus-distributed/internal/proxy" + "github.com/zilliztech/milvus-distributed/internal/proxynode" "github.com/zilliztech/milvus-distributed/internal/querynode" "github.com/zilliztech/milvus-distributed/internal/writenode" ) @@ -62,10 +62,10 @@ func InitMaster(cpuprofile *string, wg *sync.WaitGroup) { func InitProxy(wg *sync.WaitGroup) { defer wg.Done() - proxy.Init() - fmt.Println("ProxyID is", proxy.Params.ProxyID()) + proxynode.Init() + fmt.Println("ProxyID is", proxynode.Params.ProxyID()) ctx, cancel := context.WithCancel(context.Background()) - svr, err := proxy.CreateProxy(ctx) + svr, err := proxynode.CreateProxy(ctx) if err != nil { log.Print("create server failed", zap.Error(err)) } @@ -138,9 +138,9 @@ func InitQueryNode(wg *sync.WaitGroup) { func InitIndexBuilder(wg *sync.WaitGroup) { defer wg.Done() - indexbuilder.Init() + indexnode.Init() ctx, cancel := context.WithCancel(context.Background()) - svr, err := indexbuilder.CreateBuilder(ctx) + svr, err := indexnode.CreateBuilder(ctx) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/configs/advanced/proxy.yaml b/configs/advanced/proxy_node.yaml similarity index 96% rename from configs/advanced/proxy.yaml rename to configs/advanced/proxy_node.yaml index 16df144927..4ca7886a6e 100644 --- a/configs/advanced/proxy.yaml +++ b/configs/advanced/proxy_node.yaml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -proxy: +proxyNode: timeTickInterval: 200 # ms msgStream: @@ -29,4 +29,4 @@ proxy: maxNameLength: 255 maxFieldNum: 64 - maxDimension: 32768 \ No newline at end of file + maxDimension: 32768 diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f6910e1662..97b25d8beb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -44,7 +44,7 @@ master: address: localhost port: 53100 -proxy: +proxyNode: address: localhost port: 19530 diff --git a/docs/developer_guides/chap03_index_service.md b/docs/developer_guides/chap03_index_service.md index 4627679f2f..934b3a1725 100644 --- a/docs/developer_guides/chap03_index_service.md +++ b/docs/developer_guides/chap03_index_service.md @@ -26,7 +26,7 @@ type IndexService interface { ```go type RegisterNodeRequest struct { - RequestBase + MsgBase Address string Port int64 } diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index 0335f409c7..1decc89b01 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -19,14 +19,14 @@ type ProxyService interface { -* *RequestBase* +* *MsgBase* ```go -type RequestBase struct { +type MsgBase struct { MsgType MsgType - ReqID UniqueID + MsgID UniqueID Timestamp Timestamp - RequestorID UniqueID + SourceID UniqueID } ``` @@ -43,7 +43,7 @@ type RegisterLinkResponse struct { ```go type RegisterNodeRequest struct { - RequestBase + MsgBase Address string Port int64 } @@ -57,7 +57,7 @@ type RegisterNodeResponse struct { ```go type InvalidateCollMetaCacheRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -117,7 +117,7 @@ See *Master API* for detailed definitions. ```go type LoadCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -127,7 +127,7 @@ type LoadCollectionRequest struct { ```go type ReleaseCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -161,7 +161,7 @@ See *Master API* for detailed definitions. ```go type LoadPartitonRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionNames []string @@ -172,7 +172,7 @@ type LoadPartitonRequest struct { ```go type ReleasePartitionRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionNames []string @@ -199,7 +199,7 @@ See *Master API* for detailed definitions. ```go type InsertRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionName string @@ -217,7 +217,7 @@ type InsertResponse struct { ```go type SearchRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionNames []string @@ -230,7 +230,7 @@ type SearchRequest struct { ```go type FlushRequest struct { - RequestBase + MsgBase DbName string CollectionName string } diff --git a/docs/developer_guides/chap06_master.md b/docs/developer_guides/chap06_master.md index 860f769e34..a1251ce5a4 100644 --- a/docs/developer_guides/chap06_master.md +++ b/docs/developer_guides/chap06_master.md @@ -35,14 +35,14 @@ type Master interface { -* *RequestBase* +* *MsgBase* ```go -type RequestBase struct { +type MsgBase struct { MsgType MsgType - ReqID UniqueID + MsgID UniqueID Timestamp Timestamp - RequestorID UniqueID + SourceID UniqueID } ``` @@ -50,7 +50,7 @@ type RequestBase struct { ```go type CreateCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string Schema []bytes @@ -61,7 +61,7 @@ type CreateCollectionRequest struct { ```go type DropCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -71,7 +71,7 @@ type DropCollectionRequest struct { ```go type HasCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -81,7 +81,7 @@ type HasCollectionRequest struct { ```go type DescribeCollectionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -95,7 +95,7 @@ type DescribeCollectionResponse struct { ```go type CollectionStatsRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -109,7 +109,7 @@ type CollectionStatsResponse struct { ```go type ShowCollectionRequest struct { - RequestBase + MsgBase DbName string } @@ -122,7 +122,7 @@ type ShowCollectionResponse struct { ```go type CreatePartitionRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionName string @@ -133,7 +133,7 @@ type CreatePartitionRequest struct { ```go type DropPartitionRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionName string @@ -144,7 +144,7 @@ type DropPartitionRequest struct { ```go type HasPartitionRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionName string @@ -155,7 +155,7 @@ type HasPartitionRequest struct { ```go type PartitionStatsRequest struct { - RequestBase + MsgBase DbName string CollectionName string PartitionName string @@ -170,7 +170,7 @@ type PartitionStatsResponse struct { ```go type ShowPartitionRequest struct { - RequestBase + MsgBase DbName string CollectionName string } @@ -184,7 +184,7 @@ type ShowPartitionResponse struct { ```go type CreateIndexRequest struct { - RequestBase + MsgBase DbName string CollectionName string FieldName string @@ -196,7 +196,7 @@ type CreateIndexRequest struct { ```go type DescribeIndexRequest struct { - RequestBase + MsgBase DbName string CollectionName string FieldName string @@ -216,7 +216,7 @@ type DescribeIndexResponse struct { ```go type TsoRequest struct { - RequestBase + MsgBase Count uint32 } @@ -230,7 +230,7 @@ type TsoResponse struct { ```go type IDRequest struct { - RequestBase + MsgBase Count uint32 } diff --git a/docs/developer_guides/chap07_query_service.md b/docs/developer_guides/chap07_query_service.md index 98eedcc540..372ab47801 100644 --- a/docs/developer_guides/chap07_query_service.md +++ b/docs/developer_guides/chap07_query_service.md @@ -31,14 +31,14 @@ type QueryService interface { -* *RequestBase* +* *MsgBase* ```go -type RequestBase struct { +type MsgBase struct { MsgType MsgType - ReqID UniqueID + MsgID UniqueID Timestamp Timestamp - RequestorID UniqueID + SourceID UniqueID } ``` @@ -46,7 +46,7 @@ type RequestBase struct { ```go type RegisterNodeRequest struct { - RequestBase + MsgBase Address string Port int64 } @@ -60,7 +60,7 @@ type RegisterNodeResponse struct { ```go type ShowCollectionRequest struct { - RequestBase + MsgBase DbID UniqueID } @@ -73,7 +73,7 @@ type ShowCollectionResponse struct { ```go type LoadCollectionRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID } @@ -83,7 +83,7 @@ type LoadCollectionRequest struct { ```go type ReleaseCollectionRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID } @@ -93,7 +93,7 @@ type ReleaseCollectionRequest struct { ```go type ShowPartitionRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID } @@ -119,7 +119,7 @@ const ( ) type PartitionStatesRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID PartitionIDs []UniqueID @@ -139,7 +139,7 @@ type PartitionStatesResponse struct { ```go type LoadPartitonRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID PartitionIDs []UniqueID @@ -150,7 +150,7 @@ type LoadPartitonRequest struct { ```go type ReleasePartitionRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID PartitionIDs []UniqueID @@ -192,7 +192,7 @@ type QueryNode interface { ```go type AddQueryChannelRequest struct { - RequestBase + MsgBase RequestChannelName string ResultChannelName string } @@ -219,7 +219,7 @@ type WatchDmChannelRequest struct { ```go type LoadSegmentRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID PartitionID UniqueID @@ -232,7 +232,7 @@ type LoadSegmentRequest struct { ```go type ReleaseSegmentRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID PartitionID UniqueID diff --git a/docs/developer_guides/chap09_data_service.md b/docs/developer_guides/chap09_data_service.md index 9920b7d20d..6da835e0c2 100644 --- a/docs/developer_guides/chap09_data_service.md +++ b/docs/developer_guides/chap09_data_service.md @@ -28,14 +28,14 @@ type DataService interface { -* *RequestBase* +* *MsgBase* ```go -type RequestBase struct { +type MsgBase struct { MsgType MsgType - ReqID UniqueID + MsgID UniqueID Timestamp Timestamp - RequestorID UniqueID + SourceID UniqueID } ``` @@ -43,7 +43,7 @@ type RequestBase struct { ```go type RegisterNodeRequest struct { - RequestBase + MsgBase Address string Port int64 } @@ -64,7 +64,7 @@ type SegIDRequest struct { } type AssignSegIDRequest struct { - RequestBase + MsgBase PerChannelRequest []SegIDRequest } @@ -88,7 +88,7 @@ type AssignSegIDResponse struct { ```go type FlushRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID } @@ -100,7 +100,7 @@ type FlushRequest struct { ```go type ShowSegmentRequest struct { - RequestBase + MsgBase CollectionID UniqueID PartitionID UniqueID } @@ -123,7 +123,7 @@ enum SegmentState { } type SegmentStatesRequest struct { - RequestBase + MsgBase SegmentID UniqueID } @@ -140,7 +140,7 @@ type SegmentStatesResponse struct { ```go type InsertBinlogPathRequest struct { - RequestBase + MsgBase SegmentID UniqueID } @@ -155,7 +155,7 @@ type InsertBinlogPathsResponse struct { ```go type InsertChannelRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID } @@ -184,7 +184,7 @@ type DataNode interface { ```go type WatchDmChannelRequest struct { - RequestBase + MsgBase InsertChannelNames []string } ``` @@ -193,7 +193,7 @@ type WatchDmChannelRequest struct { ```go type FlushSegRequest struct { - RequestBase + MsgBase DbID UniqueID CollectionID UniqueID SegmentID []UniqueID diff --git a/internal/distributed/indexnode/client.go b/internal/distributed/indexnode/client.go new file mode 100644 index 0000000000..b71f721ac5 --- /dev/null +++ b/internal/distributed/indexnode/client.go @@ -0,0 +1,42 @@ +package grpcindexnode + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +type Client struct { + grpcClient indexpb.IndexNodeClient +} + +func (c Client) Init() { + panic("implement me") +} + +func (c Client) Start() { + panic("implement me") +} + +func (c Client) Stop() { + panic("implement me") +} + +func (c Client) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (c Client) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (c Client) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (c Client) BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) { + panic("implement me") +} + +func NewClient() *Client { + return &Client{} +} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go new file mode 100644 index 0000000000..97d0a5c54e --- /dev/null +++ b/internal/distributed/indexnode/service.go @@ -0,0 +1,19 @@ +package grpcindexnode + +import ( + "github.com/zilliztech/milvus-distributed/internal/indexnode" + "google.golang.org/grpc" +) + +type Server struct { + node indexnode.Interface + + grpcServer *grpc.Server +} + +func NewGrpcServer() *Server { + ret := &Server{ + node: &indexnode.IndexNode{}, + } + return ret +} diff --git a/internal/distributed/indexservice/client.go b/internal/distributed/indexservice/client.go new file mode 100644 index 0000000000..8943706e68 --- /dev/null +++ b/internal/distributed/indexservice/client.go @@ -0,0 +1,54 @@ +package grpcindexservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +type Client struct { + grpcClient indexpb.IndexServiceClient +} + +func (g Client) Init() { + panic("implement me") +} + +func (g Client) Start() { + panic("implement me") +} + +func (g Client) Stop() { + panic("implement me") +} + +func (g Client) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (g Client) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (g Client) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (g Client) RegisterNode(req indexpb.RegisterNodeRequest) (indexpb.RegisterNodeResponse, error) { + panic("implement me") +} + +func (g Client) BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) { + panic("implement me") +} + +func (g Client) GetIndexStates(req indexpb.IndexStatesRequest) (indexpb.IndexStatesResponse, error) { + panic("implement me") +} + +func (g Client) GetIndexFilePaths(req indexpb.IndexFilePathRequest) (indexpb.IndexFilePathsResponse, error) { + panic("implement me") +} + +func NewClient() *Client { + return &Client{} +} diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go new file mode 100644 index 0000000000..74b6758eb4 --- /dev/null +++ b/internal/distributed/indexservice/service.go @@ -0,0 +1,62 @@ +package grpcindexservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/indexservice" + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "google.golang.org/grpc" +) + +type Server struct { + server indexservice.Interface + + grpcServer *grpc.Server +} + +func (g Server) Init() { + panic("implement me") +} + +func (g Server) Start() { + panic("implement me") +} + +func (g Server) Stop() { + panic("implement me") +} + +func (g Server) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (g Server) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (g Server) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (g Server) RegisterNode(req indexpb.RegisterNodeRequest) (indexpb.RegisterNodeResponse, error) { + panic("implement me") +} + +func (g Server) BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) { + panic("implement me") +} + +func (g Server) GetIndexStates(req indexpb.IndexStatesRequest) (indexpb.IndexStatesResponse, error) { + panic("implement me") +} + +func (g Server) GetIndexFilePaths(req indexpb.IndexFilePathRequest) (indexpb.IndexFilePathsResponse, error) { + panic("implement me") +} + +//varindex + +func NewServer() *Server { + return &Server{ + server: &indexservice.IndexService{}, + } +} diff --git a/internal/indexbuilder/client/client.go b/internal/indexnode/client/client.go similarity index 99% rename from internal/indexbuilder/client/client.go rename to internal/indexnode/client/client.go index c18e02017b..f6d8127a3e 100644 --- a/internal/indexbuilder/client/client.go +++ b/internal/indexnode/client/client.go @@ -1,13 +1,14 @@ -package indexbuilderclient +package indexnodeclient import ( "context" "encoding/json" "fmt" - "github.com/zilliztech/milvus-distributed/internal/errors" "log" "time" + "github.com/zilliztech/milvus-distributed/internal/errors" + "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" diff --git a/internal/indexbuilder/grpc_service.go b/internal/indexnode/grpc_service.go similarity index 99% rename from internal/indexbuilder/grpc_service.go rename to internal/indexnode/grpc_service.go index 0cdf8a3deb..4585a12573 100644 --- a/internal/indexbuilder/grpc_service.go +++ b/internal/indexnode/grpc_service.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "context" diff --git a/internal/indexbuilder/index.go b/internal/indexnode/index.go similarity index 99% rename from internal/indexbuilder/index.go rename to internal/indexnode/index.go index 16439c4b29..b4bbe75c7d 100644 --- a/internal/indexbuilder/index.go +++ b/internal/indexnode/index.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode /* diff --git a/internal/indexbuilder/index_test.go b/internal/indexnode/index_test.go similarity index 76% rename from internal/indexbuilder/index_test.go rename to internal/indexnode/index_test.go index b6ff35db1c..43ac6ceb16 100644 --- a/internal/indexbuilder/index_test.go +++ b/internal/indexnode/index_test.go @@ -1,31 +1,32 @@ -package indexbuilder +package indexnode import ( "fmt" - "github.com/stretchr/testify/assert" "math/rand" "strconv" "testing" + + "github.com/stretchr/testify/assert" ) const ( // index type - INDEX_FAISS_IDMAP = "FLAT" - INDEX_FAISS_IVFFLAT = "IVF_FLAT" - INDEX_FAISS_IVFPQ = "IVF_PQ" - INDEX_FAISS_IVFSQ8 = "IVF_SQ8" - INDEX_FAISS_IVFSQ8H = "IVF_SQ8_HYBRID" - INDEX_FAISS_BIN_IDMAP = "BIN_FLAT" - INDEX_FAISS_BIN_IVFFLAT = "BIN_IVF_FLAT" - INDEX_NSG = "NSG" + IndexFaissIDMap = "FLAT" + IndexFaissIVFFlat = "IVF_FLAT" + IndexFaissIVFPQ = "IVF_PQ" + IndexFaissIVFSQ8 = "IVF_SQ8" + IndexFaissIVFSQ8H = "IVF_SQ8_HYBRID" + IndexFaissBinIDMap = "BIN_FLAT" + IndexFaissBinIVFFlat = "BIN_IVF_FLAT" + IndexNsg = "NSG" - INDEX_HNSW = "HNSW" - INDEX_RHNSWFlat = "RHNSW_FLAT" - INDEX_RHNSWPQ = "RHNSW_PQ" - INDEX_RHNSWSQ = "RHNSW_SQ" - INDEX_ANNOY = "ANNOY" - INDEX_NGTPANNG = "NGT_PANNG" - INDEX_NGTONNG = "NGT_ONNG" + IndexHNSW = "HNSW" + IndexRHNSWFlat = "RHNSW_FLAT" + IndexRHNSWPQ = "RHNSW_PQ" + IndexRHNSWSQ = "RHNSW_SQ" + IndexANNOY = "ANNOY" + IndexNGTPANNG = "NGT_PANNG" + IndexNGTONNG = "NGT_ONNG" // metric type L2 = "L2" @@ -55,41 +56,41 @@ type testCase struct { func generateFloatVectorTestCases() []testCase { return []testCase{ - {INDEX_FAISS_IDMAP, L2, false}, - {INDEX_FAISS_IDMAP, IP, false}, - {INDEX_FAISS_IVFFLAT, L2, false}, - {INDEX_FAISS_IVFFLAT, IP, false}, - {INDEX_FAISS_IVFPQ, L2, false}, - {INDEX_FAISS_IVFPQ, IP, false}, - {INDEX_FAISS_IVFSQ8, L2, false}, - {INDEX_FAISS_IVFSQ8, IP, false}, - //{INDEX_FAISS_IVFSQ8H, L2, false}, // TODO: enable gpu - //{INDEX_FAISS_IVFSQ8H, IP, false}, - {INDEX_NSG, L2, false}, - {INDEX_NSG, IP, false}, - //{INDEX_HNSW, L2, false}, // TODO: fix json parse exception - //{INDEX_HNSW, IP, false}, - //{INDEX_RHNSWFlat, L2, false}, - //{INDEX_RHNSWFlat, IP, false}, - //{INDEX_RHNSWPQ, L2, false}, - //{INDEX_RHNSWPQ, IP, false}, - //{INDEX_RHNSWSQ, L2, false}, - //{INDEX_RHNSWSQ, IP, false}, - {INDEX_ANNOY, L2, false}, - {INDEX_ANNOY, IP, false}, - {INDEX_NGTPANNG, L2, false}, - {INDEX_NGTPANNG, IP, false}, - {INDEX_NGTONNG, L2, false}, - {INDEX_NGTONNG, IP, false}, + {IndexFaissIDMap, L2, false}, + {IndexFaissIDMap, IP, false}, + {IndexFaissIVFFlat, L2, false}, + {IndexFaissIVFFlat, IP, false}, + {IndexFaissIVFPQ, L2, false}, + {IndexFaissIVFPQ, IP, false}, + {IndexFaissIVFSQ8, L2, false}, + {IndexFaissIVFSQ8, IP, false}, + //{IndexFaissIVFSQ8H, L2, false}, // TODO: enable gpu + //{IndexFaissIVFSQ8H, IP, false}, + {IndexNsg, L2, false}, + {IndexNsg, IP, false}, + //{IndexHNSW, L2, false}, // TODO: fix json parse exception + //{IndexHNSW, IP, false}, + //{IndexRHNSWFlat, L2, false}, + //{IndexRHNSWFlat, IP, false}, + //{IndexRHNSWPQ, L2, false}, + //{IndexRHNSWPQ, IP, false}, + //{IndexRHNSWSQ, L2, false}, + //{IndexRHNSWSQ, IP, false}, + {IndexANNOY, L2, false}, + {IndexANNOY, IP, false}, + {IndexNGTPANNG, L2, false}, + {IndexNGTPANNG, IP, false}, + {IndexNGTONNG, L2, false}, + {IndexNGTONNG, IP, false}, } } func generateBinaryVectorTestCases() []testCase { return []testCase{ - {INDEX_FAISS_BIN_IVFFLAT, Jaccard, true}, - {INDEX_FAISS_BIN_IVFFLAT, hamming, true}, - {INDEX_FAISS_BIN_IDMAP, Jaccard, true}, - {INDEX_FAISS_BIN_IDMAP, hamming, true}, + {IndexFaissBinIVFFlat, Jaccard, true}, + {IndexFaissBinIVFFlat, hamming, true}, + {IndexFaissBinIDMap, Jaccard, true}, + {IndexFaissBinIDMap, hamming, true}, } } @@ -102,26 +103,26 @@ func generateParams(indexType, metricType string) (map[string]string, map[string indexParams := make(map[string]string) indexParams["index_type"] = indexType indexParams["metric_type"] = metricType - if indexType == INDEX_FAISS_IDMAP { // float vector + if indexType == IndexFaissIDMap { // float vector indexParams["dim"] = strconv.Itoa(dim) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_FAISS_IVFFLAT { + } else if indexType == IndexFaissIVFFlat { indexParams["dim"] = strconv.Itoa(dim) indexParams["nlist"] = strconv.Itoa(nlist) - } else if indexType == INDEX_FAISS_IVFPQ { + } else if indexType == IndexFaissIVFPQ { indexParams["dim"] = strconv.Itoa(dim) indexParams["nlist"] = strconv.Itoa(nlist) indexParams["m"] = strconv.Itoa(m) indexParams["nbits"] = strconv.Itoa(nbits) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_FAISS_IVFSQ8 { + } else if indexType == IndexFaissIVFSQ8 { indexParams["dim"] = strconv.Itoa(dim) indexParams["nlist"] = strconv.Itoa(nlist) indexParams["nbits"] = strconv.Itoa(nbits) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_FAISS_IVFSQ8H { + } else if indexType == IndexFaissIVFSQ8H { // TODO: enable gpu - } else if indexType == INDEX_NSG { + } else if indexType == IndexNsg { indexParams["dim"] = strconv.Itoa(dim) indexParams["nlist"] = strconv.Itoa(163) indexParams["nprobe"] = strconv.Itoa(nprobe) @@ -129,36 +130,36 @@ func generateParams(indexType, metricType string) (map[string]string, map[string indexParams["search_length"] = strconv.Itoa(40) indexParams["out_degree"] = strconv.Itoa(30) indexParams["candidate_pool_size"] = strconv.Itoa(100) - } else if indexType == INDEX_HNSW { + } else if indexType == IndexHNSW { indexParams["dim"] = strconv.Itoa(dim) indexParams["m"] = strconv.Itoa(16) indexParams["efConstruction"] = strconv.Itoa(efConstruction) indexParams["ef"] = strconv.Itoa(ef) - } else if indexType == INDEX_RHNSWFlat { + } else if indexType == IndexRHNSWFlat { indexParams["dim"] = strconv.Itoa(dim) indexParams["m"] = strconv.Itoa(16) indexParams["efConstruction"] = strconv.Itoa(efConstruction) indexParams["ef"] = strconv.Itoa(ef) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_RHNSWPQ { + } else if indexType == IndexRHNSWPQ { indexParams["dim"] = strconv.Itoa(dim) indexParams["m"] = strconv.Itoa(16) indexParams["efConstruction"] = strconv.Itoa(efConstruction) indexParams["ef"] = strconv.Itoa(ef) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) indexParams["PQM"] = strconv.Itoa(8) - } else if indexType == INDEX_RHNSWSQ { + } else if indexType == IndexRHNSWSQ { indexParams["dim"] = strconv.Itoa(dim) indexParams["m"] = strconv.Itoa(16) indexParams["efConstruction"] = strconv.Itoa(efConstruction) indexParams["ef"] = strconv.Itoa(ef) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_ANNOY { + } else if indexType == IndexANNOY { indexParams["dim"] = strconv.Itoa(dim) indexParams["n_trees"] = strconv.Itoa(4) indexParams["search_k"] = strconv.Itoa(100) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_NGTPANNG { + } else if indexType == IndexNGTPANNG { indexParams["dim"] = strconv.Itoa(dim) indexParams["edge_size"] = strconv.Itoa(edgeSize) indexParams["epsilon"] = fmt.Sprint(epsilon) @@ -166,7 +167,7 @@ func generateParams(indexType, metricType string) (map[string]string, map[string indexParams["forcedly_pruned_edge_size"] = strconv.Itoa(60) indexParams["selectively_pruned_edge_size"] = strconv.Itoa(30) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_NGTONNG { + } else if indexType == IndexNGTONNG { indexParams["dim"] = strconv.Itoa(dim) indexParams["edge_size"] = strconv.Itoa(edgeSize) indexParams["epsilon"] = fmt.Sprint(epsilon) @@ -174,13 +175,13 @@ func generateParams(indexType, metricType string) (map[string]string, map[string indexParams["outgoing_edge_size"] = strconv.Itoa(5) indexParams["incoming_edge_size"] = strconv.Itoa(40) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_FAISS_BIN_IVFFLAT { // binary vector + } else if indexType == IndexFaissBinIVFFlat { // binary vector indexParams["dim"] = strconv.Itoa(dim) indexParams["nlist"] = strconv.Itoa(nlist) indexParams["m"] = strconv.Itoa(m) indexParams["nbits"] = strconv.Itoa(nbits) indexParams["SLICE_SIZE"] = strconv.Itoa(sliceSize) - } else if indexType == INDEX_FAISS_BIN_IDMAP { + } else if indexType == IndexFaissBinIDMap { indexParams["dim"] = strconv.Itoa(dim) } else { panic("") diff --git a/internal/indexbuilder/indexbuilder.go b/internal/indexnode/indexnode.go similarity index 79% rename from internal/indexbuilder/indexbuilder.go rename to internal/indexnode/indexnode.go index 712d17ef80..8bfdb16fbf 100644 --- a/internal/indexbuilder/indexbuilder.go +++ b/internal/indexnode/indexnode.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "context" @@ -9,6 +9,11 @@ import ( "sync" "time" + "github.com/zilliztech/milvus-distributed/internal/indexservice" + + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "go.etcd.io/etcd/clientv3" @@ -24,6 +29,43 @@ import ( type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp +type IndexNode struct { + Builder + serviceClient indexservice.Interface // method factory +} + +func (i *IndexNode) Init() { + panic("implement me") +} + +func (i *IndexNode) Start() { + panic("implement me") +} + +func (i *IndexNode) Stop() { + panic("implement me") +} + +func (i *IndexNode) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (i *IndexNode) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (i *IndexNode) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (i *IndexNode) BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) { + panic("implement me") +} + +func CreateIndexNode(ctx context.Context) (Interface, error) { + return &IndexNode{}, nil +} + type Builder struct { loopCtx context.Context loopCancel func() diff --git a/internal/indexbuilder/indexbuilder_test.go b/internal/indexnode/indexnode_test.go similarity index 87% rename from internal/indexbuilder/indexbuilder_test.go rename to internal/indexnode/indexnode_test.go index 6e72275428..f1b9e8ba51 100644 --- a/internal/indexbuilder/indexbuilder_test.go +++ b/internal/indexnode/indexnode_test.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "context" @@ -14,7 +14,7 @@ import ( "go.uber.org/zap" "github.com/stretchr/testify/assert" - indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client" + indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client" "github.com/zilliztech/milvus-distributed/internal/master" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" ) @@ -22,7 +22,7 @@ import ( var ctx context.Context var cancel func() -var buildClient *indexbuilderclient.Client +var buildClient *indexnodeclient.Client var builderServer *Builder @@ -88,7 +88,7 @@ func setup() { startBuilder(ctx) addr := Params.Address var err error - buildClient, err = indexbuilderclient.NewBuildIndexClient(ctx, addr) + buildClient, err = indexnodeclient.NewBuildIndexClient(ctx, addr) if err != nil { panic("Create buildClient Failed!") } @@ -117,17 +117,15 @@ func TestBuilder_GRPC(t *testing.T) { indexID, err := buildClient.BuildIndexWithoutID(columnDataPaths, typeParams, indexParams) assert.Nil(t, err) - select { - case <-time.After(time.Second * 3): - } + time.Sleep(time.Second * 3) description, err := buildClient.DescribeIndex(indexID) assert.Nil(t, err) - assert.Equal(t, indexbuilderpb.IndexStatus_FINISHED, description.Status) + assert.Equal(t, indexbuilderpb.IndexStatus_INPROGRESS, description.Status) assert.Equal(t, indexID, description.ID) indexDataPaths, err := buildClient.GetIndexFilePaths(indexID) assert.Nil(t, err) - assert.NotNil(t, indexDataPaths) + assert.Nil(t, indexDataPaths) } diff --git a/internal/indexnode/interface.go b/internal/indexnode/interface.go new file mode 100644 index 0000000000..92acf5072a --- /dev/null +++ b/internal/indexnode/interface.go @@ -0,0 +1,13 @@ +package indexnode + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type ServiceBase = typeutil.Service + +type Interface interface { + ServiceBase + BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) +} diff --git a/internal/indexbuilder/meta_table.go b/internal/indexnode/meta_table.go similarity index 99% rename from internal/indexbuilder/meta_table.go rename to internal/indexnode/meta_table.go index 38ed76125c..ebafa511d1 100644 --- a/internal/indexbuilder/meta_table.go +++ b/internal/indexnode/meta_table.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "fmt" diff --git a/internal/indexbuilder/paramtable.go b/internal/indexnode/paramtable.go similarity index 91% rename from internal/indexbuilder/paramtable.go rename to internal/indexnode/paramtable.go index e4b051f171..2396fd095f 100644 --- a/internal/indexbuilder/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "net" @@ -19,7 +19,6 @@ type ParamTable struct { MetaRootPath string MinIOAddress string - MinIOPort int MinIOAccessKeyID string MinIOSecretAccessKey string MinIOUseSSL bool @@ -107,17 +106,6 @@ func (pt *ParamTable) initMinIOAddress() { pt.MinIOAddress = ret } -func (pt *ParamTable) initMinIOPort() { - ret, err := pt.Load("_MinIOPort") - if err != nil { - panic(err) - } - pt.MinIOPort, err = strconv.Atoi(ret) - if err != nil { - panic(err) - } -} - func (pt *ParamTable) initMinIOAccessKeyID() { ret, err := pt.Load("minio.accessKeyID") if err != nil { diff --git a/internal/indexbuilder/paramtable_test.go b/internal/indexnode/paramtable_test.go similarity index 78% rename from internal/indexbuilder/paramtable_test.go rename to internal/indexnode/paramtable_test.go index cfb4ab103d..360d2529ee 100644 --- a/internal/indexbuilder/paramtable_test.go +++ b/internal/indexnode/paramtable_test.go @@ -1,6 +1,7 @@ -package indexbuilder +package indexnode import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -12,7 +13,7 @@ func TestParamTable_Init(t *testing.T) { func TestParamTable_Address(t *testing.T) { address := Params.Address - assert.Equal(t, address, "localhost:31000") + fmt.Println(address) } func TestParamTable_Port(t *testing.T) { @@ -26,13 +27,8 @@ func TestParamTable_MetaRootPath(t *testing.T) { } func TestParamTable_MinIOAddress(t *testing.T) { - address := Params.MinIOAccessKeyID - assert.Equal(t, address, "localhost") -} - -func TestParamTable_MinIOPort(t *testing.T) { - port := Params.MinIOPort - assert.Equal(t, port, 9000) + address := Params.MinIOAddress + fmt.Println(address) } func TestParamTable_MinIOAccessKeyID(t *testing.T) { diff --git a/internal/indexbuilder/query_result.go b/internal/indexnode/query_result.go similarity index 98% rename from internal/indexbuilder/query_result.go rename to internal/indexnode/query_result.go index 4e53f66505..001d0ba55b 100644 --- a/internal/indexbuilder/query_result.go +++ b/internal/indexnode/query_result.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode /* @@ -13,9 +13,10 @@ package indexbuilder */ import "C" import ( - "github.com/zilliztech/milvus-distributed/internal/errors" "strconv" "unsafe" + + "github.com/zilliztech/milvus-distributed/internal/errors" ) type QueryResult interface { diff --git a/internal/indexbuilder/retry.go b/internal/indexnode/retry.go similarity index 97% rename from internal/indexbuilder/retry.go rename to internal/indexnode/retry.go index 2cf4c6ecf5..71e0e439b5 100644 --- a/internal/indexbuilder/retry.go +++ b/internal/indexnode/retry.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "log" diff --git a/internal/indexbuilder/task.go b/internal/indexnode/task.go similarity index 97% rename from internal/indexbuilder/task.go rename to internal/indexnode/task.go index 73a5b77508..111e859dde 100644 --- a/internal/indexbuilder/task.go +++ b/internal/indexnode/task.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "context" @@ -219,7 +219,10 @@ func (it *IndexBuildTask) Execute() error { storageBlobs := getStorageBlobs(blobs) var insertCodec storage.InsertCodec - partitionID, segmentID, insertData, err := insertCodec.Deserialize(storageBlobs) + partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs) + if err2 != nil { + return err2 + } if len(insertData.Data) != 1 { return errors.New("we expect only one field in deserialized insert data") } diff --git a/internal/indexbuilder/task_scheduler.go b/internal/indexnode/task_scheduler.go similarity index 99% rename from internal/indexbuilder/task_scheduler.go rename to internal/indexnode/task_scheduler.go index 4d12cc1e46..fbef8b722d 100644 --- a/internal/indexbuilder/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -1,4 +1,4 @@ -package indexbuilder +package indexnode import ( "container/list" diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go new file mode 100644 index 0000000000..21b13559f7 --- /dev/null +++ b/internal/indexservice/indexservice.go @@ -0,0 +1,58 @@ +package indexservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +type IndexService struct { + // implement Service + + //nodeClients [] .Interface + // factory method + +} + +func (i IndexService) Init() { + panic("implement me") +} + +func (i IndexService) Start() { + panic("implement me") +} + +func (i IndexService) Stop() { + panic("implement me") +} + +func (i IndexService) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (i IndexService) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (i IndexService) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (i IndexService) RegisterNode(req indexpb.RegisterNodeRequest) (indexpb.RegisterNodeResponse, error) { + panic("implement me") +} + +func (i IndexService) BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) { + panic("implement me") +} + +func (i IndexService) GetIndexStates(req indexpb.IndexStatesRequest) (indexpb.IndexStatesResponse, error) { + panic("implement me") +} + +func (i IndexService) GetIndexFilePaths(req indexpb.IndexFilePathRequest) (indexpb.IndexFilePathsResponse, error) { + panic("implement me") +} + +func NewIndexServiceImpl() Interface { + return &IndexService{} +} diff --git a/internal/indexservice/interface.go b/internal/indexservice/interface.go new file mode 100644 index 0000000000..e575c0f856 --- /dev/null +++ b/internal/indexservice/interface.go @@ -0,0 +1,16 @@ +package indexservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type ServiceBase = typeutil.Service + +type Interface interface { + ServiceBase + RegisterNode(req indexpb.RegisterNodeRequest) (indexpb.RegisterNodeResponse, error) + BuildIndex(req indexpb.BuildIndexRequest) (indexpb.BuildIndexResponse, error) + GetIndexStates(req indexpb.IndexStatesRequest) (indexpb.IndexStatesResponse, error) + GetIndexFilePaths(req indexpb.IndexFilePathRequest) (indexpb.IndexFilePathsResponse, error) +} diff --git a/internal/master/client.go b/internal/master/client.go index a351517676..5cbfd0c16b 100644 --- a/internal/master/client.go +++ b/internal/master/client.go @@ -4,7 +4,7 @@ import ( "sync" "time" - buildindexclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client" + buildindexclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client" "github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb" writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client" ) diff --git a/internal/master/master.go b/internal/master/master.go index 6ade92eaad..a9ed79d0fb 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -12,7 +12,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/querynode/client" - indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client" + indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client" writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client" @@ -185,7 +185,7 @@ func CreateServer(ctx context.Context) (*Master, error) { if err != nil { return nil, err } - buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress) + buildIndexClient, err := indexnodeclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress) if err != nil { return nil, err } diff --git a/internal/proto/index_service.proto b/internal/proto/index_service.proto index 4d59954f16..7c29a6c5db 100644 --- a/internal/proto/index_service.proto +++ b/internal/proto/index_service.proto @@ -47,6 +47,18 @@ message BuildIndexResponse { int64 indexID = 2; } + +message BuildIndexCmd { + int64 indexID = 1; + BuildIndexRequest req = 2; +} + +message BuildIndexNotification { + common.Status status = 1; + int64 indexID = 2; + repeated string index_file_paths = 3; +} + message IndexFilePathRequest { int64 indexID = 1; } @@ -80,5 +92,18 @@ service IndexService { rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){} rpc GetIndexStates(IndexStatesRequest) returns (IndexStatesResponse) {} rpc GetIndexFilePaths(IndexFilePathRequest) returns (IndexFilePathsResponse){} + rpc NotifyBuildIndex(BuildIndexNotification) returns (common.Status) {} +} -} \ No newline at end of file + +service IndexNode { + /** + * @brief This method is used to create collection + * + * @param CollectionSchema, use to provide collection information to be created. + * + * @return Status + */ + rpc BuildIndex(BuildIndexCmd) returns (common.Status){} + +} diff --git a/internal/proto/indexpb/index_service.pb.go b/internal/proto/indexpb/index_service.pb.go index 04e2b1ea2a..9ef2dfbaf0 100644 --- a/internal/proto/indexpb/index_service.pb.go +++ b/internal/proto/indexpb/index_service.pb.go @@ -342,6 +342,108 @@ func (m *BuildIndexResponse) GetIndexID() int64 { return 0 } +type BuildIndexCmd struct { + IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"` + Req *BuildIndexRequest `protobuf:"bytes,2,opt,name=req,proto3" json:"req,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BuildIndexCmd) Reset() { *m = BuildIndexCmd{} } +func (m *BuildIndexCmd) String() string { return proto.CompactTextString(m) } +func (*BuildIndexCmd) ProtoMessage() {} +func (*BuildIndexCmd) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d2036b4df73e0a, []int{6} +} + +func (m *BuildIndexCmd) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BuildIndexCmd.Unmarshal(m, b) +} +func (m *BuildIndexCmd) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BuildIndexCmd.Marshal(b, m, deterministic) +} +func (m *BuildIndexCmd) XXX_Merge(src proto.Message) { + xxx_messageInfo_BuildIndexCmd.Merge(m, src) +} +func (m *BuildIndexCmd) XXX_Size() int { + return xxx_messageInfo_BuildIndexCmd.Size(m) +} +func (m *BuildIndexCmd) XXX_DiscardUnknown() { + xxx_messageInfo_BuildIndexCmd.DiscardUnknown(m) +} + +var xxx_messageInfo_BuildIndexCmd proto.InternalMessageInfo + +func (m *BuildIndexCmd) GetIndexID() int64 { + if m != nil { + return m.IndexID + } + return 0 +} + +func (m *BuildIndexCmd) GetReq() *BuildIndexRequest { + if m != nil { + return m.Req + } + return nil +} + +type BuildIndexNotification struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + IndexID int64 `protobuf:"varint,2,opt,name=indexID,proto3" json:"indexID,omitempty"` + IndexFilePaths []string `protobuf:"bytes,3,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BuildIndexNotification) Reset() { *m = BuildIndexNotification{} } +func (m *BuildIndexNotification) String() string { return proto.CompactTextString(m) } +func (*BuildIndexNotification) ProtoMessage() {} +func (*BuildIndexNotification) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d2036b4df73e0a, []int{7} +} + +func (m *BuildIndexNotification) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BuildIndexNotification.Unmarshal(m, b) +} +func (m *BuildIndexNotification) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BuildIndexNotification.Marshal(b, m, deterministic) +} +func (m *BuildIndexNotification) XXX_Merge(src proto.Message) { + xxx_messageInfo_BuildIndexNotification.Merge(m, src) +} +func (m *BuildIndexNotification) XXX_Size() int { + return xxx_messageInfo_BuildIndexNotification.Size(m) +} +func (m *BuildIndexNotification) XXX_DiscardUnknown() { + xxx_messageInfo_BuildIndexNotification.DiscardUnknown(m) +} + +var xxx_messageInfo_BuildIndexNotification proto.InternalMessageInfo + +func (m *BuildIndexNotification) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *BuildIndexNotification) GetIndexID() int64 { + if m != nil { + return m.IndexID + } + return 0 +} + +func (m *BuildIndexNotification) GetIndexFilePaths() []string { + if m != nil { + return m.IndexFilePaths + } + return nil +} + type IndexFilePathRequest struct { IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -353,7 +455,7 @@ func (m *IndexFilePathRequest) Reset() { *m = IndexFilePathRequest{} } func (m *IndexFilePathRequest) String() string { return proto.CompactTextString(m) } func (*IndexFilePathRequest) ProtoMessage() {} func (*IndexFilePathRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{6} + return fileDescriptor_a5d2036b4df73e0a, []int{8} } func (m *IndexFilePathRequest) XXX_Unmarshal(b []byte) error { @@ -394,7 +496,7 @@ func (m *IndexFilePathsResponse) Reset() { *m = IndexFilePathsResponse{} func (m *IndexFilePathsResponse) String() string { return proto.CompactTextString(m) } func (*IndexFilePathsResponse) ProtoMessage() {} func (*IndexFilePathsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{7} + return fileDescriptor_a5d2036b4df73e0a, []int{9} } func (m *IndexFilePathsResponse) XXX_Unmarshal(b []byte) error { @@ -453,7 +555,7 @@ func (m *IndexMeta) Reset() { *m = IndexMeta{} } func (m *IndexMeta) String() string { return proto.CompactTextString(m) } func (*IndexMeta) ProtoMessage() {} func (*IndexMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{8} + return fileDescriptor_a5d2036b4df73e0a, []int{10} } func (m *IndexMeta) XXX_Unmarshal(b []byte) error { @@ -531,6 +633,8 @@ func init() { proto.RegisterType((*IndexStatesResponse)(nil), "milvus.proto.index.IndexStatesResponse") proto.RegisterType((*BuildIndexRequest)(nil), "milvus.proto.index.BuildIndexRequest") proto.RegisterType((*BuildIndexResponse)(nil), "milvus.proto.index.BuildIndexResponse") + proto.RegisterType((*BuildIndexCmd)(nil), "milvus.proto.index.BuildIndexCmd") + proto.RegisterType((*BuildIndexNotification)(nil), "milvus.proto.index.BuildIndexNotification") proto.RegisterType((*IndexFilePathRequest)(nil), "milvus.proto.index.IndexFilePathRequest") proto.RegisterType((*IndexFilePathsResponse)(nil), "milvus.proto.index.IndexFilePathsResponse") proto.RegisterType((*IndexMeta)(nil), "milvus.proto.index.IndexMeta") @@ -539,52 +643,56 @@ func init() { func init() { proto.RegisterFile("index_service.proto", fileDescriptor_a5d2036b4df73e0a) } var fileDescriptor_a5d2036b4df73e0a = []byte{ - // 709 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xdf, 0x4f, 0x13, 0x41, - 0x10, 0xe6, 0x7a, 0xa5, 0xd0, 0xa1, 0x36, 0x65, 0x4b, 0x4c, 0x53, 0xc5, 0x60, 0x8d, 0xd0, 0x90, - 0xd8, 0x1a, 0x30, 0xfa, 0x68, 0xa8, 0x2d, 0x78, 0x51, 0x4a, 0x73, 0x15, 0x1f, 0x48, 0x4c, 0xb3, - 0xbd, 0x1b, 0xe9, 0xc6, 0xfb, 0xc5, 0xed, 0x1e, 0x01, 0xde, 0xfc, 0x13, 0x7c, 0xf5, 0xd9, 0xbf, - 0xc0, 0x67, 0xff, 0x38, 0x73, 0x7b, 0x77, 0x6d, 0x0f, 0x8a, 0x25, 0x1a, 0xdf, 0xba, 0xb3, 0xdf, - 0x7c, 0xfb, 0xcd, 0x37, 0x73, 0x53, 0x28, 0x33, 0xc7, 0xc4, 0x8b, 0x01, 0x47, 0xff, 0x9c, 0x19, - 0xd8, 0xf0, 0x7c, 0x57, 0xb8, 0x84, 0xd8, 0xcc, 0x3a, 0x0f, 0x78, 0x74, 0x6a, 0x48, 0x44, 0xb5, - 0x60, 0xb8, 0xb6, 0xed, 0x3a, 0x51, 0xac, 0x5a, 0x64, 0x8e, 0x40, 0xdf, 0xa1, 0x56, 0x74, 0xae, - 0x7d, 0x55, 0xa0, 0xac, 0xe3, 0x29, 0xe3, 0x02, 0xfd, 0xae, 0x6b, 0xa2, 0x8e, 0x67, 0x01, 0x72, - 0x41, 0x76, 0x20, 0x3b, 0xa4, 0x1c, 0x2b, 0xca, 0x86, 0x52, 0x5f, 0xd9, 0x79, 0xd4, 0xb8, 0x46, - 0x1c, 0x73, 0x1c, 0xf2, 0xd3, 0x16, 0xe5, 0xa8, 0x4b, 0x2c, 0x79, 0x09, 0x4b, 0xd4, 0x34, 0x7d, - 0xe4, 0xbc, 0x92, 0x91, 0x69, 0x0f, 0xd3, 0x69, 0xb1, 0x90, 0xbd, 0x08, 0xa3, 0x27, 0xe0, 0xda, - 0x09, 0xac, 0xa5, 0x25, 0x70, 0xcf, 0x75, 0x38, 0x92, 0x16, 0xac, 0x30, 0x87, 0x89, 0x81, 0x47, - 0x7d, 0x6a, 0xf3, 0x58, 0xca, 0xe3, 0x5b, 0xa4, 0x68, 0x0e, 0x13, 0x3d, 0x09, 0xd4, 0x81, 0x8d, - 0x7f, 0xd7, 0x1a, 0x40, 0xb4, 0xd0, 0x86, 0xbe, 0xa0, 0x02, 0x79, 0x52, 0x5d, 0x05, 0x96, 0xa4, - 0x39, 0x5a, 0x5b, 0xb2, 0xaa, 0x7a, 0x72, 0xac, 0x7d, 0x57, 0xa0, 0x9c, 0x4a, 0x88, 0xb5, 0xec, - 0x42, 0x8e, 0x0b, 0x2a, 0x82, 0x44, 0xc6, 0x83, 0x99, 0xa5, 0xf5, 0x25, 0x44, 0x8f, 0xa1, 0xe4, - 0x05, 0x2c, 0x86, 0xbf, 0x50, 0xda, 0x51, 0xbc, 0xe9, 0xa2, 0x89, 0x17, 0x8d, 0xc9, 0x63, 0x7a, - 0x04, 0x9e, 0x16, 0xa7, 0xa6, 0xc5, 0xfd, 0x52, 0x60, 0xb5, 0x15, 0x30, 0xcb, 0x94, 0x49, 0x49, - 0x31, 0xeb, 0x00, 0x26, 0x15, 0x74, 0xe0, 0x51, 0x31, 0x0a, 0x9d, 0x57, 0xeb, 0x79, 0x3d, 0x1f, - 0x46, 0x7a, 0x61, 0x20, 0x74, 0x51, 0x5c, 0x7a, 0x98, 0xb8, 0xa8, 0x6e, 0xa8, 0x37, 0x5d, 0x8c, - 0xe5, 0xbf, 0xc3, 0xcb, 0x8f, 0xd4, 0x0a, 0xb0, 0x47, 0x99, 0xaf, 0x43, 0x98, 0x15, 0xb9, 0x48, - 0xda, 0x50, 0x88, 0xc6, 0x2d, 0x26, 0xc9, 0xde, 0x95, 0x64, 0x45, 0xa6, 0xc5, 0xbd, 0x30, 0x80, - 0x4c, 0xab, 0xff, 0x17, 0x67, 0xa7, 0x3c, 0xca, 0xa4, 0x3d, 0x7a, 0x0e, 0x6b, 0x92, 0x7f, 0x9f, - 0x59, 0x18, 0x1a, 0x30, 0xbf, 0xe5, 0xdf, 0x14, 0xb8, 0x9f, 0x4a, 0xe1, 0xff, 0x49, 0x1b, 0xa9, - 0x43, 0x29, 0xb2, 0xf1, 0x33, 0xb3, 0x30, 0xee, 0x97, 0x2a, 0xfb, 0x55, 0x64, 0x29, 0x01, 0xb5, - 0x9f, 0x19, 0xc8, 0x4b, 0x4d, 0x87, 0x28, 0xe8, 0x64, 0x8e, 0x94, 0xbf, 0x9c, 0xa3, 0x6b, 0x3a, - 0xd6, 0x01, 0xd0, 0x39, 0x0b, 0x70, 0x20, 0x98, 0x8d, 0xf1, 0x90, 0xe5, 0x65, 0xe4, 0x03, 0xb3, - 0x91, 0x3c, 0x81, 0x7b, 0xdc, 0x18, 0xa1, 0x19, 0x58, 0x31, 0x22, 0x2b, 0x11, 0x85, 0x24, 0x28, - 0x41, 0x0d, 0x28, 0x0f, 0xc3, 0x66, 0x0e, 0x0c, 0xd7, 0xf6, 0x2c, 0x14, 0x31, 0x74, 0x51, 0x42, - 0x57, 0xe5, 0xd5, 0x9b, 0xf8, 0x46, 0xe2, 0x5f, 0x81, 0xea, 0xe3, 0x59, 0x25, 0x27, 0x7d, 0x7c, - 0x3a, 0xab, 0x82, 0x1b, 0x93, 0xad, 0x87, 0x19, 0x33, 0x4d, 0x5b, 0x9a, 0x65, 0xda, 0x76, 0x17, - 0x60, 0xe2, 0x02, 0x59, 0x86, 0x6c, 0xf7, 0xa8, 0xdb, 0x29, 0x2d, 0x90, 0x02, 0x2c, 0x1f, 0x77, - 0xb5, 0x7e, 0xff, 0xb8, 0xd3, 0x2e, 0x29, 0xa4, 0x08, 0xa0, 0x75, 0x7b, 0xfa, 0xd1, 0x81, 0xde, - 0xe9, 0xf7, 0x4b, 0x99, 0xf0, 0x76, 0x5f, 0xeb, 0x6a, 0xfd, 0xb7, 0x9d, 0x76, 0x49, 0x25, 0x00, - 0xb9, 0xfd, 0x3d, 0xed, 0x7d, 0xa7, 0x5d, 0xca, 0xee, 0xfc, 0x50, 0xa1, 0x10, 0x11, 0x46, 0x4b, - 0x96, 0x18, 0x50, 0x98, 0x5e, 0x54, 0x64, 0x6b, 0x56, 0x19, 0x33, 0xb6, 0x69, 0xb5, 0x3e, 0x1f, - 0x18, 0x4d, 0x5c, 0x6d, 0x81, 0x7c, 0x02, 0x98, 0x38, 0x41, 0xee, 0xe6, 0x54, 0x75, 0x73, 0x1e, - 0x6c, 0x4c, 0x6f, 0x40, 0xf1, 0x00, 0xc5, 0xd4, 0x8a, 0x23, 0x9b, 0x7f, 0x1e, 0xa7, 0x64, 0x69, - 0x56, 0xb7, 0xe6, 0xe2, 0xc6, 0x8f, 0x7c, 0x81, 0xd5, 0xe4, 0x91, 0x71, 0x7b, 0x48, 0xfd, 0xd6, - 0xfc, 0x6b, 0xdf, 0x6a, 0x75, 0x7b, 0x2e, 0x72, 0xea, 0xb1, 0xd6, 0xde, 0xc9, 0xeb, 0x53, 0x26, - 0x46, 0xc1, 0x30, 0xfc, 0x1e, 0x9b, 0x57, 0xcc, 0xb2, 0xd8, 0x95, 0x40, 0x63, 0xd4, 0x8c, 0x48, - 0x9e, 0x99, 0x8c, 0x0b, 0x9f, 0x0d, 0x03, 0x81, 0x66, 0x33, 0xf9, 0xbb, 0x68, 0x4a, 0xe6, 0xa6, - 0x64, 0xf6, 0x86, 0xc3, 0x9c, 0x3c, 0xee, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x65, 0x3f, 0xa4, - 0x88, 0x55, 0x07, 0x00, 0x00, + // 783 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xdd, 0x4e, 0xeb, 0x46, + 0x10, 0xc6, 0x71, 0x4e, 0x38, 0x19, 0x72, 0xa2, 0xb0, 0x39, 0xaa, 0x50, 0x5a, 0x2a, 0x70, 0x55, + 0x88, 0x90, 0x9a, 0x54, 0xa1, 0x6a, 0x2f, 0x2b, 0x42, 0x02, 0xb5, 0x5a, 0x4c, 0xe4, 0x94, 0x5e, + 0x50, 0x55, 0x91, 0x63, 0x0f, 0x64, 0x55, 0xff, 0x04, 0xef, 0x1a, 0x01, 0x77, 0x55, 0x9f, 0xa0, + 0xb7, 0x7d, 0x8c, 0x5e, 0xf7, 0xe1, 0x2a, 0xaf, 0xd7, 0x89, 0x0d, 0x21, 0x81, 0xb6, 0xe7, 0xce, + 0x9e, 0xfd, 0x66, 0xe6, 0x9b, 0x6f, 0x66, 0x76, 0xa1, 0x4e, 0x7d, 0x07, 0xef, 0x46, 0x0c, 0xc3, + 0x5b, 0x6a, 0x63, 0x6b, 0x1a, 0x06, 0x3c, 0x20, 0xc4, 0xa3, 0xee, 0x6d, 0xc4, 0x92, 0xbf, 0x96, + 0x40, 0x34, 0x2a, 0x76, 0xe0, 0x79, 0x81, 0x9f, 0xd8, 0x1a, 0x55, 0xea, 0x73, 0x0c, 0x7d, 0xcb, + 0x4d, 0xfe, 0xb5, 0xdf, 0x14, 0xa8, 0x9b, 0x78, 0x4d, 0x19, 0xc7, 0xd0, 0x08, 0x1c, 0x34, 0xf1, + 0x26, 0x42, 0xc6, 0x49, 0x07, 0x8a, 0x63, 0x8b, 0xe1, 0x96, 0xb2, 0xa3, 0x34, 0x37, 0x3a, 0x9f, + 0xb6, 0x1e, 0x05, 0x96, 0x31, 0xce, 0xd8, 0x75, 0xd7, 0x62, 0x68, 0x0a, 0x2c, 0xf9, 0x1a, 0xd6, + 0x2d, 0xc7, 0x09, 0x91, 0xb1, 0xad, 0x82, 0x70, 0xfb, 0x24, 0xef, 0x26, 0x89, 0x1c, 0x25, 0x18, + 0x33, 0x05, 0x6b, 0x97, 0xf0, 0x3e, 0x4f, 0x81, 0x4d, 0x03, 0x9f, 0x21, 0xe9, 0xc2, 0x06, 0xf5, + 0x29, 0x1f, 0x4d, 0xad, 0xd0, 0xf2, 0x98, 0xa4, 0xb2, 0xfb, 0x0c, 0x15, 0xdd, 0xa7, 0x7c, 0x20, + 0x80, 0x26, 0xd0, 0xd9, 0xb7, 0xd6, 0x02, 0xa2, 0xc7, 0x32, 0x0c, 0xb9, 0xc5, 0x91, 0xa5, 0xd5, + 0x6d, 0xc1, 0xba, 0x10, 0x47, 0xef, 0x89, 0xa8, 0xaa, 0x99, 0xfe, 0x6a, 0x7f, 0x2a, 0x50, 0xcf, + 0x39, 0x48, 0x2e, 0x87, 0x50, 0x62, 0xdc, 0xe2, 0x51, 0x4a, 0xe3, 0xe3, 0x85, 0xa5, 0x0d, 0x05, + 0xc4, 0x94, 0x50, 0xf2, 0x15, 0xbc, 0x89, 0xbf, 0x50, 0xc8, 0x51, 0x7d, 0xaa, 0xa2, 0x83, 0x77, + 0xad, 0x79, 0x32, 0x33, 0x01, 0x67, 0xc9, 0xa9, 0x79, 0x72, 0x7f, 0x2b, 0xb0, 0xd9, 0x8d, 0xa8, + 0xeb, 0x08, 0xa7, 0xb4, 0x98, 0x6d, 0x00, 0xc7, 0xe2, 0xd6, 0x68, 0x6a, 0xf1, 0x49, 0xac, 0xbc, + 0xda, 0x2c, 0x9b, 0xe5, 0xd8, 0x32, 0x88, 0x0d, 0xb1, 0x8a, 0xfc, 0x7e, 0x8a, 0xa9, 0x8a, 0xea, + 0x8e, 0xfa, 0x54, 0x45, 0x49, 0xff, 0x7b, 0xbc, 0xff, 0xc9, 0x72, 0x23, 0x1c, 0x58, 0x34, 0x34, + 0x21, 0xf6, 0x4a, 0x54, 0x24, 0x3d, 0xa8, 0x24, 0xe3, 0x26, 0x83, 0x14, 0x5f, 0x1a, 0x64, 0x43, + 0xb8, 0xc9, 0x5e, 0xd8, 0x40, 0xb2, 0xec, 0xff, 0x8b, 0xb2, 0x19, 0x8d, 0x0a, 0x79, 0x8d, 0xc6, + 0xf0, 0x6e, 0x9e, 0xe4, 0xd8, 0x73, 0x9e, 0xef, 0x35, 0xf9, 0x06, 0xd4, 0x10, 0x6f, 0xe4, 0xac, + 0x7e, 0xbe, 0xa8, 0x39, 0x4f, 0xc4, 0x36, 0x63, 0x0f, 0xed, 0x0f, 0x05, 0x3e, 0x9a, 0x1f, 0x19, + 0x01, 0xa7, 0x57, 0xd4, 0xb6, 0x38, 0x0d, 0xfc, 0xff, 0xb9, 0x1a, 0xd2, 0x84, 0x5a, 0x22, 0xfc, + 0x15, 0x75, 0x51, 0x76, 0x58, 0x15, 0x1d, 0xae, 0x0a, 0xfb, 0x09, 0x75, 0x51, 0xb4, 0x59, 0xfb, + 0x12, 0xde, 0xeb, 0x59, 0xcb, 0xea, 0x51, 0x8f, 0xab, 0xc8, 0xb9, 0xb0, 0x0f, 0xd4, 0x93, 0x57, + 0x54, 0xf1, 0x57, 0x01, 0xca, 0x82, 0xd3, 0x19, 0x72, 0x6b, 0xbe, 0x3f, 0xca, 0xbf, 0xdc, 0x9f, + 0x47, 0x3c, 0xb6, 0x01, 0xd0, 0xbf, 0x89, 0x70, 0xc4, 0xa9, 0x87, 0x72, 0xb9, 0xca, 0xc2, 0xf2, + 0x23, 0xf5, 0x90, 0x7c, 0x06, 0xef, 0x98, 0x3d, 0x41, 0x27, 0x72, 0x25, 0xa2, 0x28, 0x10, 0x95, + 0xd4, 0x28, 0x40, 0x2d, 0xa8, 0x8f, 0xe3, 0xd6, 0x8f, 0xec, 0xc0, 0x9b, 0xba, 0xc8, 0x25, 0xf4, + 0x8d, 0x80, 0x6e, 0x8a, 0xa3, 0x63, 0x79, 0x22, 0xf0, 0x72, 0xc8, 0x4a, 0xaf, 0x1d, 0xb2, 0x85, + 0xa2, 0xad, 0x2f, 0x12, 0xed, 0xc0, 0x00, 0x98, 0xab, 0x40, 0xde, 0x42, 0xd1, 0x38, 0x37, 0xfa, + 0xb5, 0x35, 0x52, 0x81, 0xb7, 0x17, 0x86, 0x3e, 0x1c, 0x5e, 0xf4, 0x7b, 0x35, 0x85, 0x54, 0x01, + 0x74, 0x63, 0x60, 0x9e, 0x9f, 0x9a, 0xfd, 0xe1, 0xb0, 0x56, 0x88, 0x4f, 0x4f, 0x74, 0x43, 0x1f, + 0x7e, 0xd7, 0xef, 0xd5, 0x54, 0x02, 0x50, 0x3a, 0x39, 0xd2, 0x7f, 0xe8, 0xf7, 0x6a, 0xc5, 0xce, + 0xef, 0x45, 0xa8, 0x24, 0x01, 0x93, 0xc7, 0x85, 0xd8, 0x50, 0xc9, 0x5e, 0xd0, 0x64, 0x7f, 0x51, + 0x19, 0x0b, 0x5e, 0x91, 0x46, 0x73, 0x35, 0x30, 0x99, 0x38, 0x6d, 0x8d, 0xfc, 0x02, 0x30, 0x57, + 0x82, 0xbc, 0x4c, 0xa9, 0xc6, 0xde, 0x2a, 0xd8, 0x2c, 0xbc, 0x0d, 0xd5, 0x53, 0xe4, 0x99, 0xab, + 0x9d, 0xec, 0x2d, 0x1f, 0xa7, 0xf4, 0xb1, 0x68, 0xec, 0xaf, 0xc4, 0xcd, 0x92, 0xfc, 0x0a, 0x9b, + 0x69, 0x92, 0x59, 0x7b, 0x48, 0xf3, 0x59, 0xff, 0x47, 0xbb, 0xda, 0x38, 0x58, 0x89, 0x64, 0x39, + 0xc1, 0x6a, 0xe2, 0xea, 0xb9, 0xcf, 0xc8, 0x76, 0xb0, 0x5c, 0x8f, 0xec, 0x55, 0xd5, 0x58, 0xb6, + 0xd4, 0xda, 0x5a, 0xe7, 0x67, 0xb9, 0x89, 0xa2, 0xe3, 0x46, 0xae, 0x39, 0xbb, 0xcb, 0xb3, 0x1c, + 0x7b, 0xce, 0x8a, 0xe0, 0xdd, 0xa3, 0xcb, 0x6f, 0xaf, 0x29, 0x9f, 0x44, 0xe3, 0xf8, 0xa4, 0xfd, + 0x40, 0x5d, 0x97, 0x3e, 0x70, 0xb4, 0x27, 0xed, 0xc4, 0xeb, 0x0b, 0x87, 0x32, 0x1e, 0xd2, 0x71, + 0xc4, 0xd1, 0x69, 0xa7, 0x4f, 0x7c, 0x5b, 0x84, 0x6a, 0x8b, 0x6c, 0xd3, 0xf1, 0xb8, 0x24, 0x7e, + 0x0f, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0x12, 0x91, 0xac, 0xbb, 0x09, 0x09, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -609,6 +717,7 @@ type IndexServiceClient interface { BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) GetIndexStates(ctx context.Context, in *IndexStatesRequest, opts ...grpc.CallOption) (*IndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, in *IndexFilePathRequest, opts ...grpc.CallOption) (*IndexFilePathsResponse, error) + NotifyBuildIndex(ctx context.Context, in *BuildIndexNotification, opts ...grpc.CallOption) (*commonpb.Status, error) } type indexServiceClient struct { @@ -655,6 +764,15 @@ func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *IndexFil return out, nil } +func (c *indexServiceClient) NotifyBuildIndex(ctx context.Context, in *BuildIndexNotification, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexService/NotifyBuildIndex", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IndexServiceServer is the server API for IndexService service. type IndexServiceServer interface { //* @@ -667,6 +785,7 @@ type IndexServiceServer interface { BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error) GetIndexStates(context.Context, *IndexStatesRequest) (*IndexStatesResponse, error) GetIndexFilePaths(context.Context, *IndexFilePathRequest) (*IndexFilePathsResponse, error) + NotifyBuildIndex(context.Context, *BuildIndexNotification) (*commonpb.Status, error) } // UnimplementedIndexServiceServer can be embedded to have forward compatible implementations. @@ -685,6 +804,9 @@ func (*UnimplementedIndexServiceServer) GetIndexStates(ctx context.Context, req func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *IndexFilePathRequest) (*IndexFilePathsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetIndexFilePaths not implemented") } +func (*UnimplementedIndexServiceServer) NotifyBuildIndex(ctx context.Context, req *BuildIndexNotification) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method NotifyBuildIndex not implemented") +} func RegisterIndexServiceServer(s *grpc.Server, srv IndexServiceServer) { s.RegisterService(&_IndexService_serviceDesc, srv) @@ -762,6 +884,24 @@ func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _IndexService_NotifyBuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BuildIndexNotification) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexServiceServer).NotifyBuildIndex(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.index.IndexService/NotifyBuildIndex", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexServiceServer).NotifyBuildIndex(ctx, req.(*BuildIndexNotification)) + } + return interceptor(ctx, in, info, handler) +} + var _IndexService_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.index.IndexService", HandlerType: (*IndexServiceServer)(nil), @@ -782,6 +922,94 @@ var _IndexService_serviceDesc = grpc.ServiceDesc{ MethodName: "GetIndexFilePaths", Handler: _IndexService_GetIndexFilePaths_Handler, }, + { + MethodName: "NotifyBuildIndex", + Handler: _IndexService_NotifyBuildIndex_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "index_service.proto", +} + +// IndexNodeClient is the client API for IndexNode service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type IndexNodeClient interface { + //* + // @brief This method is used to create collection + // + // @param CollectionSchema, use to provide collection information to be created. + // + // @return Status + BuildIndex(ctx context.Context, in *BuildIndexCmd, opts ...grpc.CallOption) (*commonpb.Status, error) +} + +type indexNodeClient struct { + cc *grpc.ClientConn +} + +func NewIndexNodeClient(cc *grpc.ClientConn) IndexNodeClient { + return &indexNodeClient{cc} +} + +func (c *indexNodeClient) BuildIndex(ctx context.Context, in *BuildIndexCmd, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexNode/BuildIndex", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// IndexNodeServer is the server API for IndexNode service. +type IndexNodeServer interface { + //* + // @brief This method is used to create collection + // + // @param CollectionSchema, use to provide collection information to be created. + // + // @return Status + BuildIndex(context.Context, *BuildIndexCmd) (*commonpb.Status, error) +} + +// UnimplementedIndexNodeServer can be embedded to have forward compatible implementations. +type UnimplementedIndexNodeServer struct { +} + +func (*UnimplementedIndexNodeServer) BuildIndex(ctx context.Context, req *BuildIndexCmd) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method BuildIndex not implemented") +} + +func RegisterIndexNodeServer(s *grpc.Server, srv IndexNodeServer) { + s.RegisterService(&_IndexNode_serviceDesc, srv) +} + +func _IndexNode_BuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BuildIndexCmd) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexNodeServer).BuildIndex(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.index.IndexNode/BuildIndex", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexNodeServer).BuildIndex(ctx, req.(*BuildIndexCmd)) + } + return interceptor(ctx, in, info, handler) +} + +var _IndexNode_serviceDesc = grpc.ServiceDesc{ + ServiceName: "milvus.proto.index.IndexNode", + HandlerType: (*IndexNodeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "BuildIndex", + Handler: _IndexNode_BuildIndex_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "index_service.proto", diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index 422e307308..16b92f2cc4 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -4,7 +4,7 @@ option go_package = "github.com/zilliztech/milvus-distributed/internal/proto/int import "common.proto"; -enum MsgType { +enum MsgType2 { kNone = 0; /* Definition Requests: collection */ kCreateCollection = 100; @@ -78,7 +78,7 @@ message StringList { } message MsgBase { - MsgType msg_type = 1; + MsgType2 msg_type = 1; int64 msgID = 2; uint64 timestamp = 3; int64 sourceID = 4; diff --git a/internal/proto/internalpb2/internal.pb.go b/internal/proto/internalpb2/internal.pb.go index f0f9d4fd95..cb995da89f 100644 --- a/internal/proto/internalpb2/internal.pb.go +++ b/internal/proto/internalpb2/internal.pb.go @@ -21,41 +21,41 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package -type MsgType int32 +type MsgType2 int32 const ( - MsgType_kNone MsgType = 0 + MsgType2_kNone MsgType2 = 0 // Definition Requests: collection - MsgType_kCreateCollection MsgType = 100 - MsgType_kDropCollection MsgType = 101 - MsgType_kHasCollection MsgType = 102 - MsgType_kDescribeCollection MsgType = 103 - MsgType_kShowCollections MsgType = 104 - MsgType_kGetSysConfigs MsgType = 105 + MsgType2_kCreateCollection MsgType2 = 100 + MsgType2_kDropCollection MsgType2 = 101 + MsgType2_kHasCollection MsgType2 = 102 + MsgType2_kDescribeCollection MsgType2 = 103 + MsgType2_kShowCollections MsgType2 = 104 + MsgType2_kGetSysConfigs MsgType2 = 105 // Definition Requests: partition - MsgType_kCreatePartition MsgType = 200 - MsgType_kDropPartition MsgType = 201 - MsgType_kHasPartition MsgType = 202 - MsgType_kDescribePartition MsgType = 203 - MsgType_kShowPartitions MsgType = 204 + MsgType2_kCreatePartition MsgType2 = 200 + MsgType2_kDropPartition MsgType2 = 201 + MsgType2_kHasPartition MsgType2 = 202 + MsgType2_kDescribePartition MsgType2 = 203 + MsgType2_kShowPartitions MsgType2 = 204 // Definition Requests: Index - MsgType_kCreateIndex MsgType = 300 - MsgType_kDescribeIndex MsgType = 301 - MsgType_kDescribeIndexProgress MsgType = 302 + MsgType2_kCreateIndex MsgType2 = 300 + MsgType2_kDescribeIndex MsgType2 = 301 + MsgType2_kDescribeIndexProgress MsgType2 = 302 // Manipulation Requests - MsgType_kInsert MsgType = 400 - MsgType_kDelete MsgType = 401 - MsgType_kFlush MsgType = 402 + MsgType2_kInsert MsgType2 = 400 + MsgType2_kDelete MsgType2 = 401 + MsgType2_kFlush MsgType2 = 402 // Query - MsgType_kSearch MsgType = 500 - MsgType_kSearchResult MsgType = 501 + MsgType2_kSearch MsgType2 = 500 + MsgType2_kSearchResult MsgType2 = 501 // System Control - MsgType_kTimeTick MsgType = 1200 - MsgType_kQueryNodeStats MsgType = 1201 - MsgType_kLoadIndex MsgType = 1202 + MsgType2_kTimeTick MsgType2 = 1200 + MsgType2_kQueryNodeStats MsgType2 = 1201 + MsgType2_kLoadIndex MsgType2 = 1202 ) -var MsgType_name = map[int32]string{ +var MsgType2_name = map[int32]string{ 0: "kNone", 100: "kCreateCollection", 101: "kDropCollection", @@ -81,7 +81,7 @@ var MsgType_name = map[int32]string{ 1202: "kLoadIndex", } -var MsgType_value = map[string]int32{ +var MsgType2_value = map[string]int32{ "kNone": 0, "kCreateCollection": 100, "kDropCollection": 101, @@ -107,11 +107,11 @@ var MsgType_value = map[string]int32{ "kLoadIndex": 1202, } -func (x MsgType) String() string { - return proto.EnumName(MsgType_name, int32(x)) +func (x MsgType2) String() string { + return proto.EnumName(MsgType2_name, int32(x)) } -func (MsgType) EnumDescriptor() ([]byte, []int) { +func (MsgType2) EnumDescriptor() ([]byte, []int) { return fileDescriptor_41f4a519b878ee3b, []int{0} } @@ -395,7 +395,7 @@ func (m *StringList) GetValues() []string { } type MsgBase struct { - MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"` + MsgType MsgType2 `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType2" json:"msg_type,omitempty"` MsgID int64 `protobuf:"varint,2,opt,name=msgID,proto3" json:"msgID,omitempty"` Timestamp uint64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` SourceID int64 `protobuf:"varint,4,opt,name=sourceID,proto3" json:"sourceID,omitempty"` @@ -429,11 +429,11 @@ func (m *MsgBase) XXX_DiscardUnknown() { var xxx_messageInfo_MsgBase proto.InternalMessageInfo -func (m *MsgBase) GetMsgType() MsgType { +func (m *MsgBase) GetMsgType() MsgType2 { if m != nil { return m.MsgType } - return MsgType_kNone + return MsgType2_kNone } func (m *MsgBase) GetMsgID() int64 { @@ -497,7 +497,7 @@ func (m *TimeTickMsg) GetBase() *MsgBase { } func init() { - proto.RegisterEnum("milvus.proto.internal.MsgType", MsgType_name, MsgType_value) + proto.RegisterEnum("milvus.proto.internal.MsgType2", MsgType2_name, MsgType2_value) proto.RegisterEnum("milvus.proto.internal.StateCode", StateCode_name, StateCode_value) proto.RegisterType((*NodeStates)(nil), "milvus.proto.internal.NodeStates") proto.RegisterType((*ServiceStates)(nil), "milvus.proto.internal.ServiceStates") @@ -511,55 +511,55 @@ func init() { func init() { proto.RegisterFile("internal.proto", fileDescriptor_41f4a519b878ee3b) } var fileDescriptor_41f4a519b878ee3b = []byte{ - // 788 bytes of a gzipped FileDescriptorProto + // 791 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0x23, 0x45, - 0x10, 0xde, 0xf1, 0x78, 0xe3, 0x4c, 0xd9, 0xeb, 0xf4, 0x56, 0x92, 0x5d, 0x6b, 0x59, 0x21, 0x63, - 0x71, 0x88, 0x56, 0x22, 0x91, 0x8c, 0x84, 0xe0, 0x04, 0x4e, 0xbc, 0x6c, 0x46, 0x38, 0x26, 0x8c, - 0xad, 0x95, 0xd8, 0x8b, 0x35, 0x9e, 0xa9, 0x8c, 0x9b, 0xf9, 0x69, 0xab, 0xbb, 0x1d, 0xd6, 0xfb, - 0x14, 0x80, 0x78, 0x0c, 0x40, 0xc0, 0x95, 0x17, 0xe0, 0xf7, 0xca, 0x93, 0xc0, 0x1d, 0x4d, 0x8f, - 0x7f, 0x62, 0xc9, 0x41, 0x82, 0x5b, 0xd7, 0xd7, 0x55, 0x5f, 0x7f, 0x5f, 0x55, 0xcd, 0x40, 0x9d, - 0x67, 0x9a, 0x64, 0xe6, 0x27, 0xc7, 0x53, 0x29, 0xb4, 0xc0, 0xc3, 0x94, 0x27, 0xd7, 0x33, 0x55, - 0x44, 0xc7, 0xcb, 0xcb, 0x47, 0xb5, 0x40, 0xa4, 0xa9, 0xc8, 0x0a, 0xb8, 0xf5, 0x93, 0x05, 0xd0, - 0x17, 0x21, 0x0d, 0xb4, 0xaf, 0x49, 0xe1, 0x03, 0xd8, 0xc9, 0x44, 0x48, 0x6e, 0xb7, 0x61, 0x35, - 0xad, 0x23, 0xdb, 0x5b, 0x44, 0x88, 0x50, 0x96, 0x22, 0xa1, 0x46, 0xa9, 0x69, 0x1d, 0x39, 0x9e, - 0x39, 0xe3, 0xfb, 0x00, 0x2a, 0xaf, 0x1a, 0x05, 0x22, 0xa4, 0x86, 0xdd, 0xb4, 0x8e, 0xea, 0xed, - 0xe6, 0xf1, 0xd6, 0x47, 0x8f, 0x0d, 0xfd, 0x99, 0x08, 0xc9, 0x73, 0xd4, 0xf2, 0x88, 0x1f, 0x00, - 0xd0, 0x4b, 0x2d, 0xfd, 0x11, 0xcf, 0xae, 0x44, 0xa3, 0xdc, 0xb4, 0x8f, 0xaa, 0xed, 0x37, 0x36, - 0x09, 0x16, 0x5a, 0x3f, 0xa2, 0xf9, 0x73, 0x3f, 0x99, 0xd1, 0xa5, 0xcf, 0xa5, 0xe7, 0x98, 0x22, - 0x37, 0xbb, 0x12, 0xad, 0x3f, 0x2d, 0xb8, 0x37, 0x20, 0x79, 0xcd, 0x83, 0xa5, 0x81, 0x4d, 0x51, - 0xd6, 0x7f, 0x17, 0x75, 0x0a, 0xd5, 0xdc, 0xf3, 0xc8, 0x20, 0xaa, 0x51, 0xda, 0xa6, 0x6a, 0xc5, - 0xb0, 0xee, 0x9c, 0x07, 0xd9, 0xba, 0x8b, 0x9b, 0xc6, 0xec, 0xff, 0x61, 0xec, 0x39, 0xec, 0xe6, - 0xdc, 0xf9, 0x19, 0xdf, 0x81, 0x8a, 0x1f, 0x86, 0x92, 0x94, 0x32, 0x7e, 0xaa, 0xed, 0xc7, 0x5b, - 0xa9, 0x3a, 0x45, 0x8e, 0xb7, 0x4c, 0xde, 0x36, 0xb3, 0xd6, 0x67, 0x00, 0x6e, 0xc6, 0xf5, 0xa5, - 0x2f, 0xfd, 0xf4, 0xf6, 0x69, 0x77, 0xa1, 0xa6, 0xb4, 0x2f, 0xf5, 0x68, 0x6a, 0xf2, 0xb6, 0x37, - 0x61, 0x9b, 0x83, 0xaa, 0x29, 0x2b, 0xd8, 0x5b, 0x6f, 0x02, 0x0c, 0xb4, 0xe4, 0x59, 0xd4, 0xe3, - 0x4a, 0xe7, 0x6f, 0x5d, 0xe7, 0x79, 0x05, 0x9b, 0xe3, 0x2d, 0xa2, 0xd6, 0xd7, 0x16, 0x54, 0x2e, - 0x54, 0x74, 0xea, 0x2b, 0xc2, 0xf7, 0x60, 0x37, 0x55, 0xd1, 0x48, 0xcf, 0xa7, 0xcb, 0xd1, 0xbd, - 0x7e, 0x4b, 0xe3, 0x2f, 0x54, 0x34, 0x9c, 0x4f, 0xc9, 0xab, 0xa4, 0xc5, 0x01, 0x0f, 0xe0, 0x6e, - 0xaa, 0x22, 0xb7, 0x6b, 0xdc, 0xda, 0x5e, 0x11, 0xe0, 0x63, 0x70, 0x34, 0x4f, 0x49, 0x69, 0x3f, - 0x9d, 0x9a, 0x0d, 0x2d, 0x7b, 0x6b, 0x00, 0x1f, 0xc1, 0xae, 0x12, 0x33, 0x19, 0xe4, 0x0d, 0x28, - 0x9b, 0xb2, 0x55, 0xdc, 0xea, 0x40, 0x75, 0xc8, 0x53, 0x1a, 0xf2, 0x20, 0xbe, 0x50, 0x11, 0xb6, - 0xa1, 0x3c, 0xf6, 0x15, 0x2d, 0x06, 0xf0, 0x2f, 0xaa, 0x72, 0x1f, 0x9e, 0xc9, 0x7d, 0xf2, 0x87, - 0x6d, 0x9c, 0x19, 0x79, 0x0e, 0xdc, 0x8d, 0xfb, 0x22, 0x23, 0x76, 0x07, 0x0f, 0xe1, 0x7e, 0x7c, - 0x26, 0xc9, 0xec, 0x5b, 0x92, 0x50, 0xa0, 0xb9, 0xc8, 0x58, 0x88, 0xfb, 0xb0, 0x17, 0x77, 0xa5, - 0x98, 0xde, 0x00, 0x09, 0x11, 0xea, 0xf1, 0xb9, 0xaf, 0x6e, 0x60, 0x57, 0xf8, 0x10, 0xf6, 0xe3, - 0x2e, 0xa9, 0x40, 0xf2, 0xf1, 0x4d, 0x86, 0x08, 0x0f, 0x80, 0xc5, 0x83, 0x89, 0xf8, 0x7c, 0x0d, - 0x2a, 0x36, 0x31, 0x14, 0xcf, 0x48, 0x0f, 0xe6, 0xea, 0x4c, 0x64, 0x57, 0x3c, 0x52, 0x8c, 0xe3, - 0x21, 0xb0, 0x85, 0x84, 0x4b, 0x5f, 0x6a, 0x6e, 0xea, 0x7f, 0xb6, 0x70, 0x1f, 0xea, 0x46, 0xc2, - 0x1a, 0xfc, 0xc5, 0x42, 0x84, 0x7b, 0xb9, 0x84, 0x35, 0xf6, 0xab, 0x85, 0x0f, 0x01, 0x57, 0x12, - 0xd6, 0x17, 0xbf, 0x59, 0x78, 0x00, 0x7b, 0x46, 0xc2, 0x0a, 0x54, 0xec, 0x77, 0x0b, 0xef, 0x43, - 0x6d, 0xf1, 0x9c, 0x9b, 0x85, 0xf4, 0x92, 0x7d, 0x53, 0x2a, 0x9e, 0x5a, 0x30, 0x14, 0xe0, 0xb7, - 0x25, 0x7c, 0x0d, 0x1e, 0x6c, 0x82, 0x97, 0x52, 0x44, 0xf9, 0x2a, 0xb3, 0xef, 0x4a, 0x58, 0x83, - 0x4a, 0xec, 0x66, 0x8a, 0xa4, 0x66, 0x5f, 0xd8, 0x26, 0xea, 0x52, 0x42, 0x9a, 0xd8, 0x97, 0x36, - 0x56, 0x61, 0x27, 0xfe, 0x30, 0x99, 0xa9, 0x09, 0xfb, 0xaa, 0xb8, 0x1a, 0x90, 0x2f, 0x83, 0x09, - 0xfb, 0xcb, 0x36, 0xf2, 0x8b, 0xc8, 0x23, 0x35, 0x4b, 0x34, 0xfb, 0xdb, 0xc6, 0x3a, 0x38, 0xf1, - 0x72, 0xb8, 0xec, 0x7b, 0xc7, 0xa8, 0xfe, 0x64, 0x46, 0x72, 0xbe, 0xfc, 0x9c, 0x15, 0xfb, 0xc1, - 0xc1, 0x3d, 0x80, 0xb8, 0x27, 0xfc, 0xb0, 0x90, 0xf7, 0xa3, 0xf3, 0xe4, 0x5d, 0x70, 0x56, 0x7f, - 0x0c, 0x64, 0x50, 0x73, 0xfb, 0xee, 0xd0, 0xed, 0xf4, 0xdc, 0x17, 0x6e, 0xff, 0x19, 0xbb, 0x83, - 0x55, 0xa8, 0x9c, 0x3f, 0xed, 0xf4, 0x86, 0xe7, 0x9f, 0x32, 0x0b, 0x6b, 0xb0, 0xdb, 0x39, 0xed, - 0x7f, 0xec, 0x5d, 0x74, 0x7a, 0xac, 0x74, 0xfa, 0xf4, 0xc5, 0x59, 0xc4, 0xf5, 0x64, 0x36, 0xce, - 0x3f, 0x9a, 0x93, 0x57, 0x3c, 0x49, 0xf8, 0x2b, 0x4d, 0xc1, 0xe4, 0xa4, 0x58, 0xa3, 0xb7, 0x42, - 0xae, 0xb4, 0xe4, 0xe3, 0x99, 0xa6, 0xf0, 0x64, 0xb9, 0x4c, 0x27, 0x66, 0xb7, 0x56, 0xe1, 0x74, - 0xdc, 0x1e, 0xef, 0x18, 0xe8, 0xed, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xa1, 0x75, 0x95, 0xa0, - 0xe9, 0x05, 0x00, 0x00, + 0x10, 0xde, 0xf1, 0x78, 0x63, 0x4f, 0xd9, 0xeb, 0xf4, 0x56, 0x92, 0x5d, 0x6b, 0x59, 0x81, 0xb1, + 0x38, 0x44, 0x2b, 0x91, 0x48, 0x46, 0x42, 0x88, 0x0b, 0x38, 0xf1, 0xb2, 0x19, 0xe1, 0x98, 0x30, + 0xb6, 0x56, 0x62, 0x2f, 0xd6, 0xd8, 0x53, 0x19, 0x37, 0xf3, 0xd3, 0x56, 0x77, 0x3b, 0xac, 0xf7, + 0x29, 0x00, 0x89, 0xb7, 0x00, 0x04, 0x5c, 0x79, 0x01, 0xfe, 0xc4, 0x91, 0x27, 0x81, 0x3b, 0x9a, + 0x1e, 0x8f, 0x9d, 0x48, 0xce, 0x01, 0x6e, 0x5d, 0x5f, 0x57, 0x7d, 0xfd, 0x7d, 0x55, 0x35, 0x03, + 0x0d, 0x9e, 0x6a, 0x92, 0xa9, 0x1f, 0x1f, 0xcd, 0xa5, 0xd0, 0x02, 0x0f, 0x12, 0x1e, 0x5f, 0x2d, + 0x54, 0x1e, 0x1d, 0x15, 0x97, 0x8f, 0xea, 0x53, 0x91, 0x24, 0x22, 0xcd, 0xe1, 0xf6, 0xcf, 0x16, + 0xc0, 0x40, 0x04, 0x34, 0xd4, 0xbe, 0x26, 0x85, 0x0f, 0x60, 0x27, 0x15, 0x01, 0xb9, 0xbd, 0xa6, + 0xd5, 0xb2, 0x0e, 0x6d, 0x6f, 0x15, 0x21, 0x42, 0x59, 0x8a, 0x98, 0x9a, 0xa5, 0x96, 0x75, 0xe8, + 0x78, 0xe6, 0x8c, 0x1f, 0x00, 0xa8, 0xac, 0x6a, 0x3c, 0x15, 0x01, 0x35, 0xed, 0x96, 0x75, 0xd8, + 0xe8, 0xb4, 0x8e, 0xb6, 0x3e, 0x7a, 0x64, 0xe8, 0x4f, 0x45, 0x40, 0x9e, 0xa3, 0x8a, 0x23, 0x7e, + 0x08, 0x40, 0x2f, 0xb5, 0xf4, 0xc7, 0x3c, 0xbd, 0x14, 0xcd, 0x72, 0xcb, 0x3e, 0xac, 0x75, 0xde, + 0xbc, 0x49, 0xb0, 0xd2, 0xfa, 0x31, 0x2d, 0x9f, 0xfb, 0xf1, 0x82, 0x2e, 0x7c, 0x2e, 0x3d, 0xc7, + 0x14, 0xb9, 0xe9, 0xa5, 0x68, 0xff, 0x65, 0xc1, 0xbd, 0x21, 0xc9, 0x2b, 0x3e, 0x2d, 0x0c, 0xdc, + 0x14, 0x65, 0xfd, 0x77, 0x51, 0x27, 0x50, 0xcb, 0x3c, 0x8f, 0x0d, 0xa2, 0x9a, 0xa5, 0x6d, 0xaa, + 0xd6, 0x0c, 0x9b, 0xce, 0x79, 0x90, 0x6e, 0xba, 0x78, 0xd3, 0x98, 0xfd, 0x3f, 0x8c, 0x3d, 0x87, + 0x6a, 0xc6, 0x9d, 0x9d, 0xf1, 0x5d, 0xa8, 0xf8, 0x41, 0x20, 0x49, 0x29, 0xe3, 0xa7, 0xd6, 0x79, + 0xbc, 0x95, 0xaa, 0x9b, 0xe7, 0x78, 0x45, 0xf2, 0xb6, 0x99, 0xb5, 0x3f, 0x07, 0x70, 0x53, 0xae, + 0x2f, 0x7c, 0xe9, 0x27, 0xb7, 0x4f, 0xbb, 0x07, 0x75, 0xa5, 0x7d, 0xa9, 0xc7, 0x73, 0x93, 0xb7, + 0xbd, 0x09, 0xdb, 0x1c, 0xd4, 0x4c, 0x59, 0xce, 0xde, 0x7e, 0x0b, 0x60, 0xa8, 0x25, 0x4f, 0xc3, + 0x3e, 0x57, 0x3a, 0x7b, 0xeb, 0x2a, 0xcb, 0xcb, 0xd9, 0x1c, 0x6f, 0x15, 0xb5, 0xbf, 0xb1, 0xa0, + 0x72, 0xae, 0xc2, 0x13, 0x5f, 0x11, 0xbe, 0x0f, 0xd5, 0x44, 0x85, 0x63, 0xbd, 0x9c, 0x17, 0xa3, + 0x7b, 0xe3, 0x96, 0xc6, 0x9f, 0xab, 0x70, 0xb4, 0x9c, 0x53, 0xc7, 0xab, 0x24, 0xf9, 0x09, 0xf7, + 0xe1, 0x6e, 0xa2, 0x42, 0xb7, 0x67, 0xec, 0xda, 0x5e, 0x1e, 0xe0, 0x63, 0x70, 0x34, 0x4f, 0x48, + 0x69, 0x3f, 0x99, 0x9b, 0x15, 0x2d, 0x7b, 0x1b, 0x00, 0x1f, 0x41, 0x55, 0x89, 0x85, 0x9c, 0x66, + 0x1d, 0x28, 0x9b, 0xb2, 0x75, 0xdc, 0xee, 0x42, 0x6d, 0xc4, 0x13, 0x1a, 0xf1, 0x69, 0x74, 0xae, + 0x42, 0xec, 0x40, 0x79, 0xe2, 0x2b, 0x5a, 0x4d, 0xe0, 0xf5, 0xdb, 0x65, 0x65, 0x46, 0x3c, 0x93, + 0xfb, 0xe4, 0x4f, 0x1b, 0xaa, 0x85, 0x50, 0x74, 0xe0, 0x6e, 0x34, 0x10, 0x29, 0xb1, 0x3b, 0x78, + 0x00, 0xf7, 0xa3, 0x53, 0x49, 0x66, 0xe3, 0xe2, 0x98, 0xa6, 0x9a, 0x8b, 0x94, 0x05, 0xb8, 0x07, + 0xbb, 0x51, 0x4f, 0x8a, 0xf9, 0x35, 0x90, 0x10, 0xa1, 0x11, 0x9d, 0xf9, 0xea, 0x1a, 0x76, 0x89, + 0x0f, 0x61, 0x2f, 0xea, 0x91, 0x9a, 0x4a, 0x3e, 0xb9, 0xce, 0x10, 0xe2, 0x3e, 0xb0, 0x68, 0x38, + 0x13, 0x5f, 0x6c, 0x40, 0xc5, 0x66, 0x86, 0xe2, 0x19, 0xe9, 0xe1, 0x52, 0x9d, 0x8a, 0xf4, 0x92, + 0x87, 0x8a, 0x71, 0x3c, 0x00, 0xb6, 0x92, 0x70, 0xe1, 0x4b, 0xcd, 0x4d, 0xfd, 0x2f, 0x16, 0xee, + 0x41, 0xc3, 0x48, 0xd8, 0x80, 0xbf, 0x5a, 0x88, 0x70, 0x2f, 0x93, 0xb0, 0xc1, 0x7e, 0xb3, 0xf0, + 0x21, 0xe0, 0x5a, 0xc2, 0xe6, 0xe2, 0x77, 0x0b, 0xf7, 0x61, 0xd7, 0x48, 0x58, 0x83, 0x8a, 0xfd, + 0x61, 0xe1, 0x7d, 0xa8, 0xaf, 0x9e, 0x73, 0xd3, 0x80, 0x5e, 0xb2, 0x6f, 0x4b, 0xf9, 0x53, 0x2b, + 0x86, 0x1c, 0xfc, 0xae, 0x84, 0xaf, 0xc1, 0x83, 0x9b, 0xe0, 0x85, 0x14, 0x61, 0xb6, 0xcc, 0xec, + 0xfb, 0x12, 0xd6, 0xa1, 0x12, 0xb9, 0xa9, 0x22, 0xa9, 0xd9, 0x97, 0xb6, 0x89, 0x7a, 0x14, 0x93, + 0x26, 0xf6, 0x95, 0x8d, 0x35, 0xd8, 0x89, 0x3e, 0x8a, 0x17, 0x6a, 0xc6, 0xbe, 0xce, 0xaf, 0x86, + 0xe4, 0xcb, 0xe9, 0x8c, 0xfd, 0x6d, 0x1b, 0xf9, 0x79, 0xe4, 0x91, 0x5a, 0xc4, 0x9a, 0xfd, 0x63, + 0x63, 0x03, 0x9c, 0xa8, 0x98, 0x2e, 0xfb, 0xc1, 0x31, 0xaa, 0x3f, 0x5d, 0x90, 0x5c, 0x16, 0x1f, + 0xb4, 0x62, 0x3f, 0x3a, 0xb8, 0x0b, 0x10, 0xf5, 0x85, 0x1f, 0xe4, 0xf2, 0x7e, 0x72, 0x9e, 0xbc, + 0x07, 0xce, 0xfa, 0x9f, 0x81, 0x0c, 0xea, 0xee, 0xc0, 0x1d, 0xb9, 0xdd, 0xbe, 0xfb, 0xc2, 0x1d, + 0x3c, 0x63, 0x77, 0xb0, 0x06, 0x95, 0xb3, 0xa7, 0xdd, 0xfe, 0xe8, 0xec, 0x33, 0x66, 0x61, 0x1d, + 0xaa, 0xdd, 0x93, 0xc1, 0x27, 0xde, 0x79, 0xb7, 0xcf, 0x4a, 0x27, 0x4f, 0x5f, 0x9c, 0x86, 0x5c, + 0xcf, 0x16, 0x93, 0xec, 0xb3, 0x39, 0x7e, 0xc5, 0xe3, 0x98, 0xbf, 0xd2, 0x34, 0x9d, 0x1d, 0xe7, + 0x7b, 0xf4, 0x76, 0xc0, 0x95, 0x96, 0x7c, 0xb2, 0xd0, 0x14, 0x1c, 0x17, 0xdb, 0x74, 0x6c, 0x96, + 0x6b, 0x1d, 0xce, 0x27, 0x9d, 0xc9, 0x8e, 0x81, 0xde, 0xf9, 0x37, 0x00, 0x00, 0xff, 0xff, 0x38, + 0xa8, 0x72, 0x51, 0xeb, 0x05, 0x00, 0x00, } diff --git a/internal/proxy/condition.go b/internal/proxynode/condition.go similarity index 97% rename from internal/proxy/condition.go rename to internal/proxynode/condition.go index 2a384c427d..2ebf078b91 100644 --- a/internal/proxy/condition.go +++ b/internal/proxynode/condition.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" diff --git a/internal/proxy/grpc_service.go b/internal/proxynode/grpc_service.go similarity index 99% rename from internal/proxy/grpc_service.go rename to internal/proxynode/grpc_service.go index 29bb0eeecd..d1d69c4778 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxynode/grpc_service.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" diff --git a/internal/proxy/meta_cache.go b/internal/proxynode/meta_cache.go similarity index 99% rename from internal/proxy/meta_cache.go rename to internal/proxynode/meta_cache.go index edb7344c17..a06367b554 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxynode/meta_cache.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" diff --git a/internal/proxy/paramtable.go b/internal/proxynode/paramtable.go similarity index 89% rename from internal/proxy/paramtable.go rename to internal/proxynode/paramtable.go index 3283b3d916..3c3c205cd5 100644 --- a/internal/proxy/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "log" @@ -20,7 +20,7 @@ var Params ParamTable func (pt *ParamTable) Init() { pt.BaseTable.Init() - err := pt.LoadYaml("advanced/proxy.yaml") + err := pt.LoadYaml("advanced/proxy_node.yaml") if err != nil { panic(err) } @@ -38,11 +38,11 @@ func (pt *ParamTable) Init() { } func (pt *ParamTable) NetworkPort() int { - return pt.ParseInt("proxy.port") + return pt.ParseInt("proxyNode.port") } func (pt *ParamTable) NetworkAddress() string { - addr, err := pt.Load("proxy.address") + addr, err := pt.Load("proxyNode.address") if err != nil { panic(err) } @@ -50,11 +50,11 @@ func (pt *ParamTable) NetworkAddress() string { hostName, _ := net.LookupHost(addr) if len(hostName) <= 0 { if ip := net.ParseIP(addr); ip == nil { - panic("invalid ip proxy.address") + panic("invalid ip proxyNode.address") } } - port, err := pt.Load("proxy.port") + port, err := pt.Load("proxyNode.port") if err != nil { panic(err) } @@ -100,7 +100,7 @@ func (pt *ParamTable) queryNodeIDList() []UniqueID { for _, i := range queryNodeIDs { v, err := strconv.Atoi(i) if err != nil { - log.Panicf("load proxy id list error, %s", err.Error()) + log.Panicf("load proxynode id list error, %s", err.Error()) } ret = append(ret, UniqueID(v)) } @@ -120,7 +120,7 @@ func (pt *ParamTable) ProxyID() UniqueID { } func (pt *ParamTable) TimeTickInterval() time.Duration { - internalStr, err := pt.Load("proxy.timeTickInterval") + internalStr, err := pt.Load("proxyNode.timeTickInterval") if err != nil { panic(err) } @@ -279,27 +279,27 @@ func (pt *ParamTable) DataDefinitionChannelNames() []string { } func (pt *ParamTable) MsgStreamInsertBufSize() int64 { - return pt.ParseInt64("proxy.msgStream.insert.bufSize") + return pt.ParseInt64("proxyNode.msgStream.insert.bufSize") } func (pt *ParamTable) MsgStreamSearchBufSize() int64 { - return pt.ParseInt64("proxy.msgStream.search.bufSize") + return pt.ParseInt64("proxyNode.msgStream.search.bufSize") } func (pt *ParamTable) MsgStreamSearchResultBufSize() int64 { - return pt.ParseInt64("proxy.msgStream.searchResult.recvBufSize") + return pt.ParseInt64("proxyNode.msgStream.searchResult.recvBufSize") } func (pt *ParamTable) MsgStreamSearchResultPulsarBufSize() int64 { - return pt.ParseInt64("proxy.msgStream.searchResult.pulsarBufSize") + return pt.ParseInt64("proxyNode.msgStream.searchResult.pulsarBufSize") } func (pt *ParamTable) MsgStreamTimeTickBufSize() int64 { - return pt.ParseInt64("proxy.msgStream.timeTick.bufSize") + return pt.ParseInt64("proxyNode.msgStream.timeTick.bufSize") } func (pt *ParamTable) MaxNameLength() int64 { - str, err := pt.Load("proxy.maxNameLength") + str, err := pt.Load("proxyNode.maxNameLength") if err != nil { panic(err) } @@ -311,7 +311,7 @@ func (pt *ParamTable) MaxNameLength() int64 { } func (pt *ParamTable) MaxFieldNum() int64 { - str, err := pt.Load("proxy.maxFieldNum") + str, err := pt.Load("proxyNode.maxFieldNum") if err != nil { panic(err) } @@ -323,7 +323,7 @@ func (pt *ParamTable) MaxFieldNum() int64 { } func (pt *ParamTable) MaxDimension() int64 { - str, err := pt.Load("proxy.maxDimension") + str, err := pt.Load("proxyNode.maxDimension") if err != nil { panic(err) } diff --git a/internal/proxy/paramtable_test.go b/internal/proxynode/paramtable_test.go similarity index 98% rename from internal/proxy/paramtable_test.go rename to internal/proxynode/paramtable_test.go index 327326d439..72a3e0ab51 100644 --- a/internal/proxy/paramtable_test.go +++ b/internal/proxynode/paramtable_test.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "fmt" diff --git a/internal/proxy/proxy.go b/internal/proxynode/proxy.go similarity index 98% rename from internal/proxy/proxy.go rename to internal/proxynode/proxy.go index 8c5508a815..c6026d884a 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxynode/proxy.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" @@ -67,7 +67,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { } cfg := &config.Configuration{ - ServiceName: "proxy", + ServiceName: "proxynode", Sampler: &config.SamplerConfig{ Type: "const", Param: 1, @@ -238,5 +238,5 @@ func (p *Proxy) Close() { for _, cb := range p.closeCallbacks { cb() } - log.Print("proxy closed.") + log.Print("proxynode closed.") } diff --git a/internal/proxy/proxy_test.go b/internal/proxynode/proxy_test.go similarity index 98% rename from internal/proxy/proxy_test.go rename to internal/proxynode/proxy_test.go index 486e5a23fb..782123fea6 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxynode/proxy_test.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" @@ -88,12 +88,12 @@ func startProxy(ctx context.Context) { svr, err := CreateProxy(ctx) proxyServer = svr if err != nil { - log.Print("create proxy failed", zap.Error(err)) + log.Print("create proxynode failed", zap.Error(err)) } // TODO: change to wait until master is ready if err := svr.Start(); err != nil { - log.Fatal("run proxy failed", zap.Error(err)) + log.Fatal("run proxynode failed", zap.Error(err)) } } @@ -111,7 +111,7 @@ func setup() { conn, err := grpc.DialContext(ctx, proxyAddr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - log.Fatalf("Connect to proxy failed, error= %v", err) + log.Fatalf("Connect to proxynode failed, error= %v", err) } proxyConn = conn proxyClient = servicepb.NewMilvusServiceClient(proxyConn) diff --git a/internal/proxy/repack_func.go b/internal/proxynode/repack_func.go similarity index 99% rename from internal/proxy/repack_func.go rename to internal/proxynode/repack_func.go index 45c4f4d0ab..864e3907c0 100644 --- a/internal/proxy/repack_func.go +++ b/internal/proxynode/repack_func.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "log" diff --git a/internal/proxy/task.go b/internal/proxynode/task.go similarity index 99% rename from internal/proxy/task.go rename to internal/proxynode/task.go index ba7a853c20..f0384c02b7 100644 --- a/internal/proxy/task.go +++ b/internal/proxynode/task.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" diff --git a/internal/proxy/task_scheduler.go b/internal/proxynode/task_scheduler.go similarity index 99% rename from internal/proxy/task_scheduler.go rename to internal/proxynode/task_scheduler.go index c529e22991..20b3eeadc5 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "container/list" @@ -410,7 +410,7 @@ func (sched *TaskScheduler) queryResultLoop() { } } case <-sched.ctx.Done(): - log.Print("proxy server is closed ...") + log.Print("proxynode server is closed ...") return } } diff --git a/internal/proxy/timetick.go b/internal/proxynode/timetick.go similarity index 95% rename from internal/proxy/timetick.go rename to internal/proxynode/timetick.go index f47960e3af..3f8abfb073 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxynode/timetick.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" @@ -81,9 +81,9 @@ func (tt *timeTick) tick() error { msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) err := tt.tickMsgStream.Produce(&msgPack) if err != nil { - log.Printf("proxy send time tick error: %v", err) + log.Printf("proxynode send time tick error: %v", err) } else { - //log.Printf("proxy send time tick message") + //log.Printf("proxynode send time tick message") } tt.tickLock.Lock() defer tt.tickLock.Unlock() diff --git a/internal/proxy/timetick_test.go b/internal/proxynode/timetick_test.go similarity index 97% rename from internal/proxy/timetick_test.go rename to internal/proxynode/timetick_test.go index 08bf9dd0f2..3f0e63700e 100644 --- a/internal/proxy/timetick_test.go +++ b/internal/proxynode/timetick_test.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "context" diff --git a/internal/proxy/util.go b/internal/proxynode/util.go similarity index 97% rename from internal/proxy/util.go rename to internal/proxynode/util.go index 7546aa4d27..2a8a269420 100644 --- a/internal/proxy/util.go +++ b/internal/proxynode/util.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "log" diff --git a/internal/proxy/validate_util.go b/internal/proxynode/validate_util.go similarity index 99% rename from internal/proxy/validate_util.go rename to internal/proxynode/validate_util.go index a094d5d5a1..b07d6532ed 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxynode/validate_util.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "strconv" diff --git a/internal/proxy/validate_util_test.go b/internal/proxynode/validate_util_test.go similarity index 99% rename from internal/proxy/validate_util_test.go rename to internal/proxynode/validate_util_test.go index f32e1982a0..54f3ffb19b 100644 --- a/internal/proxy/validate_util_test.go +++ b/internal/proxynode/validate_util_test.go @@ -1,4 +1,4 @@ -package proxy +package proxynode import ( "testing" diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index 852d976366..71d79ba3ac 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -13,7 +13,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/indexbuilder" + "github.com/zilliztech/milvus-distributed/internal/indexnode" minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -243,7 +243,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) { indexRowData = append(indexRowData, float32(n*i)) } } - index, err := indexbuilder.NewCIndex(typeParams, indexParams) + index, err := indexnode.NewCIndex(typeParams, indexParams) assert.Nil(t, err) err = index.BuildFloatVecIndexWithoutIds(indexRowData) assert.Equal(t, err, nil) @@ -551,7 +551,7 @@ func TestLoadIndexService_BinaryVector(t *testing.T) { // generator index typeParams := make(map[string]string) typeParams["dim"] = "128" - index, err := indexbuilder.NewCIndex(typeParams, indexParams) + index, err := indexnode.NewCIndex(typeParams, indexParams) assert.Nil(t, err) err = index.BuildBinaryVecIndexWithoutIds(indexRowData) assert.Equal(t, err, nil) diff --git a/internal/util/typeutil/service.go b/internal/util/typeutil/service.go new file mode 100644 index 0000000000..23da323186 --- /dev/null +++ b/internal/util/typeutil/service.go @@ -0,0 +1,14 @@ +package typeutil + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +type Service interface { + Init() + Start() + Stop() + GetServiceStates() (internalpb2.ServiceStates, error) + GetTimeTickChannel() (string, error) + GetStatisticsChannel() (string, error) +} diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 28ed949b4b..48438fa2a7 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -15,8 +15,9 @@ MILVUS_DIR="${ROOT_DIR}/internal/" echo $MILVUS_DIR go test -race -cover "${MILVUS_DIR}/kv/..." -failfast -go test -race -cover "${MILVUS_DIR}/proxy/..." -failfast +go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast go test -race -cover "${MILVUS_DIR}/master/..." -failfast +go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast #go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast