milvus/internal/dataservice/segment_allocator.go

405 lines
12 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
type errRemainInSufficient struct {
requestRows int
}
func newErrRemainInSufficient(requestRows int) errRemainInSufficient {
return errRemainInSufficient{requestRows: requestRows}
}
func (err errRemainInSufficient) Error() string {
return "segment remaining is insufficient for" + strconv.Itoa(err.requestRows)
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocatorInterface interface {
// OpenSegment add the segment to allocator and set it allocatable
OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error
// AllocSegment allocate rows and record the allocation.
AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
// GetSealedSegments get all sealed segment.
GetSealedSegments(ctx context.Context) ([]UniqueID, error)
// SealSegment set segment sealed, the segment will not be allocated anymore.
SealSegment(ctx context.Context, segmentID UniqueID) error
// DropSegment drop the segment from allocator.
DropSegment(ctx context.Context, segmentID UniqueID)
// ExpireAllocations check all allocations' expire time and remove the expired allocation.
ExpireAllocations(ctx context.Context, timeTick Timestamp) error
// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
SealAllSegments(ctx context.Context, collectionID UniqueID)
// IsAllocationsExpired check all allocations of segment expired.
IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error)
}
type segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
total int
sealed bool
lastExpireTime Timestamp
allocations []*allocation
insertChannel string
}
type allocation struct {
rowNums int
expireTime Timestamp
}
type segmentAllocator struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
segmentExpireDuration int64
segmentThreshold float64
segmentThresholdFactor float64
mu sync.RWMutex
allocator allocatorInterface
segmentInfoStream msgstream.MsgStream
}
type Option struct {
apply func(alloc *segmentAllocator)
}
func WithSegmentStream(stream msgstream.MsgStream) Option {
return Option{
apply: func(alloc *segmentAllocator) {
alloc.segmentInfoStream = stream
},
}
}
func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...Option) *segmentAllocator {
alloc := &segmentAllocator{
mt: meta,
segments: make(map[UniqueID]*segmentStatus),
segmentExpireDuration: Params.SegIDAssignExpiration,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
allocator: allocator,
}
for _, opt := range opts {
opt.apply(alloc)
}
return alloc
}
func (s *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.segments[segmentInfo.ID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.ID)
}
return s.open(segmentInfo)
}
func (s *segmentAllocator) open(segmentInfo *datapb.SegmentInfo) error {
totalRows, err := s.estimateTotalRows(segmentInfo.CollectionID)
if err != nil {
return err
}
log.Debug("dataservice: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", totalRows))
s.segments[segmentInfo.ID] = &segmentStatus{
id: segmentInfo.ID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
total: totalRows,
sealed: false,
lastExpireTime: 0,
insertChannel: segmentInfo.InsertChannel,
}
return nil
}
func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range s.segments {
if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
segStatus.insertChannel != channelName {
continue
}
var success bool
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
if !success {
continue
}
segID = segStatus.id
retCount = requestRows
expireTime = segStatus.lastExpireTime
return
}
var segStatus *segmentStatus
segStatus, err = s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
return
}
var success bool
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
}
if !success {
err = newErrRemainInSufficient(requestRows)
return
}
segID = segStatus.id
retCount = requestRows
expireTime = segStatus.lastExpireTime
return
}
func (s *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
totalOfAllocations := 0
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
}
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
free := segStatus.total - int(segMeta.NumRows) - totalOfAllocations
log.Debug("dataservice::alloc: ",
zap.Any("segMeta.NumRows", int(segMeta.NumRows)),
zap.Any("totalOfAllocations", totalOfAllocations))
if numRows > free {
return false, nil
}
ts, err := s.allocator.allocTimestamp()
if err != nil {
return false, err
}
physicalTs, logicalTs := tsoutil.ParseTS(ts)
expirePhysicalTs := physicalTs.Add(time.Duration(s.segmentExpireDuration) * time.Millisecond)
expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs))
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, &allocation{
numRows,
expireTs,
})
return true, nil
}
func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
if err != nil {
return nil, err
}
segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName)
if err != nil {
return nil, err
}
if err = s.mt.AddSegment(segmentInfo); err != nil {
return nil, err
}
if err = s.open(segmentInfo); err != nil {
return nil, err
}
infoMsg := &msgstream.SegmentInfoMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentMsg: datapb.SegmentMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Segment: segmentInfo,
},
}
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{infoMsg},
}
if s.segmentInfoStream != nil {
if err = s.segmentInfoStream.Produce(msgPack); err != nil {
return nil, err
}
}
return s.segments[segmentInfo.ID], nil
}
func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.mt.GetCollection(collectionID)
if err != nil {
return -1, err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(collMeta.Schema)
if err != nil {
return -1, err
}
return int(s.segmentThreshold / float64(sizePerRecord)), nil
}
func (s *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
keys := make([]UniqueID, 0)
for _, segStatus := range s.segments {
if !segStatus.sealed {
sealed, err := s.checkSegmentSealed(segStatus)
if err != nil {
return nil, err
}
if !sealed {
continue
}
if err := s.sealSegmentInMeta(segStatus.id); err != nil {
return nil, err
}
segStatus.sealed = sealed
}
if segStatus.sealed {
keys = append(keys, segStatus.id)
}
}
return keys, nil
}
func (s *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
return float64(segMeta.NumRows) >= s.segmentThresholdFactor*float64(segStatus.total), nil
}
func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
status, ok := s.segments[segmentID]
if !ok {
return nil
}
if err := s.sealSegmentInMeta(segmentID); err != nil {
return err
}
status.sealed = true
return nil
}
func (s *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.segments[segmentID]
return ok
}
func (s *segmentAllocator) sealSegmentInMeta(id UniqueID) error {
ts, err := s.allocator.allocTimestamp()
if err != nil {
return err
}
return s.mt.SealSegment(id, ts)
}
func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
delete(s.segments, segmentID)
}
func (s *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, segStatus := range s.segments {
for i := 0; i < len(segStatus.allocations); i++ {
if timeTick < segStatus.allocations[i].expireTime {
continue
}
log.Debug("dataservice::ExpireAllocations: ",
zap.Any("segStatus.id", segStatus.id),
zap.Any("segStatus.allocations.rowNums", segStatus.allocations[i].rowNums))
segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
i--
}
}
return nil
}
func (s *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.RLock()
defer s.mu.RUnlock()
status, ok := s.segments[segmentID]
if !ok {
return false, fmt.Errorf("segment %d not found", segmentID)
}
return status.lastExpireTime <= ts, nil
}
func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
for _, status := range s.segments {
if status.collectionID == collectionID {
if status.sealed {
continue
}
status.sealed = true
}
}
}