Fix metafile so it doesn't get trampled by other goroutines.
Fixes #3832 and fixes #3833pull/3845/head
parent
a4735624f8
commit
0d744dafed
|
@ -325,14 +325,14 @@ func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map
|
||||||
// is meant to be called by bz1 BEFORE it updates its own index, since the metadata
|
// is meant to be called by bz1 BEFORE it updates its own index, since the metadata
|
||||||
// is flushed here first.
|
// is flushed here first.
|
||||||
func (l *Log) DeleteSeries(keys []string) error {
|
func (l *Log) DeleteSeries(keys []string) error {
|
||||||
// we want to stop any writes from happening to ensure the data gets cleared
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
|
|
||||||
if err := l.flushMetadata(); err != nil {
|
if err := l.flushMetadata(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we want to stop any writes from happening to ensure the data gets cleared
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
for _, p := range l.partitions {
|
for _, p := range l.partitions {
|
||||||
p.deleteSeries(keys)
|
p.deleteSeries(keys)
|
||||||
}
|
}
|
||||||
|
@ -435,6 +435,9 @@ func (l *Log) writeSeriesAndFields(fields map[string]*tsdb.MeasurementFields, se
|
||||||
// nextMetaFile will close the current file if there is one open and open a new file to log
|
// nextMetaFile will close the current file if there is one open and open a new file to log
|
||||||
// metadata updates to. This function assumes that you've locked l.mu elsewhere.
|
// metadata updates to. This function assumes that you've locked l.mu elsewhere.
|
||||||
func (l *Log) nextMetaFile() error {
|
func (l *Log) nextMetaFile() error {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
if l.metaFile != nil {
|
if l.metaFile != nil {
|
||||||
if err := l.metaFile.Close(); err != nil {
|
if err := l.metaFile.Close(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -455,7 +458,7 @@ func (l *Log) nextMetaFile() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
id = int(n)
|
id = int(n) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension))
|
nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension))
|
||||||
|
@ -620,6 +623,28 @@ func (l *Log) autoflusher(closing chan struct{}) {
|
||||||
// metadata from previous files to the index. After a sucessful write, the metadata files
|
// metadata from previous files to the index. After a sucessful write, the metadata files
|
||||||
// will be removed. While the flush to index is happening we aren't blocked for new metadata writes.
|
// will be removed. While the flush to index is happening we aren't blocked for new metadata writes.
|
||||||
func (l *Log) flushMetadata() error {
|
func (l *Log) flushMetadata() error {
|
||||||
|
// make sure there's actually something in the metadata file to flush
|
||||||
|
size, err := func() (int64, error) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
|
if l.metaFile == nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
st, err := l.metaFile.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return st.Size(), nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if size == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have data, get a list of the existing files and rotate to a new one, then flush
|
||||||
files, err := l.metadataFiles()
|
files, err := l.metadataFiles()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue