237 lines
5.9 KiB
Go
237 lines
5.9 KiB
Go
package export
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format"
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/storage"
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/server"
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
)
|
|
|
|
type exporterConfig struct {
|
|
Database string
|
|
RP string
|
|
ShardDuration time.Duration
|
|
Min, Max uint64
|
|
}
|
|
|
|
type exporter struct {
|
|
metaClient server.MetaClient
|
|
tsdbStore *tsdb.Store
|
|
store *storage.Store
|
|
|
|
min, max uint64
|
|
db, rp string
|
|
d time.Duration
|
|
sourceGroups []meta.ShardGroupInfo
|
|
targetGroups []meta.ShardGroupInfo
|
|
|
|
// source data time range
|
|
startDate time.Time
|
|
endDate time.Time
|
|
}
|
|
|
|
func newExporter(server server.Interface, cfg *exporterConfig) (*exporter, error) {
|
|
client := server.MetaClient()
|
|
|
|
dbi := client.Database(cfg.Database)
|
|
if dbi == nil {
|
|
return nil, fmt.Errorf("database '%s' does not exist", cfg.Database)
|
|
}
|
|
|
|
if cfg.RP == "" {
|
|
// select default RP
|
|
cfg.RP = dbi.DefaultRetentionPolicy
|
|
}
|
|
|
|
rpi, err := client.RetentionPolicy(cfg.Database, cfg.RP)
|
|
if rpi == nil || err != nil {
|
|
return nil, fmt.Errorf("retention policy '%s' does not exist", cfg.RP)
|
|
}
|
|
|
|
store := tsdb.NewStore(server.TSDBConfig().Dir)
|
|
if server.Logger() != nil {
|
|
store.WithLogger(server.Logger())
|
|
}
|
|
store.EngineOptions.MonitorDisabled = true
|
|
store.EngineOptions.CompactionDisabled = true
|
|
store.EngineOptions.Config = server.TSDBConfig()
|
|
store.EngineOptions.EngineVersion = server.TSDBConfig().Engine
|
|
store.EngineOptions.IndexVersion = server.TSDBConfig().Index
|
|
store.EngineOptions.DatabaseFilter = func(database string) bool {
|
|
return database == cfg.Database
|
|
}
|
|
store.EngineOptions.RetentionPolicyFilter = func(_, rp string) bool {
|
|
return rp == cfg.RP
|
|
}
|
|
store.EngineOptions.ShardFilter = func(_, _ string, _ uint64) bool {
|
|
return false
|
|
}
|
|
|
|
return &exporter{
|
|
metaClient: client,
|
|
tsdbStore: store,
|
|
store: &storage.Store{TSDBStore: store},
|
|
min: cfg.Min,
|
|
max: cfg.Max,
|
|
db: cfg.Database,
|
|
rp: cfg.RP,
|
|
d: cfg.ShardDuration,
|
|
}, nil
|
|
}
|
|
|
|
func (e *exporter) Open() (err error) {
|
|
err = e.tsdbStore.Open()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = e.loadShardGroups()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
e.targetGroups = planShardGroups(e.sourceGroups, e.startDate, e.endDate, e.d)
|
|
if e.max >= uint64(len(e.targetGroups)) {
|
|
e.max = uint64(len(e.targetGroups) - 1)
|
|
}
|
|
if e.min > e.max {
|
|
return fmt.Errorf("invalid shard group range %d to %d", e.min, e.max)
|
|
}
|
|
|
|
e.targetGroups = e.targetGroups[e.min : e.max+1]
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *exporter) PrintPlan(w io.Writer) {
|
|
fmt.Fprintf(w, "Source data from: %s -> %s\n\n", e.startDate, e.endDate)
|
|
fmt.Fprintf(w, "Converting source from %d shard group(s) to %d shard groups:\n\n", len(e.sourceGroups), len(e.targetGroups))
|
|
e.printShardGroups(w, 0, e.sourceGroups)
|
|
fmt.Fprintln(w)
|
|
e.printShardGroups(w, int(e.min), e.targetGroups)
|
|
}
|
|
|
|
func (e *exporter) printShardGroups(w io.Writer, base int, target []meta.ShardGroupInfo) {
|
|
tw := tabwriter.NewWriter(w, 10, 8, 1, '\t', 0)
|
|
fmt.Fprintln(tw, "Seq #\tID\tStart\tEnd")
|
|
for i := 0; i < len(target); i++ {
|
|
g := target[i]
|
|
fmt.Fprintf(tw, "%d\t%d\t%s\t%s\n", i+base, g.ID, g.StartTime, g.EndTime)
|
|
}
|
|
tw.Flush()
|
|
}
|
|
|
|
func (e *exporter) SourceTimeRange() (time.Time, time.Time) { return e.startDate, e.endDate }
|
|
func (e *exporter) SourceShardGroups() []meta.ShardGroupInfo { return e.sourceGroups }
|
|
func (e *exporter) TargetShardGroups() []meta.ShardGroupInfo { return e.targetGroups }
|
|
|
|
func (e *exporter) loadShardGroups() error {
|
|
min := time.Unix(0, models.MinNanoTime)
|
|
max := time.Unix(0, models.MaxNanoTime)
|
|
|
|
groups, err := e.metaClient.NodeShardGroupsByTimeRange(e.db, e.rp, min, max)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(groups) == 0 {
|
|
return nil
|
|
}
|
|
|
|
sort.Sort(meta.ShardGroupInfos(groups))
|
|
e.sourceGroups = groups
|
|
e.startDate = groups[0].StartTime
|
|
e.endDate = groups[len(groups)-1].EndTime
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *exporter) shardsGroupsByTimeRange(min, max time.Time) []meta.ShardGroupInfo {
|
|
groups := make([]meta.ShardGroupInfo, 0, len(e.sourceGroups))
|
|
for _, g := range e.sourceGroups {
|
|
if !g.Overlaps(min, max) {
|
|
continue
|
|
}
|
|
groups = append(groups, g)
|
|
}
|
|
return groups
|
|
}
|
|
|
|
func (e *exporter) WriteTo(w format.Writer) error {
|
|
for _, g := range e.targetGroups {
|
|
min, max := g.StartTime, g.EndTime
|
|
rs, err := e.read(min, max.Add(-1))
|
|
if err != nil || rs == nil {
|
|
return err
|
|
}
|
|
|
|
format.WriteBucket(w, min.UnixNano(), max.UnixNano(), rs)
|
|
rs.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read creates a ResultSet that reads all points with a timestamp ts, such that start ≤ ts < end.
|
|
func (e *exporter) read(min, max time.Time) (*storage.ResultSet, error) {
|
|
shards, err := e.getShards(min, max)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := storage.ReadRequest{
|
|
Database: e.db,
|
|
RP: e.rp,
|
|
Shards: shards,
|
|
Start: min.UnixNano(),
|
|
End: max.UnixNano(),
|
|
}
|
|
|
|
return e.store.Read(context.Background(), &req)
|
|
}
|
|
|
|
func (e *exporter) Close() error {
|
|
return e.tsdbStore.Close()
|
|
}
|
|
|
|
func (e *exporter) getShards(min, max time.Time) ([]*tsdb.Shard, error) {
|
|
groups := e.shardsGroupsByTimeRange(min, max)
|
|
var ids []uint64
|
|
for _, g := range groups {
|
|
for _, s := range g.Shards {
|
|
ids = append(ids, s.ID)
|
|
}
|
|
}
|
|
|
|
shards := e.tsdbStore.Shards(ids)
|
|
if len(shards) == len(ids) {
|
|
return shards, nil
|
|
}
|
|
|
|
return e.openStoreWithShardsIDs(ids)
|
|
}
|
|
|
|
func (e *exporter) openStoreWithShardsIDs(ids []uint64) ([]*tsdb.Shard, error) {
|
|
e.tsdbStore.Close()
|
|
e.tsdbStore.EngineOptions.ShardFilter = func(_, _ string, id uint64) bool {
|
|
for i := range ids {
|
|
if id == ids[i] {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
if err := e.tsdbStore.Open(); err != nil {
|
|
return nil, err
|
|
}
|
|
return e.tsdbStore.Shards(ids), nil
|
|
}
|