mirror of https://github.com/milvus-io/milvus.git
Fix bug of estimating record size
Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/4973/head^2
parent
ee53a1371a
commit
ff8ebd5f42
|
@ -13,9 +13,8 @@ const (
|
|||
)
|
||||
|
||||
type Request interface {
|
||||
Wait()
|
||||
Wait() error
|
||||
Notify(error)
|
||||
IsValid() bool
|
||||
}
|
||||
|
||||
type BaseRequest struct {
|
||||
|
@ -23,13 +22,9 @@ type BaseRequest struct {
|
|||
Valid bool
|
||||
}
|
||||
|
||||
func (req *BaseRequest) Wait() {
|
||||
func (req *BaseRequest) Wait() error {
|
||||
err := <-req.Done
|
||||
req.Valid = err == nil
|
||||
}
|
||||
|
||||
func (req *BaseRequest) IsValid() bool {
|
||||
return req.Valid
|
||||
return err
|
||||
}
|
||||
|
||||
func (req *BaseRequest) Notify(err error) {
|
||||
|
@ -252,5 +247,5 @@ func (ta *Allocator) Close() {
|
|||
func (ta *Allocator) CleanCache() {
|
||||
req := &SyncRequest{BaseRequest: BaseRequest{Done: make(chan error), Valid: false}}
|
||||
ta.ForceSyncChan <- req
|
||||
req.Wait()
|
||||
_ = req.Wait()
|
||||
}
|
||||
|
|
|
@ -144,11 +144,10 @@ func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
|
|||
|
||||
req.count = count
|
||||
ia.Reqs <- req
|
||||
req.Wait()
|
||||
|
||||
if !req.IsValid() {
|
||||
return 0, 0, nil
|
||||
if err := req.Wait(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
start, count := req.id, req.count
|
||||
return start, start + int64(count), nil
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package allocator
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
|
@ -144,10 +144,8 @@ func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
|
|||
}
|
||||
req.count = count
|
||||
ta.Reqs <- req
|
||||
req.Wait()
|
||||
|
||||
if !req.IsValid() {
|
||||
return nil, errors.New("alloc time stamp request failed")
|
||||
if err := req.Wait(); err != nil {
|
||||
return nil, fmt.Errorf("alloc time stamp request failed: %s", err)
|
||||
}
|
||||
|
||||
start, count := req.timestamp, req.count
|
||||
|
|
|
@ -355,10 +355,9 @@ func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, cha
|
|||
timestamp: ts,
|
||||
}
|
||||
sa.Reqs <- req
|
||||
req.Wait()
|
||||
|
||||
if !req.IsValid() {
|
||||
return nil, errors.New("GetSegmentID Failed")
|
||||
if err := req.Wait(); err != nil {
|
||||
return nil, fmt.Errorf("GetSegmentID failed: %s", err)
|
||||
}
|
||||
|
||||
return req.segInfo, nil
|
||||
}
|
||||
|
|
|
@ -20,14 +20,26 @@ func EstimateSizePerRecord(schema *schemapb.CollectionSchema) (int, error) {
|
|||
res += 8
|
||||
case schemapb.DataType_String:
|
||||
res += 125 // todo find a better way to estimate string type
|
||||
case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector:
|
||||
case schemapb.DataType_BinaryVector:
|
||||
for _, kv := range fs.TypeParams {
|
||||
if kv.Key == "dim" {
|
||||
v, err := strconv.Atoi(kv.Value)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
res += v
|
||||
res += v / 8
|
||||
break
|
||||
}
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
for _, kv := range fs.TypeParams {
|
||||
if kv.Key == "dim" {
|
||||
v, err := strconv.Atoi(kv.Value)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
res += v * 4
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue