mirror of https://github.com/milvus-io/milvus.git
169 lines
5.9 KiB
Go
169 lines
5.9 KiB
Go
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package bulkwriter
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
)
|
|
|
|
type BulkImportSuite struct {
|
|
suite.Suite
|
|
}
|
|
|
|
func (s *BulkImportSuite) TestBulkImport() {
|
|
s.Run("normal_case", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
authHeader := req.Header.Get("Authorization")
|
|
s.Equal("Bearer root:Milvus", authHeader)
|
|
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
|
|
rw.Write([]byte(`{"status":0, "data":{"jobId": "123"}}`))
|
|
}))
|
|
defer svr.Close()
|
|
|
|
resp, err := BulkImport(context.Background(),
|
|
NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}).
|
|
WithPartition("_default").
|
|
WithOption("backup", "true").
|
|
WithAPIKey("root:Milvus"),
|
|
)
|
|
s.NoError(err)
|
|
s.EqualValues(0, resp.Status)
|
|
s.Equal("123", resp.Data.JobID)
|
|
})
|
|
|
|
s.Run("svr_error", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
// rw.
|
|
rw.WriteHeader(http.StatusInternalServerError)
|
|
rw.Write([]byte(`interal server error`))
|
|
}))
|
|
defer svr.Close()
|
|
|
|
_, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
|
|
s.Error(err)
|
|
})
|
|
|
|
s.Run("status_error", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
|
|
rw.Write([]byte(`{"status":1100, "message": "import job failed"}`))
|
|
}))
|
|
defer svr.Close()
|
|
|
|
_, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
|
|
s.Error(err)
|
|
})
|
|
|
|
s.Run("server_closed", func() {
|
|
svr2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}))
|
|
svr2.Close()
|
|
_, err := BulkImport(context.Background(), NewBulkImportOption(svr2.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
|
|
s.Error(err)
|
|
})
|
|
}
|
|
|
|
func (s *BulkImportSuite) TestListImportJobs() {
|
|
s.Run("normal_case", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
authHeader := req.Header.Get("Authorization")
|
|
s.Equal("Bearer root:Milvus", authHeader)
|
|
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/list"))
|
|
rw.Write([]byte(`{"status":0, "data":{"records": [{"jobID": "abc", "collectionName": "hello_milvus", "state":"Importing", "progress": 50}]}}`))
|
|
}))
|
|
defer svr.Close()
|
|
|
|
resp, err := ListImportJobs(context.Background(),
|
|
NewListImportJobsOption(svr.URL, "hello_milvus").
|
|
WithPageSize(10).
|
|
WithCurrentPage(1).
|
|
WithAPIKey("root:Milvus"),
|
|
)
|
|
s.NoError(err)
|
|
s.EqualValues(0, resp.Status)
|
|
if s.Len(resp.Data.Records, 1) {
|
|
record := resp.Data.Records[0]
|
|
s.Equal("abc", record.JobID)
|
|
s.Equal("hello_milvus", record.CollectionName)
|
|
s.Equal("Importing", record.State)
|
|
s.EqualValues(50, record.Progress)
|
|
}
|
|
})
|
|
|
|
s.Run("svr_error", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
rw.WriteHeader(http.StatusInternalServerError)
|
|
}))
|
|
defer svr.Close()
|
|
|
|
_, err := ListImportJobs(context.Background(), NewListImportJobsOption(svr.URL, "hello_milvus"))
|
|
s.Error(err)
|
|
})
|
|
}
|
|
|
|
func (s *BulkImportSuite) TestGetImportProgress() {
|
|
s.Run("normal_case", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
authHeader := req.Header.Get("Authorization")
|
|
s.Equal("Bearer root:Milvus", authHeader)
|
|
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/describe"))
|
|
rw.Write([]byte(`{"status":0, "data":{"collectionName": "hello_milvus","jobId":"abc", "state":"Importing", "progress": 50, "importedRows": 20000,"totalRows": 40000, "details":[{"fileName": "files/a.json", "fileSize": 64312, "progress": 100, "state": "Completed"}, {"fileName":"files/b.json", "fileSize":52912, "progress":0, "state":"Importing"}]}}`))
|
|
}))
|
|
defer svr.Close()
|
|
|
|
resp, err := GetImportProgress(context.Background(),
|
|
NewGetImportProgressOption(svr.URL, "abc").
|
|
WithAPIKey("root:Milvus"),
|
|
)
|
|
s.NoError(err)
|
|
s.EqualValues(0, resp.Status)
|
|
s.Equal("hello_milvus", resp.Data.CollectionName)
|
|
s.Equal("abc", resp.Data.JobID)
|
|
s.Equal("Importing", resp.Data.State)
|
|
s.EqualValues(50, resp.Data.Progress)
|
|
if s.Len(resp.Data.Details, 2) {
|
|
detail1 := resp.Data.Details[0]
|
|
s.Equal("files/a.json", detail1.FileName)
|
|
s.Equal("Completed", detail1.State)
|
|
s.EqualValues(100, detail1.Progress)
|
|
detail2 := resp.Data.Details[1]
|
|
s.Equal("files/b.json", detail2.FileName)
|
|
s.Equal("Importing", detail2.State)
|
|
s.EqualValues(0, detail2.Progress)
|
|
}
|
|
})
|
|
|
|
s.Run("svr_error", func() {
|
|
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
rw.WriteHeader(http.StatusInternalServerError)
|
|
}))
|
|
defer svr.Close()
|
|
|
|
_, err := GetImportProgress(context.Background(), NewGetImportProgressOption(svr.URL, "abc"))
|
|
s.Error(err)
|
|
})
|
|
}
|
|
|
|
func TestBulkImportAPIs(t *testing.T) {
|
|
suite.Run(t, new(BulkImportSuite))
|
|
}
|