Implement methods that don't require merge

pull/9150/head
Edd Robinson 2017-09-28 18:01:45 +01:00 committed by Ben Johnson
parent bf132004a3
commit 65c6fa747e
2 changed files with 262 additions and 55 deletions


@@ -5,7 +5,9 @@ import (
"fmt"
"path/filepath"
"regexp"
"runtime"
"sync"
"sync/atomic"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/models"
@@ -147,6 +149,16 @@ func (i *Index) partition(key []byte) *Partition {
return i.partitions[int(xxhash.Sum64(key)&(TotalPartitions-1))]
}
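
For the mask in partition() to stay in bounds, TotalPartitions must be a power of two: masking the 64-bit hash with TotalPartitions-1 keeps only its low bits, which is equivalent to a modulo but cheaper. A standalone sketch of the idea, using an illustrative 16-way split rather than tsi1's actual constant:

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

// totalPartitions must be a power of two so that h&(totalPartitions-1)
// equals h%totalPartitions. (Illustrative value, not tsi1's constant.)
const totalPartitions = 16

// partitionFor maps a series key to a partition index in [0, totalPartitions).
func partitionFor(key []byte) int {
	return int(xxhash.Sum64(key) & (totalPartitions - 1))
}

func main() {
	for _, k := range []string{"cpu,host=a", "cpu,host=b", "mem,host=a"} {
		fmt.Printf("%-10s -> partition %d\n", k, partitionFor([]byte(k)))
	}
}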
// availableThreads returns the minimum of GOMAXPROCS and the number of
// partitions in the Index.
func (i *Index) availableThreads() int {
n := runtime.GOMAXPROCS(0)
if len(i.partitions) < n {
return len(i.partitions)
}
return n
}
// RetainFileSet returns the current fileset for all partitions in the Index.
func (i *Index) RetainFileSet() *FileSet {
// TODO(edd): Merge all FileSets for all partitions. For the moment we will
@@ -158,19 +170,87 @@ func (i *Index) RetainFileSet() *FileSet {
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
// TODO(edd): set the field set on all the Partitions?
// since fs is a pointer, it probably can't be shared...
i.mu.Unlock()
}
// ForEachMeasurementName iterates over all measurement names in the index,
// applying fn. Note, the provided function may be called concurrently, and it
// must be safe to do so.
//
// It returns the first error encountered, if any.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
n := i.availableThreads()
// Store results.
errC := make(chan error, len(i.partitions))
// Run fn on each partition using a fixed number of goroutines.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
errC <- i.partitions[idx].ForEachMeasurementName(fn)
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}
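
This fixed-pool, atomic-counter fan-out is the shape every parallel method in this file follows. Extracted as a self-contained sketch (fanOut and its parameter names are illustrative, not part of the tsi1 API); the error channel is sized to one slot per unit of work, so an early return never strands a sender:

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// fanOut runs fn once for every index in [0, parts) using at most
// min(GOMAXPROCS, parts) goroutines. Workers claim indexes with an atomic
// counter and send exactly one result per index, so the receive loop can
// drain cap(errC) results and no sender ever blocks.
func fanOut(parts int, fn func(idx int) error) error {
	n := runtime.GOMAXPROCS(0)
	if parts < n {
		n = parts
	}

	errC := make(chan error, parts) // one slot per unit of work
	var next uint32                 // index of next partition to claim

	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&next, 1) - 1)
				if idx >= parts {
					return // no more work
				}
				errC <- fn(idx)
			}
		}()
	}

	var firstErr error
	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	err := fanOut(8, func(idx int) error {
		fmt.Println("visiting partition", idx)
		return nil
	})
	fmt.Println("err:", err)
}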
// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error) {
n := i.availableThreads()
// Store errors
var found uint32 // Use this to signal we found the measurement.
errC := make(chan error, len(i.partitions))
// Check each partition for the measurement concurrently.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
if idx >= len(i.partitions) {
return // No more work.
}
// Check if the measurement has already been found. If it has, we don't
// need to check this partition and can just move on.
if atomic.LoadUint32(&found) == 1 {
errC <- nil
continue
}
b, err := i.partitions[idx].MeasurementExists(name)
errC <- err
if b {
atomic.StoreUint32(&found, 1)
}
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return false, err
}
}
// Check if we found the measurement.
return atomic.LoadUint32(&found) == 1, nil
}
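
MeasurementExists, like HasTagKey further down, layers an early-exit flag on top of the same fan-out: once one partition reports a hit, the remaining workers send a nil error without doing the lookup, keeping the send and receive counts balanced. A self-contained sketch of that variant (anyPartition is a hypothetical name):

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// anyPartition reports whether pred is true for any index in [0, parts).
// A shared atomic flag lets workers skip lookups once a hit is recorded,
// but each claimed index still produces exactly one send on errC.
func anyPartition(parts int, pred func(idx int) (bool, error)) (bool, error) {
	n := runtime.GOMAXPROCS(0)
	if parts < n {
		n = parts
	}

	var next, found uint32
	errC := make(chan error, parts)

	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&next, 1) - 1)
				if idx >= parts {
					return
				}
				if atomic.LoadUint32(&found) == 1 {
					errC <- nil // already found; skip the lookup
					continue
				}
				ok, err := pred(idx)
				if ok {
					atomic.StoreUint32(&found, 1)
				}
				errC <- err
			}
		}()
	}

	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil {
			return false, err // buffered channel keeps remaining senders unblocked
		}
	}
	return atomic.LoadUint32(&found) == 1, nil
}

func main() {
	ok, err := anyPartition(8, func(idx int) (bool, error) { return idx == 3, nil })
	fmt.Println(ok, err)
}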
// MeasurementNamesByExpr returns measurement names for the provided expression.
@@ -185,9 +265,33 @@ func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
return nil, nil
}
// DropMeasurement deletes a measurement from the index. It returns the first
// error encountered, if any.
func (i *Index) DropMeasurement(name []byte) error {
n := i.availableThreads()
// Store results.
errC := make(chan error, len(i.partitions))
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
errC <- i.partitions[idx].DropMeasurement(name)
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}
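
With a helper like the fanOut sketch above, a method such as DropMeasurement reduces to a one-line body. A hypothetical illustration, not the structure this commit actually uses:

// Hypothetical refactor reusing the fanOut sketch shown earlier.
func (i *Index) DropMeasurement(name []byte) error {
	return fanOut(len(i.partitions), func(idx int) error {
		return i.partitions[idx].DropMeasurement(name)
	})
}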
@@ -197,11 +301,6 @@ func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []model
return nil
}
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
// TODO(edd): Call on correct Partition.
@@ -227,14 +326,57 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
// across indexes then use SeriesSketches and merge the results from other
// indexes.
func (i *Index) SeriesN() int64 {
var total int64
for _, p := range i.partitions {
total += p.SeriesN()
}
return total
}
// HasTagKey returns true if the tag key exists. It returns the first error
// encountered, if any.
func (i *Index) HasTagKey(name, key []byte) (bool, error) {
n := i.availableThreads()
// Store errors
var found uint32 // Use this to signal we found the tag key.
errC := make(chan error, len(i.partitions))
// Check each partition for the tag key concurrently.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
if idx >= len(i.partitions) {
return // No more work.
}
// Check if the tag key has already been found. If it has, we
// don't need to check this partition and can just move on.
if atomic.LoadUint32(&found) == 1 {
errC <- nil
continue
}
b, err := i.partitions[idx].HasTagKey(name, key)
errC <- err
if b {
atomic.StoreUint32(&found, 1)
}
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return false, err
}
}
// Check if we found the tag key.
return atomic.LoadUint32(&found) == 1, nil
}
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
@@ -255,7 +397,31 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
// the provided function.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
n := i.availableThreads()
// Store results.
errC := make(chan error, len(i.partitions))
// Run fn on each partition using a fixed number of goroutines.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
errC <- i.partitions[idx].ForEachMeasurementTagKey(name, fn)
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}
@@ -281,19 +447,23 @@ func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet
// SnapshotTo creates hard links to the file set into path.
func (i *Index) SnapshotTo(path string) error {
// Store results.
errC := make(chan error, len(i.partitions))
for _, p := range i.partitions {
go func(p *Partition) {
errC <- p.SnapshotTo(path)
}(p)
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}
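
SnapshotTo drops the counter entirely: with exactly one unit of work per partition, a goroutine per partition plus a channel buffered to match is enough. The same shape as a standalone sketch (names are illustrative):

package main

import "fmt"

// snapshotAll runs snap once per partition path on its own goroutine and
// returns the first error after draining one result per partition.
func snapshotAll(paths []string, snap func(path string) error) error {
	errC := make(chan error, len(paths)) // one slot per partition
	for _, p := range paths {
		go func(p string) { errC <- snap(p) }(p)
	}
	var firstErr error
	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	err := snapshotAll([]string{"p0", "p1", "p2"}, func(p string) error {
		fmt.Println("snapshotting", p)
		return nil
	})
	fmt.Println("err:", err)
}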
// UnassignShard simply calls into DropSeries.
func (i *Index) UnassignShard(k string, shardID uint64) error {
// This can be called directly once inmem is gone.
@@ -307,10 +477,46 @@ func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator,
return nil, nil
}
// Compact requests a compaction of log files in the index.
func (i *Index) Compact() {
// Compact using half the available threads.
// TODO(edd): this might need adjusting.
n := runtime.GOMAXPROCS(0) / 2
if n < 1 {
n = 1 // Ensure at least one compaction goroutine, even when GOMAXPROCS == 1.
}
// Run fn on each partition using a fixed number of goroutines.
var wg sync.WaitGroup
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
i.partitions[idx].Compact()
}
}()
}
// Wait for all partitions to complete compactions.
wg.Wait()
}
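
Compact is the one fan-out with nothing to collect, so a sync.WaitGroup replaces the error channel, and only half the available threads are used so queries keep some CPU headroom. A standalone sketch of that variant (compactAll is a hypothetical name):

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// compactAll invokes compact once per index in [0, parts), bounded to half
// of GOMAXPROCS so foreground work keeps some CPU headroom.
func compactAll(parts int, compact func(idx int)) {
	n := runtime.GOMAXPROCS(0) / 2
	if n < 1 {
		n = 1 // always run at least one worker
	}

	var wg sync.WaitGroup
	var next uint32
	for k := 0; k < n; k++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				idx := int(atomic.AddUint32(&next, 1) - 1)
				if idx >= parts {
					return
				}
				compact(idx)
			}
		}()
	}
	wg.Wait()
}

func main() {
	compactAll(4, func(idx int) { fmt.Println("compacting partition", idx) })
}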
// NO-OPS
// Rebuild is a no-op on tsi1.
func (i *Index) Rebuild() {}
// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error { return nil }
// SetFieldName is a no-op on tsi1.
func (i *Index) SetFieldName(measurement []byte, name string) {}
// RemoveShard is a no-op on tsi1.
func (i *Index) RemoveShard(shardID uint64) {}
// AssignShard is a no-op on tsi1.
func (i *Index) AssignShard(k string, shardID uint64) {}


@@ -785,39 +785,40 @@ func (i *Partition) TagSets(name []byte, opt query.IteratorOptions) ([]*query.Ta
// SnapshotTo creates hard links to the file set into path.
func (i *Partition) SnapshotTo(path string) error {
panic("NEED TO MAKE THIS PARTITION AWARE")
// i.mu.Lock()
// defer i.mu.Unlock()
// fs := i.retainFileSet()
// defer fs.Release()
// // Flush active log file, if any.
// if err := i.activeLogFile.Flush(); err != nil {
// return err
// }
// if err := os.Mkdir(filepath.Join(path, "index"), 0777); err != nil {
// return err
// }
// // Link manifest.
// if err := os.Link(i.ManifestPath(), filepath.Join(path, "index", filepath.Base(i.ManifestPath()))); err != nil {
// return fmt.Errorf("error creating tsi manifest hard link: %q", err)
// }
// // Link series file.
// if err := os.Link(i.SeriesFilePath(), filepath.Join(path, "index", filepath.Base(i.SeriesFilePath()))); err != nil {
// return fmt.Errorf("error creating tsi series file hard link: %q", err)
// }
// // Link files in directory.
// for _, f := range fs.files {
// if err := os.Link(f.Path(), filepath.Join(path, "index", filepath.Base(f.Path()))); err != nil {
// return fmt.Errorf("error creating tsi hard link: %q", err)
// }
// }
// return nil
}
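
The panic above marks SnapshotTo as unfinished: the commented-out body still hard-links everything into a single path/index directory, which would collide once several partitions snapshot to the same destination. One possible partition-aware shape, purely a sketch; the per-partition directory layout and the snapshotFiles helper are assumptions, not what the commit settled on:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// snapshotFiles hard-links each source file into dst, creating dst first.
// A partition-aware caller would pass a per-partition directory such as
// filepath.Join(path, "index", partitionID).
func snapshotFiles(dst string, files []string) error {
	if err := os.MkdirAll(dst, 0777); err != nil {
		return err
	}
	for _, f := range files {
		if err := os.Link(f, filepath.Join(dst, filepath.Base(f))); err != nil {
			return fmt.Errorf("error creating tsi hard link: %q", err)
		}
	}
	return nil
}

func main() {
	if err := snapshotFiles("snapshot/index/0", nil); err != nil {
		fmt.Println("err:", err)
	}
}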
func (i *Partition) SetFieldName(measurement []byte, name string) {}