From fac350cc582c697d0c6f532becb20cdc26cea98e Mon Sep 17 00:00:00 2001 From: neza2017 Date: Wed, 14 Jul 2021 17:11:54 +0800 Subject: [PATCH] fix tso (#6500) Signed-off-by: yefu.chen --- internal/allocator/global_id.go | 4 ++-- internal/allocator/global_id_test.go | 12 ++++++++++++ internal/rootcoord/root_coord.go | 3 +++ internal/rootcoord/task.go | 3 +++ 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/internal/allocator/global_id.go b/internal/allocator/global_id.go index ce851e315c..cd4941940b 100644 --- a/internal/allocator/global_id.go +++ b/internal/allocator/global_id.go @@ -48,8 +48,8 @@ func (gia *GlobalIDAllocator) Alloc(count uint32) (typeutil.UniqueID, typeutil.U if err != nil { return 0, 0, err } - idStart := typeutil.UniqueID(timestamp) - idEnd := idStart + int64(count) + idEnd := typeutil.UniqueID(timestamp) + 1 + idStart := idEnd - int64(count) return idStart, idEnd, nil } diff --git a/internal/allocator/global_id_test.go b/internal/allocator/global_id_test.go index 65ac8d18ee..09b7a9ceac 100644 --- a/internal/allocator/global_id_test.go +++ b/internal/allocator/global_id_test.go @@ -49,4 +49,16 @@ func TestGlobalTSOAllocator_All(t *testing.T) { assert.Nil(t, err) assert.Equal(t, count, uint32(idEnd-idStart)) }) + + t.Run("Alloc2", func(t *testing.T) { + count1 := uint32(2 << 18) + id1, err := gTestIDAllocator.allocator.GenerateTSO(count1) + assert.Nil(t, err) + + count2 := uint32(2 << 8) + id2, err := gTestIDAllocator.allocator.GenerateTSO(count2) + assert.Nil(t, err) + assert.Equal(t, id2-id1, uint64(count2)) + + }) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 63b1db49fb..b146823e6b 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1750,6 +1750,9 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam }, nil } // log.Printf("AllocTimestamp : %d", ts) + + //return first available time stamp + ts = ts - uint64(in.Count) + 1 return &rootcoordpb.AllocTimestampResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 8d6cf64fa6..27274a817a 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -60,6 +60,9 @@ func executeTask(t reqTask) error { case <-t.Ctx().Done(): return fmt.Errorf("context canceled") case err := <-errChan: + if t.Core().ctx.Err() != nil || t.Ctx().Err() != nil { + return fmt.Errorf("context canceled") + } return err } }