milvus/client/bulkwriter/bulk_import_test.go

169 lines
5.9 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bulkwriter
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/stretchr/testify/suite"
)
type BulkImportSuite struct {
suite.Suite
}
func (s *BulkImportSuite) TestBulkImport() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
rw.Write([]byte(`{"status":0, "data":{"jobId": "123"}}`))
}))
defer svr.Close()
resp, err := BulkImport(context.Background(),
NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}).
WithPartition("_default").
WithOption("backup", "true").
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
s.Equal("123", resp.Data.JobID)
})
s.Run("svr_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// rw.
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(`interal server error`))
}))
defer svr.Close()
_, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
s.Error(err)
})
s.Run("status_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
rw.Write([]byte(`{"status":1100, "message": "import job failed"}`))
}))
defer svr.Close()
_, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
s.Error(err)
})
s.Run("server_closed", func() {
svr2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}))
svr2.Close()
_, err := BulkImport(context.Background(), NewBulkImportOption(svr2.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
s.Error(err)
})
}
func (s *BulkImportSuite) TestListImportJobs() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/list"))
rw.Write([]byte(`{"status":0, "data":{"records": [{"jobID": "abc", "collectionName": "hello_milvus", "state":"Importing", "progress": 50}]}}`))
}))
defer svr.Close()
resp, err := ListImportJobs(context.Background(),
NewListImportJobsOption(svr.URL, "hello_milvus").
WithPageSize(10).
WithCurrentPage(1).
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
if s.Len(resp.Data.Records, 1) {
record := resp.Data.Records[0]
s.Equal("abc", record.JobID)
s.Equal("hello_milvus", record.CollectionName)
s.Equal("Importing", record.State)
s.EqualValues(50, record.Progress)
}
})
s.Run("svr_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}))
defer svr.Close()
_, err := ListImportJobs(context.Background(), NewListImportJobsOption(svr.URL, "hello_milvus"))
s.Error(err)
})
}
func (s *BulkImportSuite) TestGetImportProgress() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/describe"))
rw.Write([]byte(`{"status":0, "data":{"collectionName": "hello_milvus","jobId":"abc", "state":"Importing", "progress": 50, "importedRows": 20000,"totalRows": 40000, "details":[{"fileName": "files/a.json", "fileSize": 64312, "progress": 100, "state": "Completed"}, {"fileName":"files/b.json", "fileSize":52912, "progress":0, "state":"Importing"}]}}`))
}))
defer svr.Close()
resp, err := GetImportProgress(context.Background(),
NewGetImportProgressOption(svr.URL, "abc").
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
s.Equal("hello_milvus", resp.Data.CollectionName)
s.Equal("abc", resp.Data.JobID)
s.Equal("Importing", resp.Data.State)
s.EqualValues(50, resp.Data.Progress)
if s.Len(resp.Data.Details, 2) {
detail1 := resp.Data.Details[0]
s.Equal("files/a.json", detail1.FileName)
s.Equal("Completed", detail1.State)
s.EqualValues(100, detail1.Progress)
detail2 := resp.Data.Details[1]
s.Equal("files/b.json", detail2.FileName)
s.Equal("Importing", detail2.State)
s.EqualValues(0, detail2.Progress)
}
})
s.Run("svr_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}))
defer svr.Close()
_, err := GetImportProgress(context.Background(), NewGetImportProgressOption(svr.URL, "abc"))
s.Error(err)
})
}
func TestBulkImportAPIs(t *testing.T) {
suite.Run(t, new(BulkImportSuite))
}