mirror of https://github.com/milvus-io/milvus.git
parent
1ba8e2448f
commit
12b2eaf196
|
@ -128,9 +128,6 @@ func (i *ServiceImpl) Init() error {
|
|||
}
|
||||
|
||||
func (i *ServiceImpl) Start() error {
|
||||
i.loopWg.Add(1)
|
||||
go i.tsLoop()
|
||||
|
||||
i.sched.Start()
|
||||
// Start callbacks
|
||||
for _, cb := range i.startCallbacks {
|
||||
|
@ -279,24 +276,3 @@ func (i *ServiceImpl) NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*co
|
|||
i.nodeClients.IncPriority(nty.NodeID, -1)
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (i *ServiceImpl) tsLoop() {
|
||||
tsoTicker := time.NewTicker(UpdateTimestampStep)
|
||||
defer tsoTicker.Stop()
|
||||
ctx, cancel := context.WithCancel(i.loopCtx)
|
||||
defer cancel()
|
||||
defer i.loopWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-tsoTicker.C:
|
||||
if err := i.idAllocator.UpdateID(); err != nil {
|
||||
log.Println("failed to update id", err)
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Server is closed and it should return nil.
|
||||
log.Println("tsLoop is closed")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,8 +60,8 @@ type collectionReplica interface {
|
|||
getSegmentNum() int
|
||||
|
||||
getSegmentStatistics() []*internalpb2.SegmentStats
|
||||
getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
|
||||
getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
|
||||
getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
|
||||
getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
|
||||
replaceGrowingSegmentBySealedSegment(segment *Segment) error
|
||||
|
||||
getTSafe() tSafe
|
||||
|
@ -458,7 +458,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
|
|||
return statisticData
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
|
||||
func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
|
@ -487,7 +487,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s
|
|||
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
|
||||
func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
|
@ -513,7 +513,7 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
|
|||
return errors.New("unexpected segment type")
|
||||
}
|
||||
targetSegment, err := colReplica.getSegmentByIDPrivate(segment.ID())
|
||||
if err != nil && targetSegment != nil {
|
||||
if err == nil && targetSegment != nil {
|
||||
if targetSegment.segmentType != segTypeGrowing {
|
||||
// target segment has been a sealed segment
|
||||
return nil
|
||||
|
@ -521,7 +521,7 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
|
|||
deleteSegment(targetSegment)
|
||||
}
|
||||
|
||||
targetSegment = segment
|
||||
colReplica.segments[segment.ID()] = segment
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -242,5 +242,36 @@ func TestCollectionReplica_freeAll(t *testing.T) {
|
|||
|
||||
err := node.Stop()
|
||||
assert.NoError(t, err)
|
||||
|
||||
}
|
||||
|
||||
func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) {
|
||||
node := newQueryNodeMock()
|
||||
collectionID := UniqueID(0)
|
||||
segmentID := UniqueID(520)
|
||||
initTestMeta(t, node, collectionID, segmentID)
|
||||
|
||||
_, _, segIDs := node.replica.getSegmentsBySegmentType(segTypeGrowing)
|
||||
assert.Equal(t, len(segIDs), 1)
|
||||
|
||||
collection, err := node.replica.getCollectionByID(collectionID)
|
||||
assert.NoError(t, err)
|
||||
ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeSealed)
|
||||
err = node.replica.replaceGrowingSegmentBySealedSegment(ns)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segmentNums := node.replica.getSegmentNum()
|
||||
assert.Equal(t, segmentNums, 1)
|
||||
|
||||
segment, err := node.replica.getSegmentByID(segmentID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, segment.getType(), segTypeSealed)
|
||||
|
||||
_, _, segIDs = node.replica.getSegmentsBySegmentType(segTypeGrowing)
|
||||
assert.Equal(t, len(segIDs), 0)
|
||||
_, _, segIDs = node.replica.getSegmentsBySegmentType(segTypeSealed)
|
||||
assert.Equal(t, len(segIDs), 1)
|
||||
|
||||
err = node.Stop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ type loadIndex struct {
|
|||
}
|
||||
|
||||
func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
|
||||
collectionIDs, _, segmentIDs := loader.replica.getSealedSegmentsBySegmentType(segTypeSealed)
|
||||
collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segTypeSealed)
|
||||
if len(collectionIDs) <= 0 {
|
||||
wg.Done()
|
||||
return
|
||||
|
@ -51,16 +51,17 @@ func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
|
|||
// we don't need index id yet
|
||||
_, buildID, err := loader.getIndexInfo(collectionIDs[i], segmentIDs[i])
|
||||
if err != nil {
|
||||
indexPaths, err := loader.getIndexPaths(buildID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
indexPaths, err := loader.getIndexPaths(buildID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
// sendQueryNodeStats
|
||||
|
|
|
@ -40,7 +40,7 @@ func (s *loadService) close() {
|
|||
}
|
||||
|
||||
func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
|
||||
collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getSealedSegmentsBySegmentType(segTypeGrowing)
|
||||
collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getSegmentsBySegmentType(segTypeGrowing)
|
||||
if len(collectionIDs) <= 0 {
|
||||
wg.Done()
|
||||
return
|
||||
|
@ -89,6 +89,16 @@ func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, s
|
|||
}
|
||||
|
||||
func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error {
|
||||
// create segment
|
||||
collection, err := s.segLoader.replica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.segLoader.replica.getPartitionByID(partitionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
|
||||
// we don't need index id yet
|
||||
_, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
|
||||
if errIndex == nil {
|
||||
|
@ -105,12 +115,12 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
|
|||
}
|
||||
|
||||
targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
|
||||
collection, err := s.segLoader.replica.getCollectionByID(collectionID)
|
||||
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
|
||||
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
|
||||
// replace segment
|
||||
err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -125,8 +135,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
|
|||
return err
|
||||
}
|
||||
}
|
||||
// replace segment
|
||||
return s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
|
||||
|
|
|
@ -12,6 +12,7 @@ package querynode
|
|||
*/
|
||||
import "C"
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
@ -23,13 +24,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
segTypeInvalid = C.Invalid
|
||||
segTypeGrowing = C.Growing
|
||||
segTypeSealed = C.Sealed
|
||||
segTypeIndexing = C.Indexing
|
||||
segTypeInvalid = 0
|
||||
segTypeGrowing = 1
|
||||
segTypeSealed = 2
|
||||
segTypeIndexing = 3
|
||||
)
|
||||
|
||||
type segmentType = C.SegmentType
|
||||
type segmentType = int
|
||||
type indexParam = map[string]string
|
||||
|
||||
type Segment struct {
|
||||
|
@ -45,7 +46,7 @@ type Segment struct {
|
|||
recentlyModified bool
|
||||
|
||||
typeMu sync.Mutex // guards builtIndex
|
||||
segmentType C.SegmentType
|
||||
segmentType int
|
||||
|
||||
paramMutex sync.RWMutex // guards index
|
||||
indexParam map[int64]indexParam
|
||||
|
@ -112,7 +113,20 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
|
|||
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
|
||||
*/
|
||||
initIndexParam := make(map[int64]indexParam)
|
||||
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType)
|
||||
var segmentPtr C.CSegmentInterface
|
||||
switch segType {
|
||||
case segTypeInvalid:
|
||||
log.Println("illegal segment type when create segment")
|
||||
return nil
|
||||
case segTypeSealed:
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed)
|
||||
case segTypeGrowing:
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing)
|
||||
default:
|
||||
log.Println("illegal segment type when create segment")
|
||||
return nil
|
||||
}
|
||||
|
||||
var newSegment = &Segment{
|
||||
segmentPtr: segmentPtr,
|
||||
segmentType: segType,
|
||||
|
|
Loading…
Reference in New Issue