Add mutex for segment of segCore in querynode (#6870)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/6891/head
bigsheeper 2021-07-29 16:03:22 +08:00 committed by GitHub
parent d51fec6678
commit eee0f1e077
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 3 deletions

View File

@ -59,6 +59,7 @@ func newVectorFieldInfo(fieldBinlog *datapb.FieldBinlog) *VectorFieldInfo {
//--------------------------------------------------------------------------------------
type Segment struct {
segPtrMu sync.RWMutex // guards segmentPtr
segmentPtr C.CSegmentInterface
segmentID UniqueID
@ -202,6 +203,8 @@ func deleteSegment(segment *Segment) {
void
deleteSegment(CSegmentInterface segment);
*/
segment.segPtrMu.Lock()
defer segment.segPtrMu.Unlock()
cPtr := segment.segmentPtr
C.DeleteSegment(cPtr)
segment.segmentPtr = nil
@ -216,8 +219,8 @@ func (s *Segment) getRowCount() int64 {
long int
getRowCount(CSegmentInterface c_segment);
*/
//segmentPtrIsNil := s.segmentPtr == nil
//log.Debug("QueryNode::Segment::getRowCount", zap.Any("segmentPtrIsNil", segmentPtrIsNil))
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return -1
}
@ -231,6 +234,8 @@ func (s *Segment) getDeletedCount() int64 {
long int
getDeletedCount(CSegmentInterface c_segment);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return -1
}
@ -243,6 +248,8 @@ func (s *Segment) getMemSize() int64 {
long int
GetMemoryUsageInBytes(CSegmentInterface c_segment);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return -1
}
@ -263,6 +270,8 @@ func (s *Segment) search(plan *SearchPlan,
long int* result_ids,
float* result_distances);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return nil, errors.New("null seg core pointer")
}
@ -289,6 +298,11 @@ func (s *Segment) search(plan *SearchPlan,
}
func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return nil, errors.New("null seg core pointer")
}
resProto := C.GetEntityByIds(s.segmentPtr, plan.cRetrievePlan, C.uint64_t(plan.Timestamp))
result := new(segcorepb.RetrieveResults)
err := HandleCProtoResult(&resProto, result)
@ -299,6 +313,8 @@ func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults
}
func (s *Segment) fillTargetEntry(plan *SearchPlan, result *SearchResult) error {
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock()
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
@ -465,6 +481,8 @@ func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
long int
PreInsert(CSegmentInterface c_segment, long int size);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentType != segmentTypeGrowing {
return 0, nil
}
@ -487,6 +505,8 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
long int
PreDelete(CSegmentInterface c_segment, long int size);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
var offset = C.PreDelete(s.segmentPtr, C.long(int64(numOfRecords)))
return int64(offset)
@ -504,10 +524,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
int sizeof_per_row,
signed long int count);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentType != segmentTypeGrowing {
return nil
}
log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("s.sgmentPtr", s.segmentPtr))
log.Debug("QueryNode::Segment::segmentInsert:", zap.Any("s.segmentPtr", s.segmentPtr))
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
@ -565,6 +587,8 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps
const long* primary_keys,
const unsigned long* timestamps);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
@ -592,6 +616,8 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interfa
CStatus
LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
@ -685,6 +711,8 @@ func (s *Segment) dropFieldData(fieldID int64) error {
CStatus
DropFieldData(CSegmentInterface c_segment, int64_t field_id);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
@ -729,6 +757,8 @@ func (s *Segment) updateSegmentIndex(bytesIndex [][]byte, fieldID UniqueID) erro
return err
}
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
@ -757,6 +787,8 @@ func (s *Segment) dropSegmentIndex(fieldID int64) error {
CStatus
DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}

View File

@ -15,6 +15,7 @@ import (
"encoding/binary"
"log"
"math"
"sync"
"testing"
"github.com/golang/protobuf/proto"
@ -584,3 +585,35 @@ func TestSegment_segmentLoadFieldData(t *testing.T) {
deleteSegment(segment)
deleteCollection(collection)
}
func TestSegment_ConcurrentOperation(t *testing.T) {
const N = 16
var ages = []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
collectionID := UniqueID(0)
partitionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
assert.Equal(t, collection.ID(), collectionID)
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
segmentID := UniqueID(i)
segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, true)
assert.Equal(t, segmentID, segment.segmentID)
assert.Equal(t, partitionID, segment.partitionID)
wg.Add(2)
go func() {
deleteSegment(segment)
wg.Done()
}()
go func() {
// segmentLoadFieldData result error may be nil or not, we just expected this test would not crash.
_ = segment.segmentLoadFieldData(101, N, ages)
wg.Done()
}()
}
wg.Wait()
deleteCollection(collection)
}