Merge pull request #9287 from influxdata/dn-return-digest-size

fix #9286: return digest size
pull/9296/head
David Norton 2018-01-08 13:30:56 -05:00 committed by GitHub
commit 1ea41b0dd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 35 additions and 15 deletions

View File

@ -45,7 +45,7 @@ type Engine interface {
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
Digest() (io.ReadCloser, error)
Digest() (io.ReadCloser, int64, error)
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error)

View File

@ -229,7 +229,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
}
// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, error) {
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
digestPath := filepath.Join(e.path, "digest.tsd")
// See if there's an existing digest file on disk.
@ -239,16 +239,21 @@ func (e *Engine) Digest() (io.ReadCloser, error) {
fi, err := f.Stat()
if err != nil {
f.Close()
return nil, err
return nil, 0, err
}
if !e.LastModified().After(fi.ModTime()) {
// Existing digest is still fresh so return a reader for it.
return f, nil
fi, err := f.Stat()
if err != nil {
f.Close()
return nil, 0, err
}
return f, fi.Size(), nil
}
if err := f.Close(); err != nil {
return nil, err
return nil, 0, err
}
}
@ -258,23 +263,34 @@ func (e *Engine) Digest() (io.ReadCloser, error) {
// Create a tmp file to write the digest to.
tf, err := os.Create(digestPath + ".tmp")
if err != nil {
return nil, err
return nil, 0, err
}
// Write the new digest to the tmp file.
if err := Digest(e.path, tf); err != nil {
tf.Close()
os.Remove(tf.Name())
return nil, err
return nil, 0, err
}
// Rename the temporary digest file to the actual digest file.
if err := renameFile(tf.Name(), digestPath); err != nil {
return nil, err
return nil, 0, err
}
// Create and return a reader for the new digest file.
return os.Open(digestPath)
f, err = os.Open(digestPath)
if err != nil {
return nil, 0, err
}
fi, err := f.Stat()
if err != nil {
f.Close()
return nil, 0, err
}
return f, fi.Size(), nil
}
// SetEnabled sets whether the engine is enabled.

View File

@ -90,11 +90,15 @@ func TestEngine_Digest(t *testing.T) {
digest := func() ([]span, error) {
// Get a reader for the shard's digest.
r, err := e.Digest()
r, sz, err := e.Digest()
if err != nil {
return nil, err
}
if sz <= 0 {
t.Fatalf("expected digest size > 0")
}
// Make sure the digest can be read.
dr, err := tsm1.NewDigestReader(r)
if err != nil {

View File

@ -1112,16 +1112,16 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
}
// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, error) {
func (s *Shard) Digest() (io.ReadCloser, int64, error) {
engine, err := s.engine()
if err != nil {
return nil, err
return nil, 0, err
}
// Make sure the shard is idle/cold. (No use creating a digest of a
// hot shard that is rapidly changing.)
if !engine.IsIdle() {
return nil, ErrShardNotIdle
return nil, 0, ErrShardNotIdle
}
return engine.Digest()

View File

@ -439,10 +439,10 @@ func (s *Store) ShardN() int {
}
// ShardDigest returns a digest of the shard with the specified ID.
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, error) {
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
sh := s.Shard(id)
if sh == nil {
return nil, ErrShardNotFound
return nil, 0, ErrShardNotFound
}
return sh.Digest()