218 lines
5.5 KiB
Go
218 lines
5.5 KiB
Go
package influxdb
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/boltdb/bolt"
|
|
)
|
|
|
|
// ShardGroup represents a group of shards created for a single time range.
// The exported fields carry JSON tags and make up the group's JSON encoding.
type ShardGroup struct {
	// ID identifies the shard group.
	ID uint64 `json:"id,omitempty"`
	// StartTime and EndTime delimit the time range covered by the group;
	// Duration reports the difference between them.
	StartTime time.Time `json:"startTime,omitempty"`
	EndTime   time.Time `json:"endTime,omitempty"`
	// Shards lists the shards that belong to this group.
	Shards []*Shard `json:"shards,omitempty"`
}
|
|
|
|
// close closes all shards.
|
|
func (g *ShardGroup) close() {
|
|
for _, sh := range g.Shards {
|
|
_ = sh.close()
|
|
}
|
|
}
|
|
|
|
// Shard represents the logical storage for a given time range.
// The instance on a local server may contain the raw data in "store" if the
// shard is assigned to the server's data node id.
type Shard struct {
	// ID identifies the shard.
	ID uint64 `json:"id,omitempty"`
	// DataNodeIDs lists the ids of the data nodes that own the shard;
	// see HasDataNodeID.
	DataNodeIDs []uint64 `json:"nodeIDs,omitempty"` // owners

	// store is the Bolt database holding the shard's data. It is nil until
	// open assigns it and, being unexported, is not part of the JSON encoding.
	store *bolt.DB
}
|
|
|
|
// newShardGroup returns a new initialized ShardGroup instance.
|
|
func newShardGroup() *ShardGroup { return &ShardGroup{} }
|
|
|
|
// Duration returns the duration between the shard group's start and end time.
|
|
func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) }
|
|
|
|
// newShard returns a new initialized Shard instance.
|
|
func newShard() *Shard { return &Shard{} }
|
|
|
|
// open initializes and opens the shard's store.
|
|
func (s *Shard) open(path string) error {
|
|
// Return an error if the shard is already open.
|
|
if s.store != nil {
|
|
return errors.New("shard already open")
|
|
}
|
|
|
|
// Open store on shard.
|
|
store, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.store = store
|
|
|
|
// Initialize store.
|
|
if err := s.store.Update(func(tx *bolt.Tx) error {
|
|
_, _ = tx.CreateBucketIfNotExists([]byte("values"))
|
|
return nil
|
|
}); err != nil {
|
|
_ = s.close()
|
|
return fmt.Errorf("init: %s", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// close shuts down the shard's store.
|
|
func (s *Shard) close() error {
|
|
if s.store == nil {
|
|
return nil
|
|
}
|
|
return s.store.Close()
|
|
}
|
|
|
|
// HasDataNodeID return true if the data node owns the shard.
|
|
func (s *Shard) HasDataNodeID(id uint64) bool {
|
|
for _, dataNodeID := range s.DataNodeIDs {
|
|
if dataNodeID == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// readSeries reads encoded series data from a shard.
|
|
func (s *Shard) readSeries(seriesID uint32, timestamp int64) (values []byte, err error) {
|
|
err = s.store.View(func(tx *bolt.Tx) error {
|
|
// Find series bucket.
|
|
b := tx.Bucket(u32tob(seriesID))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
// Retrieve encoded series data.
|
|
values = b.Get(u64tob(uint64(timestamp)))
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// writeSeries writes series data to a shard.
|
|
func (s *Shard) writeSeries(seriesID uint32, timestamp int64, values []byte, overwrite bool) error {
|
|
return s.store.Update(func(tx *bolt.Tx) error {
|
|
// Create a bucket for the series.
|
|
b, err := tx.CreateBucketIfNotExists(u32tob(seriesID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert the values by timestamp.
|
|
if err := b.Put(u64tob(uint64(timestamp)), values); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// deleteSeries is a placeholder for removing a series from the shard.
// It is not implemented: every call panics, and name is not used.
func (s *Shard) deleteSeries(name string) error {
	panic("not yet implemented") // TODO
}
|
|
|
|
// Shards represents a list of shards. The elements are pointers, so copies
// of the list refer to the same Shard values.
type Shards []*Shard
|
|
|
|
// pointHeaderSize represents the size of a point header, in bytes.
// It matches exactly what marshalPointHeader writes and unmarshalPointHeader
// reads, so data[pointHeaderSize:] is everything that follows the header.
const pointHeaderSize = 4 + 8 // seriesID (uint32) + timestamp (int64)

// marshalPointHeader encodes a series id and timestamp into a byte slice.
//
// The result is pointHeaderSize bytes long: bytes 0-3 hold seriesID and
// bytes 4-11 hold timestamp (reinterpreted as uint64), both big-endian.
func marshalPointHeader(seriesID uint32, timestamp int64) []byte {
	b := make([]byte, pointHeaderSize)
	binary.BigEndian.PutUint32(b[0:4], seriesID)
	binary.BigEndian.PutUint64(b[4:12], uint64(timestamp))
	return b
}

// unmarshalPointHeader decodes a byte slice into a series id and timestamp.
//
// It is the inverse of marshalPointHeader and reads only the first
// pointHeaderSize bytes of b. b must be at least that long; a shorter slice
// causes a slice-bounds panic.
func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) {
	seriesID = binary.BigEndian.Uint32(b[0:4])
	timestamp = int64(binary.BigEndian.Uint64(b[4:12]))
	return seriesID, timestamp
}
|
|
|
|
// marshalValues encodes a set of field ids and values to a byte slice.
|
|
func marshalValues(values map[uint8]interface{}) []byte {
|
|
// Sort fields for consistency.
|
|
fieldIDs := make([]uint8, 0, len(values))
|
|
for fieldID := range values {
|
|
fieldIDs = append(fieldIDs, fieldID)
|
|
}
|
|
sort.Sort(uint8Slice(fieldIDs))
|
|
|
|
// Allocate byte slice and write field count.
|
|
b := make([]byte, 1, 10)
|
|
b[0] = byte(len(values))
|
|
|
|
// Write out each field.
|
|
for _, fieldID := range fieldIDs {
|
|
// Create a temporary buffer for this field.
|
|
buf := make([]byte, 9)
|
|
buf[0] = fieldID
|
|
|
|
// Encode value after field id.
|
|
// TODO: Support non-float types.
|
|
switch v := values[fieldID].(type) {
|
|
case float64:
|
|
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
|
|
default:
|
|
panic(fmt.Sprintf("unsupported value type: %T", v))
|
|
}
|
|
|
|
// Append temp buffer to the end.
|
|
b = append(b, buf...)
|
|
}
|
|
|
|
return b
|
|
}
|
|
|
|
// unmarshalValues decodes a byte slice into a set of field ids and values.
//
// The expected layout is one count byte followed by that many 9-byte records,
// each a field id followed by a big-endian uint64 holding float64 bits. Every
// decoded value is a float64.
//
// A nil or empty slice yields a nil map. A slice shorter than its count byte
// promises causes a slice-bounds panic.
func unmarshalValues(b []byte) map[uint8]interface{} {
	// Nothing to decode: report "no values" instead of indexing an empty slice.
	if len(b) == 0 {
		return nil
	}

	// Read the field count from the first byte.
	n := int(b[0])

	// Create a map to hold the decoded data.
	values := make(map[uint8]interface{}, n)

	// Start from the second byte and iterate over until we're done decoding.
	b = b[1:]
	for i := 0; i < n; i++ {
		// First byte is the field identifier.
		fieldID := b[0]

		// Decode value.
		// TODO: Support non-float types.
		values[fieldID] = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))

		// Move bytes forward.
		b = b[9:]
	}

	return values
}
|
|
|
|
// uint8Slice is a []uint8 carrying the Len, Less and Swap methods used to
// sort it in ascending order.
type uint8Slice []uint8

// Len reports the number of elements in the slice.
func (s uint8Slice) Len() int {
	return len(s)
}

// Less reports whether the element at index a is smaller than the one at b.
func (s uint8Slice) Less(a, b int) bool {
	return s[a] < s[b]
}

// Swap exchanges the elements at indexes a and b.
func (s uint8Slice) Swap(a, b int) {
	s[a], s[b] = s[b], s[a]
}
|