feat: Fixes incorrect import for atomic

pull/26511/head
devanbenz 2025-06-13 13:17:15 -05:00
parent 9dd2db7a71
commit de6b137844
1 changed files with 10 additions and 13 deletions

View File

@ -4,13 +4,13 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/streadway/handy/atomic"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -63,7 +63,7 @@ type Partition struct {
// Fieldset shared with engine. // Fieldset shared with engine.
fieldset *tsdb.MeasurementFieldSet fieldset *tsdb.MeasurementFieldSet
currentCompactionN atomic.Int // counter of in-progress compactions currentCompactionN atomic.Int32 // counter of in-progress compactions
// Directory of the Partition's index files. // Directory of the Partition's index files.
path string path string
@ -349,10 +349,8 @@ func (p *Partition) buildSeriesSet() error {
} }
// CurrentCompactionN returns the number of compactions currently running. // CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() atomic.Int { func (p *Partition) CurrentCompactionN() int32 {
p.mu.RLock() return p.currentCompactionN.Load()
defer p.mu.RUnlock()
return p.currentCompactionN
} }
// Wait will block until all compactions are finished. // Wait will block until all compactions are finished.
@ -367,8 +365,7 @@ func (p *Partition) Wait() {
defer timeout.Stop() defer timeout.Stop()
for { for {
n := p.CurrentCompactionN() if p.CurrentCompactionN() == 0 {
if n.Get() == 0 {
return return
} }
select { select {
@ -378,7 +375,7 @@ func (p *Partition) Wait() {
files = append(files, v.Path()) files = append(files, v.Path())
} }
p.logger.Debug("Partition.Wait() timed out waiting for compactions to complete", p.logger.Debug("Partition.Wait() timed out waiting for compactions to complete",
zap.Int64("stuck_compactions", n.Get()), zap.Duration("timeout", timeoutDuration), zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
zap.Strings("files", files)) zap.Strings("files", files))
case <-ticker.C: case <-ticker.C:
} }
@ -1058,11 +1055,11 @@ func (p *Partition) compact() {
} }
// Mark the level as compacting. // Mark the level as compacting.
p.levelCompacting[0] = true p.levelCompacting[0] = true
p.currentCompactionN++ p.currentCompactionN.Add(1)
go func() { go func() {
defer func() { defer func() {
p.mu.Lock() p.mu.Lock()
p.currentCompactionN-- p.currentCompactionN.Add(-1)
p.levelCompacting[0] = false p.levelCompacting[0] = false
p.mu.Unlock() p.mu.Unlock()
p.Compact() p.Compact()
@ -1100,13 +1097,13 @@ func (p *Partition) compact() {
// Execute in closure to save reference to the group within the loop. // Execute in closure to save reference to the group within the loop.
func(files []*IndexFile, level int) { func(files []*IndexFile, level int) {
// Start compacting in a separate goroutine. // Start compacting in a separate goroutine.
p.currentCompactionN++ p.currentCompactionN.Add(1)
go func() { go func() {
defer func() { defer func() {
// Ensure compaction lock for the level is released. // Ensure compaction lock for the level is released.
p.mu.Lock() p.mu.Lock()
p.levelCompacting[level] = false p.levelCompacting[level] = false
p.currentCompactionN-- p.currentCompactionN.Add(-1)
p.mu.Unlock() p.mu.Unlock()
// Check for new compactions // Check for new compactions