Merged pull request #382 from influxdata/nc-rename-parition

chore: Rename partition key to group key

commit e5b74d9670
@@ -511,7 +511,8 @@ If you need to convert other columns use the `map` function directly with the `u
 #### window
 
-Partitions the results by a given time range
+Groups the results by a given time range
 
 ##### options
 
 * `every` duration
@@ -25,9 +25,9 @@ const (
 	recordStartIdx = 3
 
-	datatypeAnnotation  = "datatype"
-	partitionAnnotation = "partition"
-	defaultAnnotation   = "default"
+	datatypeAnnotation = "datatype"
+	groupAnnotation    = "group"
+	defaultAnnotation  = "default"
 
 	resultLabel = "result"
 	tableLabel  = "table"

@@ -248,20 +248,20 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
 }
 
 type tableMetadata struct {
-	ResultID   string
-	TableID    string
-	Cols       []colMeta
-	Partitions []bool
-	Defaults   []values.Value
-	NumFields  int
+	ResultID  string
+	TableID   string
+	Cols      []colMeta
+	Groups    []bool
+	Defaults  []values.Value
+	NumFields int
 }
 
 // readMetadata reads the table annotations and header.
 func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tableMetadata, error) {
 	n := -1
 	var resultID, tableID string
-	var datatypes, partitions, defaults []string
-	for datatypes == nil || partitions == nil || defaults == nil {
+	var datatypes, groups, defaults []string
+	for datatypes == nil || groups == nil || defaults == nil {
 		var line []string
 		if len(extraLine) > 0 {
 			line = extraLine

@@ -270,15 +270,15 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
 			l, err := r.Read()
 			if err != nil {
 				if err == io.EOF {
-					if datatypes == nil && partitions == nil && defaults == nil {
+					if datatypes == nil && groups == nil && defaults == nil {
 						// No, just pass the EOF up
 						return tableMetadata{}, err
 					}
 					switch {
 					case datatypes == nil:
 						return tableMetadata{}, fmt.Errorf("missing expected annotation datatype")
-					case partitions == nil:
-						return tableMetadata{}, fmt.Errorf("missing expected annotation partition")
+					case groups == nil:
+						return tableMetadata{}, fmt.Errorf("missing expected annotation group")
 					case defaults == nil:
 						return tableMetadata{}, fmt.Errorf("missing expected annotation default")
 					}

@@ -296,8 +296,8 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
 		switch annotation := strings.TrimPrefix(line[annotationIdx], commentPrefix); annotation {
 		case datatypeAnnotation:
 			datatypes = copyLine(line[recordStartIdx:])
-		case partitionAnnotation:
-			partitions = copyLine(line[recordStartIdx:])
+		case groupAnnotation:
+			groups = copyLine(line[recordStartIdx:])
 		case defaultAnnotation:
 			resultID = line[resultIdx]
 			tableID = line[tableIdx]

@@ -307,8 +307,8 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
 	switch {
 	case datatypes == nil:
 		return tableMetadata{}, fmt.Errorf("missing expected annotation datatype")
-	case partitions == nil:
-		return tableMetadata{}, fmt.Errorf("missing expected annotation partition")
+	case groups == nil:
+		return tableMetadata{}, fmt.Errorf("missing expected annotation group")
 	case defaults == nil:
 		return tableMetadata{}, fmt.Errorf("missing expected annotation default")
 	}

@@ -341,7 +341,7 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
 	cols := make([]colMeta, len(labels))
 	defaultValues := make([]values.Value, len(labels))
-	partitionValues := make([]bool, len(labels))
+	groupValues := make([]bool, len(labels))
 
 	for j, label := range labels {
 		t, desc, err := decodeType(datatypes[j])

@@ -367,16 +367,16 @@ func readMetadata(r *csv.Reader, c ResultDecoderConfig, extraLine []string) (tab
 			}
 			defaultValues[j] = v
 		}
-		partitionValues[j] = partitions[j] == "true"
+		groupValues[j] = groups[j] == "true"
 	}
 
 	return tableMetadata{
-		ResultID:   resultID,
-		TableID:    tableID,
-		Cols:       cols,
-		Partitions: partitionValues,
-		Defaults:   defaultValues,
-		NumFields:  n,
+		ResultID:  resultID,
+		TableID:   tableID,
+		Cols:      cols,
+		Groups:    groupValues,
+		Defaults:  defaultValues,
+		NumFields: n,
 	}, nil
 }

@@ -390,7 +390,7 @@ type blockDecoder struct {
 	initialized bool
 	id          string
-	key         query.PartitionKey
+	key         query.GroupKey
 	cols        []colMeta
 
 	builder *execute.ColListBlockBuilder

@@ -505,7 +505,7 @@ DONE:
 	// table is done
 	b.extraLine = line
 	if !b.initialized {
-		return false, errors.New("table was not initialized, missing partition key data")
+		return false, errors.New("table was not initialized, missing group key data")
 	}
 	return false, nil
 }

@@ -525,7 +525,7 @@ func (b *blockDecoder) init(line []string) error {
 	keyCols := make([]query.ColMeta, 0, len(b.meta.Cols))
 	keyValues := make([]values.Value, 0, len(b.meta.Cols))
 	for j, c := range b.meta.Cols {
-		if b.meta.Partitions[j] {
+		if b.meta.Groups[j] {
 			var value values.Value
 			if b.meta.Defaults[j] != nil {
 				value = b.meta.Defaults[j]

@@ -536,14 +536,14 @@ func (b *blockDecoder) init(line []string) error {
 				}
 				value = v
 			} else {
-				return fmt.Errorf("missing value for partition key column %q", c.Label)
+				return fmt.Errorf("missing value for group key column %q", c.Label)
 			}
 			keyCols = append(keyCols, c.ColMeta)
 			keyValues = append(keyValues, value)
 		}
 	}
 
-	key := execute.NewPartitionKey(keyCols, keyValues)
+	key := execute.NewGroupKey(keyCols, keyValues)
 	b.builder = execute.NewColListBlockBuilder(key, newUnlimitedAllocator())
 	for _, c := range b.meta.Cols {
 		b.builder.AddCol(c.ColMeta)

@@ -593,7 +593,7 @@ func (b *blockDecoder) Empty() bool {
 
 func (b *blockDecoder) RefCount(n int) {}
 
-func (b *blockDecoder) Key() query.PartitionKey {
+func (b *blockDecoder) Key() query.GroupKey {
 	return b.builder.Key()
 }

@@ -630,7 +630,7 @@ type ResultEncoderConfig struct {
 
 func DefaultEncoderConfig() ResultEncoderConfig {
 	return ResultEncoderConfig{
-		Annotations: []string{datatypeAnnotation, partitionAnnotation, defaultAnnotation},
+		Annotations: []string{datatypeAnnotation, groupAnnotation, defaultAnnotation},
 	}
 }

@@ -750,7 +750,7 @@ func (e *ResultEncoder) EncodeError(w io.Writer, err error) error {
 	return writer.Error()
 }
 
-func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, resultName, tableID string) error {
+func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.GroupKey, resultName, tableID string) error {
 	defaults := make([]string, len(row))
 	for j, c := range cols {
 		switch j {

@@ -795,15 +795,15 @@ func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols
 	return writer.Error()
 }
 
-func writeAnnotations(writer *csv.Writer, annotations []string, row, defaults []string, cols []colMeta, key query.PartitionKey) error {
+func writeAnnotations(writer *csv.Writer, annotations []string, row, defaults []string, cols []colMeta, key query.GroupKey) error {
 	for _, annotation := range annotations {
 		switch annotation {
 		case datatypeAnnotation:
 			if err := writeDatatypes(writer, row, cols); err != nil {
 				return err
 			}
-		case partitionAnnotation:
-			if err := writePartitions(writer, row, cols, key); err != nil {
+		case groupAnnotation:
+			if err := writeGroups(writer, row, cols, key); err != nil {
 				return err
 			}
 		case defaultAnnotation:

@@ -843,10 +843,10 @@ func writeDatatypes(writer *csv.Writer, row []string, cols []colMeta) error {
 	return writer.Write(row)
 }
 
-func writePartitions(writer *csv.Writer, row []string, cols []colMeta, key query.PartitionKey) error {
+func writeGroups(writer *csv.Writer, row []string, cols []colMeta, key query.GroupKey) error {
 	for j, c := range cols {
 		if j == annotationIdx {
-			row[j] = commentPrefix + partitionAnnotation
+			row[j] = commentPrefix + groupAnnotation
 			continue
 		}
 		row[j] = strconv.FormatBool(key.HasCol(c.Label))
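The encoder change above is easiest to see in isolation: `writeGroups` emits a single annotation row whose first cell carries the `#group` marker and whose remaining cells are `true`/`false` per column, driven by `key.HasCol`. Below is a minimal, self-contained sketch of that row-building logic; the `isKeyCol` set is a hypothetical stand-in for the group key, not part of the diff.

```go
package main

import (
	"encoding/csv"
	"os"
	"strconv"
)

func main() {
	// Output columns, in order. The annotation row gets one extra
	// leading cell that carries the "#group" marker itself.
	cols := []string{"_start", "_stop", "_time", "_measurement", "host", "_value"}
	// Hypothetical stand-in for key.HasCol: which columns are in the group key.
	isKeyCol := map[string]bool{"_start": true, "_stop": true, "_measurement": true, "host": true}

	row := make([]string, len(cols)+1)
	row[0] = "#group"
	for j, c := range cols {
		row[j+1] = strconv.FormatBool(isKeyCol[c])
	}

	w := csv.NewWriter(os.Stdout)
	w.Write(row) // #group,true,true,false,true,true,false
	w.Flush()
}
```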
@@ -29,7 +29,7 @@ var symetricalTestCases = []TestCase{
 		name:          "single table",
 		encoderConfig: csv.DefaultEncoderConfig(),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42

@@ -72,7 +72,7 @@ var symetricalTestCases = []TestCase{
 		name:          "single empty table",
 		encoderConfig: csv.DefaultEncoderConfig(),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 `),

@@ -101,7 +101,7 @@ var symetricalTestCases = []TestCase{
 		name:          "multiple tables",
 		encoderConfig: csv.DefaultEncoderConfig(),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42

@@ -177,7 +177,7 @@ var symetricalTestCases = []TestCase{
 		name:          "multiple tables with differing schemas",
 		encoderConfig: csv.DefaultEncoderConfig(),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42

@@ -186,7 +186,7 @@ var symetricalTestCases = []TestCase{
 ,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53
 
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double,double
-#partition,false,false,true,true,false,true,false,false,false
+#group,false,false,true,true,false,true,false,false,false
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,location,device,min,max
 ,,2,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,USA,1563,42,67.9

@@ -326,7 +326,7 @@ var symetricalTestCases = []TestCase{
 		name:          "multiple tables with one empty",
 		encoderConfig: csv.DefaultEncoderConfig(),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42

@@ -335,7 +335,7 @@ var symetricalTestCases = []TestCase{
 ,,1,2018-04-17T00:05:00Z,2018-04-17T00:10:00Z,2018-04-17T00:07:01Z,mem,A,53
 
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,2,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 `),

@@ -428,7 +428,7 @@ func TestResultDecoder(t *testing.T) {
 			name:          "single table with defaults",
 			encoderConfig: csv.DefaultEncoderConfig(),
 			encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,,,,2018-04-17T00:00:00Z,cpu,A,42.0

@@ -574,7 +574,7 @@ func TestMutliResultEncoder(t *testing.T) {
 			}},
 		}}),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42

@@ -652,14 +652,14 @@ func TestMutliResultEncoder(t *testing.T) {
 			},
 		}),
 		encoded: toCRLF(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,42
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:01Z,cpu,A,43
 
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,mean,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,host,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,cpu,A,40
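All of the fixtures above share the same shape: a `#group` annotation row of booleans aligned with the header columns. A small sketch of how such a row maps onto the decoder's `Groups []bool` metadata, mirroring the `groups[j] == "true"` test in `readMetadata` shown earlier:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A #group annotation row taken from one of the fixtures above.
	line := "#group,false,false,true,true,false,true,true,false"

	fields := strings.Split(line, ",")
	// fields[0] is the "#group" marker; the rest align with the header columns.
	groups := make([]bool, len(fields)-1)
	for j, f := range fields[1:] {
		groups[j] = f == "true" // the same per-column test the decoder applies
	}
	fmt.Println(groups) // [false false true true false true true false]
}
```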
@@ -715,14 +715,14 @@ The available data types for a column are:
 
 #### Table
 
-A table is set of records, with a common set of columns and a partition key.
+A table is a set of records, with a common set of columns and a group key.
 
-The partition key is a list of columns.
-A table's partition key denotes which subset of the entire dataset is assigned to the table.
-As such, all records within a table will have the same values for each column that is part of the partition key.
-These common values are referred to as the partition key value, and can be represented as a set of key value pairs.
+The group key is a list of columns.
+A table's group key denotes which subset of the entire dataset is assigned to the table.
+As such, all records within a table will have the same values for each column that is part of the group key.
+These common values are referred to as the group key value, and can be represented as a set of key value pairs.
 
-A tables schema consists of its partition key, and its column's labels and types.
+A table's schema consists of its group key, and its columns' labels and types.
 
 [IMPL#294](https://github.com/influxdata/platform/query/issues/294) Remove concept of Kind from table columns

@@ -732,8 +732,8 @@ A tables schema consists of its partition key, and its column's labels and types
 #### Stream
 
 A stream represents a potentially unbounded dataset.
-A stream partitioned into individual tables.
-Within a stream each table's partition key value is unique.
+A stream is grouped into individual tables.
+Within a stream each table's group key value is unique.
 
 #### Missing values

@@ -751,7 +751,7 @@ All operations may consume a stream and always produce a new stream.
 
 Most operations output one table for every table they receive from the input stream.
 
-Operations that modify the partition keys or values will need to repartition the tables in the output stream.
+Operations that modify the group keys or values will need to regroup the tables in the output stream.
 
 ### Built-in operations

@@ -806,7 +806,7 @@ The aggregate function is applied to each column in isolation.
 Any output table will have the following properties:
 
 * It always contains a single record.
-* It will have the same partition key as the input table.
+* It will have the same group key as the input table.
 * It will have a column `_time` which represents the time of the aggregated record.
   This can be set as the start or stop time of the input table.
   By default the stop time is used.

@@ -820,7 +820,7 @@ All aggregate operations have the following properties:
   columns specifies a list of columns to aggregate.
 * `timeSrc` string
   timeSrc is the source time column to use on the resulting aggregate record.
-  The value must be column with type `time` and must be part of the partition key.
+  The value must be a column with type `time` and must be part of the group key.
   Defaults to `_stop`.
 * `timeDst` string
   timeDst is the destination column to use for the resulting aggregate record.

@@ -917,7 +917,7 @@ A single column on which to operate must be provided to the operation.
 
 Any output table will have the following properties:
 
-* It will have the same partition key as the input table.
+* It will have the same group key as the input table.
 * It will contain the same columns as the input table.
 * It will have a column `_time` which represents the time of the selected record.
   This can be set as the value of any time column on the input table.

@@ -996,11 +996,11 @@ Limit has the following properties:
 #### Map
 
 Map applies a function to each record of the input tables.
-The modified records are assigned to new tables based on the partition key of the input table.
+The modified records are assigned to new tables based on the group key of the input table.
 The output tables are the result of applying the map function to each record on the input tables.
 
-When the output record contains a different value for the partition key the record is repartitioned into the appropriate table.
-When the output record drops a column that was part of the partition key that column is removed from the partition key.
+When the output record contains a different value for the group key the record is regrouped into the appropriate table.
+When the output record drops a column that was part of the group key that column is removed from the group key.
 
 Map has the following properties:

@@ -1008,8 +1008,8 @@ Map has the following properties:
   Function to apply to each record.
   The return value must be an object.
 * `mergeKey` bool
-  MergeKey indicates if the record returned from fn should be merged with the partition key.
-  When merging, all columns on the partition key will be added to the record giving precedence to any columns already present on the record.
+  MergeKey indicates if the record returned from fn should be merged with the group key.
+  When merging, all columns on the group key will be added to the record, giving precedence to any columns already present on the record.
   When not merging, only columns defined on the returned record will be present on the output records.
   Defaults to true.

@@ -1017,7 +1017,7 @@ Map has the following properties:
 
 Range filters records based on provided time bounds.
 Each input table's records are filtered to contain only records that exist within the time bounds.
-Each input table's partition key value is modified to fit within the time bounds.
+Each input table's group key value is modified to fit within the time bounds.
 Tables where all records exist outside the time bounds are filtered entirely.

@@ -1036,7 +1036,7 @@ Range has the following properties:
 
 Set assigns a static value to each record.
 The key may modify an existing column or it may add a new column to the tables.
-If the column that is modified is part of the partition key, then the output tables will be repartitioned as needed.
+If the column that is modified is part of the group key, then the output tables will be regrouped as needed.
 
 Set has the following properties:

@@ -1064,8 +1064,8 @@ Sort has the following properties:
 
 #### Group
 
-Group partitions records based on their values for specific columns.
-It produces tables with new partition keys based on the provided properties.
+Group groups records based on their values for specific columns.
+It produces tables with new group keys based on the provided properties.
 
 Group has the following properties:

@@ -1080,15 +1080,15 @@ Examples:
 
     group(by:["host"]) // group records by their "host" value
     group(except:["_time", "region", "_value"]) // group records by all other columns except for _time, region, and _value
-    group(by:[]) // group all records into a single partition
-    group(except:[]) // group records into all unique partitions
+    group(by:[]) // group all records into a single group
+    group(except:[]) // group records into all unique groups
 
 [IMPL#322](https://github.com/influxdata/platform/query/issues/322) Investigate always keeping all columns in group.
 
 #### Window
 
-Window partitions records based on a time value.
-New columns are added to uniquely identify each window and those columns are added to the partition key of the output tables.
+Window groups records based on a time value.
+New columns are added to uniquely identify each window, and those columns are added to the group key of the output tables.
 
 A single input record will be placed into zero or more output tables, depending on the specific windowing function.

@@ -1098,10 +1098,10 @@ Window has the following properties:
   Duration of time between windows
   Defaults to `period`'s value
 * `period` duration
-  Duration of the windowed partition
+  Duration of the windowed group
   Defaults to `every`'s value
 * `start` time
-  The time of the initial window partition
+  The time of the initial window group
 * `round` duration
   Rounds a window's bounds to the nearest duration
   Defaults to `every`'s value

@@ -1121,8 +1121,8 @@ Window has the following properties:
 #### Join
 
 Join merges two or more input streams into a single output stream.
-Input tables are matched on their partition keys and then each of their records are joined into a single output table.
-The output table partition key will be the same as the input table.
+Input tables are matched on their group keys and then each of their records are joined into a single output table.
+The output table group key will be the same as the input table.
 
 The join operation compares values based on equality.

@@ -1297,7 +1297,7 @@ Triggers can fire based on these inputs:
 | Current processing time | The current processing time is the system time when the trigger is being evaluated. |
 | Watermark time          | The watermark time is a time where it is expected that no data will arrive that is older than it. |
 | Record count            | The number of records currently in the table. |
-| Partition key value     | The partition key value of the table. |
+| Group key value         | The group key value of the table. |
 
 Additionally triggers can be _finished_, which means that they will never fire again.
 Once a trigger is finished, its associated table is deleted.

@@ -1453,7 +1453,7 @@ In addition to the columns on the tables themselves three additional columns may
 Columns support the following annotations:
 
 * datatype - a description of the type of data contained within the column.
-* partition - a boolean flag indicating if the column is part of the table's partition key.
+* group - a boolean flag indicating if the column is part of the table's group key.
 * default - a default value to be used for rows whose string value is the empty string.
 
 ##### Multiple tables

@@ -1467,7 +1467,7 @@ It is also possible that a table has no records.
 In such cases an empty row delimits a new table boundary and new annotations and header rows follow.
 The empty row acts like a delimiter between two independent CSV files that have been concatenated together.
 
-In the case were a table has no rows the `default` annotation is used to provide the values of the partition key.
+In the case where a table has no rows the `default` annotation is used to provide the values of the group key.
 
 ##### Multiple results

@@ -1495,12 +1495,12 @@ The possible data types are:
 | dateTime | time     | an instant in time, may be followed with a colon `:` and a description of the format |
 | duration | duration | a length of time represented as an unsigned 64-bit integer number of nanoseconds |
 
-The `partition` annotation specifies if the column is part of the table's partition key.
+The `group` annotation specifies if the column is part of the table's group key.
 Possible values are `true` or `false`.
 
 The `default` annotation specifies a default value, if it exists, for each column.
 
-In order to fully encode a table with its partition key the `datatype`, `partition` and `default` annotations must be used.
+In order to fully encode a table with its group key the `datatype`, `group` and `default` annotations must be used.
 
 ##### Errors

@@ -1591,11 +1591,11 @@ Example encoding with two tables in the same result with the datatype annotation
 ,mean,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,west,C,51.62
 ```
 
-Example encoding with two tables in the same result with the datatype and partition annotations:
+Example encoding with two tables in the same result with the datatype and group annotations:
 
 ```
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,false,false
+#group,false,false,true,true,false,true,false,false
 ,result,table,_start,_stop,_time,region,host,_value
 ,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,east,A,15.43
 ,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,east,B,59.25

@@ -1605,18 +1605,18 @@ Example encoding with two tables in the same result with the datatype and partit
 ,mean,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,west,C,51.62
 ```
 
-Example encoding with two tables with differing schemas in the same result with the datatype and partition annotations:
+Example encoding with two tables with differing schemas in the same result with the datatype and group annotations:
 
 ```
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,false,false
+#group,false,false,true,true,false,true,false,false
 ,result,table,_start,_stop,_time,region,host,_value
 ,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,east,A,15.43
 ,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,east,B,59.25
 ,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,east,C,52.62
 
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,false,false
+#group,false,false,true,true,false,true,false,false
 ,result,table,_start,_stop,_time,location,device,min,max
 ,mean,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,USA,5825,62.73,68.42
 ,mean,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,USA,2175,12.83,56.12
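To make the renamed concept concrete: a group key is just a list of columns plus one value per column. A minimal sketch using only constructors that appear elsewhere in this PR (the `execute` import path is assumed from the repo layout visible in the diff):

```go
package main

import (
	"fmt"

	"github.com/influxdata/platform/query"
	"github.com/influxdata/platform/query/execute"
	"github.com/influxdata/platform/query/values"
)

func main() {
	// A table whose group key value is {region: "east", host: "A"}.
	cols := []query.ColMeta{
		{Label: "region", Type: query.TString},
		{Label: "host", Type: query.TString},
	}
	key := execute.NewGroupKey(cols, []values.Value{
		values.NewStringValue("east"),
		values.NewStringValue("A"),
	})
	// Every record in the table carries these values for "region" and "host".
	fmt.Println(key.HasCol("region")) // true
	fmt.Println(key.HasCol("_time"))  // false
}
```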
@@ -84,7 +84,7 @@ func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, a
 	return NewAggregateTransformation(d, cache, agg, config), d
 }
 
-func (t *aggregateTransformation) RetractBlock(id DatasetID, key query.PartitionKey) error {
+func (t *aggregateTransformation) RetractBlock(id DatasetID, key query.GroupKey) error {
 	//TODO(nathanielc): Store intermediate state for retractions
 	return t.d.RetractBlock(key)
 }

@@ -119,7 +119,7 @@ func (t *aggregateTransformation) Process(id DatasetID, b query.Block) error {
 		}
 		c := cols[idx]
 		if b.Key().HasCol(c.Label) {
-			return errors.New("cannot aggregate columns that are part of the partition key")
+			return errors.New("cannot aggregate columns that are part of the group key")
 		}
 		var vf ValueFunc
 		switch c.Type {

@@ -203,7 +203,7 @@ func (t *aggregateTransformation) Finish(id DatasetID, err error) {
 	t.d.Finish(err)
 }
 
-func AppendAggregateTime(srcTime, dstTime string, key query.PartitionKey, builder BlockBuilder) error {
+func AppendAggregateTime(srcTime, dstTime string, key query.GroupKey, builder BlockBuilder) error {
 	srcTimeIdx := ColIdx(srcTime, key.Cols())
 	if srcTimeIdx < 0 {
 		return fmt.Errorf("timeValue column %q does not exist", srcTime)
@@ -16,38 +16,7 @@ const (
 	DefaultValueColLabel = "_value"
 )
 
-func PartitionKeyForRow(i int, cr query.ColReader) query.PartitionKey {
-	key := cr.Key()
-	cols := cr.Cols()
-	colsCpy := make([]query.ColMeta, 0, len(cols))
-	vs := make([]values.Value, 0, len(cols))
-	for j, c := range cols {
-		if !key.HasCol(c.Label) {
-			continue
-		}
-		colsCpy = append(colsCpy, c)
-		switch c.Type {
-		case query.TBool:
-			vs = append(vs, values.NewBoolValue(cr.Bools(j)[i]))
-		case query.TInt:
-			vs = append(vs, values.NewIntValue(cr.Ints(j)[i]))
-		case query.TUInt:
-			vs = append(vs, values.NewUIntValue(cr.UInts(j)[i]))
-		case query.TFloat:
-			vs = append(vs, values.NewFloatValue(cr.Floats(j)[i]))
-		case query.TString:
-			vs = append(vs, values.NewStringValue(cr.Strings(j)[i]))
-		case query.TTime:
-			vs = append(vs, values.NewTimeValue(cr.Times(j)[i]))
-		}
-	}
-	return &partitionKey{
-		cols:   colsCpy,
-		values: vs,
-	}
-}
-
-func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.PartitionKey {
+func GroupKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.GroupKey {
 	cols := make([]query.ColMeta, 0, len(on))
 	vs := make([]values.Value, 0, len(on))
 	for j, c := range cr.Cols() {

@@ -70,7 +39,7 @@ func PartitionKeyForRowOn(i int, cr query.ColReader, on map[string]bool) query.P
 			vs = append(vs, values.NewTimeValue(cr.Times(j)[i]))
 		}
 	}
-	return NewPartitionKey(cols, vs)
+	return NewGroupKey(cols, vs)
 }
 
 // OneTimeBlock is a Block that permits reading data only once.

@@ -116,7 +85,7 @@ func AddBlockCols(b query.Block, builder BlockBuilder) {
 	}
 }
 
-func AddBlockKeyCols(key query.PartitionKey, builder BlockBuilder) {
+func AddBlockKeyCols(key query.GroupKey, builder BlockBuilder) {
 	for _, c := range key.Cols() {
 		builder.AddCol(c)
 	}

@@ -254,7 +223,7 @@ func AppendRecordForCols(i int, cr query.ColReader, builder BlockBuilder, cols [
 	}
 }
 
-func AppendKeyValues(key query.PartitionKey, builder BlockBuilder) {
+func AppendKeyValues(key query.GroupKey, builder BlockBuilder) {
 	for j, c := range key.Cols() {
 		idx := ColIdx(c.Label, builder.Cols())
 		switch c.Type {

@@ -299,7 +268,7 @@ func HasCol(label string, cols []query.ColMeta) bool {
 
 // BlockBuilder builds blocks that can be used multiple times
 type BlockBuilder interface {
-	Key() query.PartitionKey
+	Key() query.GroupKey
 
 	NRows() int
 	NCols() int

@@ -348,14 +317,14 @@ type ColListBlockBuilder struct {
 	alloc *Allocator
 }
 
-func NewColListBlockBuilder(key query.PartitionKey, a *Allocator) *ColListBlockBuilder {
+func NewColListBlockBuilder(key query.GroupKey, a *Allocator) *ColListBlockBuilder {
 	return &ColListBlockBuilder{
 		blk:   &ColListBlock{key: key},
 		alloc: a,
 	}
 }
 
-func (b ColListBlockBuilder) Key() query.PartitionKey {
+func (b ColListBlockBuilder) Key() query.GroupKey {
 	return b.blk.Key()
 }

@@ -564,7 +533,7 @@ func (b ColListBlockBuilder) Sort(cols []string, desc bool) {
 // All data for the block is stored in RAM.
 // As a result At* methods are provided directly on the block for easy access.
 type ColListBlock struct {
-	key     query.PartitionKey
+	key     query.GroupKey
 	colMeta []query.ColMeta
 	cols    []column
 	nrows   int

@@ -581,7 +550,7 @@ func (b *ColListBlock) RefCount(n int) {
 	}
 }
 
-func (b *ColListBlock) Key() query.PartitionKey {
+func (b *ColListBlock) Key() query.GroupKey {
 	return b.key
 }
 func (b *ColListBlock) Cols() []query.ColMeta {

@@ -893,12 +862,12 @@ func (c *timeColumn) Swap(i, j int) {
 type BlockBuilderCache interface {
 	// BlockBuilder returns an existing or new BlockBuilder for the given meta data.
 	// The boolean return value indicates if BlockBuilder is new.
-	BlockBuilder(key query.PartitionKey) (BlockBuilder, bool)
-	ForEachBuilder(f func(query.PartitionKey, BlockBuilder))
+	BlockBuilder(key query.GroupKey) (BlockBuilder, bool)
+	ForEachBuilder(f func(query.GroupKey, BlockBuilder))
 }
 
 type blockBuilderCache struct {
-	blocks *PartitionLookup
+	blocks *GroupLookup
 	alloc  *Allocator
 
 	triggerSpec query.TriggerSpec

@@ -906,7 +875,7 @@ type blockBuilderCache struct {
 
 func NewBlockBuilderCache(a *Allocator) *blockBuilderCache {
 	return &blockBuilderCache{
-		blocks: NewPartitionLookup(),
+		blocks: NewGroupLookup(),
 		alloc:  a,
 	}
 }

@@ -920,7 +889,7 @@ func (d *blockBuilderCache) SetTriggerSpec(ts query.TriggerSpec) {
 	d.triggerSpec = ts
 }
 
-func (d *blockBuilderCache) Block(key query.PartitionKey) (query.Block, error) {
+func (d *blockBuilderCache) Block(key query.GroupKey) (query.Block, error) {
 	b, ok := d.lookupState(key)
 	if !ok {
 		return nil, fmt.Errorf("block not found with key %v", key)

@@ -928,7 +897,7 @@ func (d *blockBuilderCache) Block(key query.PartitionKey) (query.Block, error) {
 	return b.builder.Block()
 }
 
-func (d *blockBuilderCache) lookupState(key query.PartitionKey) (blockState, bool) {
+func (d *blockBuilderCache) lookupState(key query.GroupKey) (blockState, bool) {
 	v, ok := d.blocks.Lookup(key)
 	if !ok {
 		return blockState{}, false

@@ -938,7 +907,7 @@ func (d *blockBuilderCache) lookupState(key query.PartitionKey) (blockState, boo
 
 // BlockBuilder will return the builder for the specified block.
 // If no builder exists, one will be created.
-func (d *blockBuilderCache) BlockBuilder(key query.PartitionKey) (BlockBuilder, bool) {
+func (d *blockBuilderCache) BlockBuilder(key query.GroupKey) (BlockBuilder, bool) {
 	b, ok := d.lookupState(key)
 	if !ok {
 		builder := NewColListBlockBuilder(key, d.alloc)

@@ -952,34 +921,34 @@ func (d *blockBuilderCache) BlockBuilder(key query.PartitionKey) (BlockBuilder,
 	return b.builder, !ok
 }
 
-func (d *blockBuilderCache) ForEachBuilder(f func(query.PartitionKey, BlockBuilder)) {
-	d.blocks.Range(func(key query.PartitionKey, value interface{}) {
+func (d *blockBuilderCache) ForEachBuilder(f func(query.GroupKey, BlockBuilder)) {
+	d.blocks.Range(func(key query.GroupKey, value interface{}) {
 		f(key, value.(blockState).builder)
 	})
 }
 
-func (d *blockBuilderCache) DiscardBlock(key query.PartitionKey) {
+func (d *blockBuilderCache) DiscardBlock(key query.GroupKey) {
 	b, ok := d.lookupState(key)
 	if ok {
 		b.builder.ClearData()
 	}
 }
 
-func (d *blockBuilderCache) ExpireBlock(key query.PartitionKey) {
+func (d *blockBuilderCache) ExpireBlock(key query.GroupKey) {
 	b, ok := d.blocks.Delete(key)
 	if ok {
 		b.(blockState).builder.ClearData()
 	}
 }
 
-func (d *blockBuilderCache) ForEach(f func(query.PartitionKey)) {
-	d.blocks.Range(func(key query.PartitionKey, value interface{}) {
+func (d *blockBuilderCache) ForEach(f func(query.GroupKey)) {
+	d.blocks.Range(func(key query.GroupKey, value interface{}) {
 		f(key)
 	})
 }
 
-func (d *blockBuilderCache) ForEachWithContext(f func(query.PartitionKey, Trigger, BlockContext)) {
-	d.blocks.Range(func(key query.PartitionKey, value interface{}) {
+func (d *blockBuilderCache) ForEachWithContext(f func(query.GroupKey, Trigger, BlockContext)) {
+	d.blocks.Range(func(key query.GroupKey, value interface{}) {
 		b := value.(blockState)
 		f(key, b.trigger, BlockContext{
 			Key: key,
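Taken together, `GroupKeyForRowOn`, `BlockBuilderCache`, and the `GroupLookup` behind it are the regrouping machinery: derive a key per row, look up (or lazily create) the builder for that key, and append the row. A rough sketch of that flow under the signatures visible in this diff; the `AppendRecordForCols` call above is truncated, so its exact signature is an assumption:

```go
package execute

import "github.com/influxdata/platform/query"

// processRow is a sketch only: it routes row i of cr into the output table
// identified by the row's group key, built from the columns named in "on".
func processRow(i int, cr query.ColReader, on map[string]bool, cache BlockBuilderCache) {
	key := GroupKeyForRowOn(i, cr, on)
	builder, created := cache.BlockBuilder(key)
	if created {
		// First time this key is seen: mirror the input schema onto the builder.
		for _, c := range cr.Cols() {
			builder.AddCol(c)
		}
	}
	// Copy row i for every column of the builder (assumed signature).
	AppendRecordForCols(i, cr, builder, builder.Cols())
}
```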
@@ -9,7 +9,7 @@ import (
 type Dataset interface {
 	Node
 
-	RetractBlock(key query.PartitionKey) error
+	RetractBlock(key query.GroupKey) error
 	UpdateProcessingTime(t Time) error
 	UpdateWatermark(mark Time) error
 	Finish(error)

@@ -19,13 +19,13 @@ type Dataset interface {
 
 // DataCache holds all working data for a transformation.
 type DataCache interface {
-	Block(query.PartitionKey) (query.Block, error)
+	Block(query.GroupKey) (query.Block, error)
 
-	ForEach(func(query.PartitionKey))
-	ForEachWithContext(func(query.PartitionKey, Trigger, BlockContext))
+	ForEach(func(query.GroupKey))
+	ForEachWithContext(func(query.GroupKey, Trigger, BlockContext))
 
-	DiscardBlock(query.PartitionKey)
-	ExpireBlock(query.PartitionKey)
+	DiscardBlock(query.GroupKey)
+	ExpireBlock(query.GroupKey)
 
 	SetTriggerSpec(t query.TriggerSpec)
 }

@@ -105,7 +105,7 @@ func (d *dataset) UpdateProcessingTime(time Time) error {
 }
 
 func (d *dataset) evalTriggers() (err error) {
-	d.cache.ForEachWithContext(func(key query.PartitionKey, trigger Trigger, bc BlockContext) {
+	d.cache.ForEachWithContext(func(key query.GroupKey, trigger Trigger, bc BlockContext) {
 		if err != nil {
 			// Skip the rest once we have encountered an error
 			return

@@ -126,7 +126,7 @@ func (d *dataset) evalTriggers() (err error) {
 	return err
 }
 
-func (d *dataset) triggerBlock(key query.PartitionKey) error {
+func (d *dataset) triggerBlock(key query.GroupKey) error {
 	b, err := d.cache.Block(key)
 	if err != nil {
 		return err

@@ -157,11 +157,11 @@ func (d *dataset) triggerBlock(key query.PartitionKey) error {
 	return nil
 }
 
-func (d *dataset) expireBlock(key query.PartitionKey) {
+func (d *dataset) expireBlock(key query.GroupKey) {
 	d.cache.ExpireBlock(key)
 }
 
-func (d *dataset) RetractBlock(key query.PartitionKey) error {
+func (d *dataset) RetractBlock(key query.GroupKey) error {
 	d.cache.DiscardBlock(key)
 	for _, t := range d.ts {
 		if err := t.RetractBlock(d.id, key); err != nil {

@@ -174,7 +174,7 @@ func (d *dataset) RetractBlock(key query.PartitionKey) error {
 func (d *dataset) Finish(err error) {
 	if err == nil {
 		// Only trigger blocks if we are not finishing because of an error.
-		d.cache.ForEach(func(bk query.PartitionKey) {
+		d.cache.ForEach(func(bk query.GroupKey) {
 			if err != nil {
 				return
 			}
@@ -13,12 +13,12 @@ import (
 // Not all fields need to be set. See comments on each field.
 // Use Normalize to ensure that all fields are set before equality comparisons.
 type Block struct {
-	// PartitionKey of the block. Does not need to be set explicitly.
-	PartitionKey query.PartitionKey
-	// KeyCols is a list of column that are part of the partition key.
+	// GroupKey of the block. Does not need to be set explicitly.
+	GroupKey query.GroupKey
+	// KeyCols is a list of columns that are part of the group key.
 	// The column type is deduced from the ColMeta slice.
 	KeyCols []string
-	// KeyValues is a list of values for the partition key columns.
+	// KeyValues is a list of values for the group key columns.
 	// Only needs to be set when no data is present on the Block.
 	KeyValues []interface{}
 	// ColMeta is a list of columns of the block.

@@ -30,7 +30,7 @@ type Block struct {
 
 // Normalize ensures all fields of the Block are set correctly.
 func (b *Block) Normalize() {
-	if b.PartitionKey == nil {
+	if b.GroupKey == nil {
 		cols := make([]query.ColMeta, len(b.KeyCols))
 		vs := make([]values.Value, len(b.KeyCols))
 		if len(b.KeyValues) != len(b.KeyCols) {

@@ -39,7 +39,7 @@ func (b *Block) Normalize() {
 		for j, label := range b.KeyCols {
 			idx := execute.ColIdx(label, b.ColMeta)
 			if idx < 0 {
-				panic(fmt.Errorf("block invalid: missing partition column %q", label))
+				panic(fmt.Errorf("block invalid: missing group column %q", label))
 			}
 			cols[j] = b.ColMeta[idx]
 			if len(b.Data) > 0 {

@@ -51,7 +51,7 @@ func (b *Block) Normalize() {
 			}
 			vs[j] = v
 		}
-		b.PartitionKey = execute.NewPartitionKey(cols, vs)
+		b.GroupKey = execute.NewGroupKey(cols, vs)
 	}
 }

@@ -65,9 +65,9 @@ func (b *Block) Cols() []query.ColMeta {
 	return b.ColMeta
 }
 
-func (b *Block) Key() query.PartitionKey {
+func (b *Block) Key() query.GroupKey {
 	b.Normalize()
-	return b.PartitionKey
+	return b.GroupKey
 }
 
 func (b *Block) Do(f func(query.ColReader) error) error {

@@ -84,7 +84,7 @@ func (b *Block) Do(f func(query.ColReader) error) error {
 }
 
 type ColReader struct {
-	key  query.PartitionKey
+	key  query.GroupKey
 	cols []query.ColMeta
 	row  []interface{}
 }

@@ -93,7 +93,7 @@ func (cr ColReader) Cols() []query.ColMeta {
 	return cr.cols
 }
 
-func (cr ColReader) Key() query.PartitionKey {
+func (cr ColReader) Key() query.GroupKey {
 	return cr.key
 }
 func (cr ColReader) Len() int {

@@ -125,7 +125,7 @@ func (cr ColReader) Times(j int) []execute.Time {
 }
 
 func BlocksFromCache(c execute.DataCache) (blocks []*Block, err error) {
-	c.ForEach(func(key query.PartitionKey) {
+	c.ForEach(func(key query.GroupKey) {
 		if err != nil {
 			return
 		}

@@ -147,8 +147,8 @@ func BlocksFromCache(c execute.DataCache) (blocks []*Block, err error) {
 func ConvertBlock(b query.Block) (*Block, error) {
 	key := b.Key()
 	blk := &Block{
-		PartitionKey: key,
-		ColMeta:      b.Cols(),
+		GroupKey: key,
+		ColMeta:  b.Cols(),
 	}
 
 	keyCols := key.Cols()
@@ -15,7 +15,7 @@ func RandomDatasetID() execute.DatasetID {
 
 type Dataset struct {
 	ID                    execute.DatasetID
-	Retractions           []query.PartitionKey
+	Retractions           []query.GroupKey
 	ProcessingTimeUpdates []execute.Time
 	WatermarkUpdates      []execute.Time
 	Finished              bool

@@ -32,7 +32,7 @@ func (d *Dataset) AddTransformation(t execute.Transformation) {
 	panic("not implemented")
 }
 
-func (d *Dataset) RetractBlock(key query.PartitionKey) error {
+func (d *Dataset) RetractBlock(key query.GroupKey) error {
 	d.Retractions = append(d.Retractions, key)
 	return nil
 }
@@ -241,10 +241,10 @@ func (f *Formatter) valueBuf(i, j int, typ query.DataType, cr query.ColReader) (
 type orderedCols struct {
 	indexMap []int
 	cols     []query.ColMeta
-	key      query.PartitionKey
+	key      query.GroupKey
 }
 
-func newOrderedCols(cols []query.ColMeta, key query.PartitionKey) orderedCols {
+func newOrderedCols(cols []query.ColMeta, key query.GroupKey) orderedCols {
 	indexMap := make([]int, len(cols))
 	for i := range indexMap {
 		indexMap[i] = i
@@ -8,58 +8,58 @@ import (
 	"github.com/influxdata/platform/query/values"
 )
 
-type partitionKey struct {
+type groupKey struct {
 	cols   []query.ColMeta
 	values []values.Value
 }
 
-func NewPartitionKey(cols []query.ColMeta, values []values.Value) query.PartitionKey {
-	return &partitionKey{
+func NewGroupKey(cols []query.ColMeta, values []values.Value) query.GroupKey {
+	return &groupKey{
 		cols:   cols,
 		values: values,
 	}
 }
 
-func (k *partitionKey) Cols() []query.ColMeta {
+func (k *groupKey) Cols() []query.ColMeta {
 	return k.cols
 }
-func (k *partitionKey) HasCol(label string) bool {
+func (k *groupKey) HasCol(label string) bool {
 	return ColIdx(label, k.cols) >= 0
 }
-func (k *partitionKey) Value(j int) values.Value {
+func (k *groupKey) Value(j int) values.Value {
 	return k.values[j]
 }
-func (k *partitionKey) ValueBool(j int) bool {
+func (k *groupKey) ValueBool(j int) bool {
 	return k.values[j].Bool()
 }
-func (k *partitionKey) ValueUInt(j int) uint64 {
+func (k *groupKey) ValueUInt(j int) uint64 {
 	return k.values[j].UInt()
 }
-func (k *partitionKey) ValueInt(j int) int64 {
+func (k *groupKey) ValueInt(j int) int64 {
 	return k.values[j].Int()
 }
-func (k *partitionKey) ValueFloat(j int) float64 {
+func (k *groupKey) ValueFloat(j int) float64 {
 	return k.values[j].Float()
 }
-func (k *partitionKey) ValueString(j int) string {
+func (k *groupKey) ValueString(j int) string {
 	return k.values[j].Str()
 }
-func (k *partitionKey) ValueDuration(j int) Duration {
+func (k *groupKey) ValueDuration(j int) Duration {
 	return k.values[j].Duration()
 }
-func (k *partitionKey) ValueTime(j int) Time {
+func (k *groupKey) ValueTime(j int) Time {
 	return k.values[j].Time()
 }
 
-func (k *partitionKey) Equal(o query.PartitionKey) bool {
-	return partitionKeyEqual(k, o)
+func (k *groupKey) Equal(o query.GroupKey) bool {
+	return groupKeyEqual(k, o)
 }
 
-func (k *partitionKey) Less(o query.PartitionKey) bool {
-	return partitionKeyLess(k, o)
+func (k *groupKey) Less(o query.GroupKey) bool {
+	return groupKeyLess(k, o)
 }
 
-func (k *partitionKey) String() string {
+func (k *groupKey) String() string {
 	var b strings.Builder
 	b.WriteRune('{')
 	for j, c := range k.cols {

@@ -72,7 +72,7 @@ func (k *partitionKey) String() string {
 	return b.String()
 }
 
-func partitionKeyEqual(a, b query.PartitionKey) bool {
+func groupKeyEqual(a, b query.GroupKey) bool {
 	aCols := a.Cols()
 	bCols := b.Cols()
 	if len(aCols) != len(bCols) {

@@ -112,7 +112,7 @@ func partitionKeyEqual(a, b query.PartitionKey) bool {
 	return true
 }
 
-func partitionKeyLess(a, b query.PartitionKey) bool {
+func groupKeyLess(a, b query.GroupKey) bool {
 	aCols := a.Cols()
 	bCols := b.Cols()
 	if av, bv := len(aCols), len(bCols); av != bv {
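`Equal` and `Less` compare keys column by column, giving group keys a total order; that order is exactly what the sorted `GroupLookup` introduced in the next file depends on. A small sketch, assuming the import paths used in the earlier sketches:

```go
package main

import (
	"fmt"

	"github.com/influxdata/platform/query"
	"github.com/influxdata/platform/query/execute"
	"github.com/influxdata/platform/query/values"
)

func main() {
	cols := []query.ColMeta{{Label: "host", Type: query.TString}}
	a := execute.NewGroupKey(cols, []values.Value{values.NewStringValue("A")})
	b := execute.NewGroupKey(cols, []values.Value{values.NewStringValue("B")})

	fmt.Println(a.Equal(b)) // false: same columns, different values
	fmt.Println(a.Less(b))  // true: "A" orders before "B"
}
```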
@@ -0,0 +1,91 @@
+package execute
+
+import (
+	"sort"
+
+	"github.com/influxdata/platform/query"
+)
+
+type GroupLookup struct {
+	groups groupEntries
+
+	// range state
+	rangeIdx int
+}
+
+type groupEntry struct {
+	key   query.GroupKey
+	value interface{}
+}
+
+func NewGroupLookup() *GroupLookup {
+	return &GroupLookup{
+		groups: make(groupEntries, 0, 100),
+	}
+}
+
+func (l *GroupLookup) findIdx(key query.GroupKey) int {
+	i := sort.Search(len(l.groups), func(i int) bool {
+		return !l.groups[i].key.Less(key)
+	})
+	if i < len(l.groups) && l.groups[i].key.Equal(key) {
+		return i
+	}
+	return -1
+}
+
+func (l *GroupLookup) Lookup(key query.GroupKey) (interface{}, bool) {
+	if key == nil {
+		return nil, false
+	}
+	i := l.findIdx(key)
+	if i >= 0 {
+		return l.groups[i].value, true
+	}
+	return nil, false
+}
+
+func (l *GroupLookup) Set(key query.GroupKey, value interface{}) {
+	i := l.findIdx(key)
+	if i >= 0 {
+		l.groups[i].value = value
+	} else {
+		l.groups = append(l.groups, groupEntry{
+			key:   key,
+			value: value,
+		})
+		sort.Sort(l.groups)
+	}
+}
+
+func (l *GroupLookup) Delete(key query.GroupKey) (v interface{}, found bool) {
+	if key == nil {
+		return
+	}
+	i := l.findIdx(key)
+	found = i >= 0
+	if found {
+		if i <= l.rangeIdx {
+			l.rangeIdx--
+		}
+		v = l.groups[i].value
+		l.groups = append(l.groups[:i], l.groups[i+1:]...)
+	}
+	return
+}
+
+// Range will iterate over all group keys in sorted order.
+// Range must not be called within another call to Range.
+// It is safe to call Set/Delete while ranging.
+func (l *GroupLookup) Range(f func(key query.GroupKey, value interface{})) {
+	for l.rangeIdx = 0; l.rangeIdx < len(l.groups); l.rangeIdx++ {
+		entry := l.groups[l.rangeIdx]
+		f(entry.key, entry.value)
+	}
+}
+
+type groupEntries []groupEntry
+
+func (p groupEntries) Len() int               { return len(p) }
+func (p groupEntries) Less(i int, j int) bool { return p[i].key.Less(p[j].key) }
+func (p groupEntries) Swap(i int, j int)      { p[i], p[j] = p[j], p[i] }
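A short usage sketch of the new lookup: entries are kept sorted by key, so `Range` visits them in key order regardless of insertion order (import paths assumed as in the earlier sketches):

```go
package main

import (
	"fmt"

	"github.com/influxdata/platform/query"
	"github.com/influxdata/platform/query/execute"
	"github.com/influxdata/platform/query/values"
)

func main() {
	cols := []query.ColMeta{{Label: "host", Type: query.TString}}
	keyA := execute.NewGroupKey(cols, []values.Value{values.NewStringValue("A")})
	keyB := execute.NewGroupKey(cols, []values.Value{values.NewStringValue("B")})

	l := execute.NewGroupLookup()
	l.Set(keyB, "second") // inserted first...
	l.Set(keyA, "first")  // ...but keyA sorts ahead of keyB

	if v, ok := l.Lookup(keyA); ok {
		fmt.Println(v) // first
	}
	l.Range(func(k query.GroupKey, v interface{}) {
		fmt.Println(v) // prints "first" then "second"
	})
}
```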
@@ -15,7 +15,7 @@ var (
 		{Label: "b", Type: query.TString},
 		{Label: "c", Type: query.TString},
 	}
-	key0 = execute.NewPartitionKey(
+	key0 = execute.NewGroupKey(
 		cols,
 		[]values.Value{
 			values.NewStringValue("I"),

@@ -23,7 +23,7 @@ var (
 			values.NewStringValue("K"),
 		},
 	)
-	key1 = execute.NewPartitionKey(
+	key1 = execute.NewGroupKey(
 		cols,
 		[]values.Value{
 			values.NewStringValue("L"),

@@ -31,7 +31,7 @@ var (
 			values.NewStringValue("N"),
 		},
 	)
-	key2 = execute.NewPartitionKey(
+	key2 = execute.NewGroupKey(
 		cols,
 		[]values.Value{
 			values.NewStringValue("X"),

@@ -41,8 +41,8 @@ var (
 	)
 )
 
-func TestPartitionLookup(t *testing.T) {
-	l := execute.NewPartitionLookup()
+func TestGroupLookup(t *testing.T) {
+	l := execute.NewGroupLookup()
 	l.Set(key0, 0)
 	if v, ok := l.Lookup(key0); !ok || v != 0 {
 		t.Error("failed to lookup key0")

@@ -63,7 +63,7 @@ func TestPartitionLookup(t *testing.T) {
 	}
 
 	var got []entry
-	l.Range(func(k query.PartitionKey, v interface{}) {
+	l.Range(func(k query.GroupKey, v interface{}) {
 		got = append(got, entry{
 			Key:   k,
 			Value: v.(int),

@@ -94,8 +94,8 @@ func TestPartitionLookup(t *testing.T) {
 }
 
 // Test that the lookup supports Deletes while ranging.
-func TestPartitionLookup_RangeWithDelete(t *testing.T) {
-	l := execute.NewPartitionLookup()
+func TestGroupLookup_RangeWithDelete(t *testing.T) {
+	l := execute.NewGroupLookup()
 	l.Set(key0, 0)
 	if v, ok := l.Lookup(key0); !ok || v != 0 {
 		t.Error("failed to lookup key0")

@@ -114,7 +114,7 @@ func TestPartitionLookup_RangeWithDelete(t *testing.T) {
 		{Key: key1, Value: 1},
 	}
 	var got []entry
-	l.Range(func(k query.PartitionKey, v interface{}) {
+	l.Range(func(k query.GroupKey, v interface{}) {
 		// Delete the current key
 		l.Delete(key0)
 		// Delete a future key

@@ -131,6 +131,6 @@ func TestPartitionLookup_RangeWithDelete(t *testing.T) {
 }
 
 type entry struct {
-	Key   query.PartitionKey
+	Key   query.GroupKey
 	Value int
 }
@ -1,91 +0,0 @@
|
|||
package execute
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
)
|
||||
|
||||
type PartitionLookup struct {
|
||||
partitions partitionEntries
|
||||
|
||||
// range state
|
||||
rangeIdx int
|
||||
}
|
||||
|
||||
type partitionEntry struct {
|
||||
key query.PartitionKey
|
||||
value interface{}
|
||||
}
|
||||
|
||||
func NewPartitionLookup() *PartitionLookup {
|
||||
return &PartitionLookup{
|
||||
partitions: make(partitionEntries, 0, 100),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *PartitionLookup) findIdx(key query.PartitionKey) int {
|
||||
i := sort.Search(len(l.partitions), func(i int) bool {
|
||||
return !l.partitions[i].key.Less(key)
|
||||
})
|
||||
if i < len(l.partitions) && l.partitions[i].key.Equal(key) {
|
||||
return i
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func (l *PartitionLookup) Lookup(key query.PartitionKey) (interface{}, bool) {
|
||||
if key == nil {
|
||||
return nil, false
|
||||
}
|
||||
i := l.findIdx(key)
|
||||
if i >= 0 {
|
||||
return l.partitions[i].value, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (l *PartitionLookup) Set(key query.PartitionKey, value interface{}) {
|
||||
i := l.findIdx(key)
|
||||
if i >= 0 {
|
||||
l.partitions[i].value = value
|
||||
} else {
|
||||
l.partitions = append(l.partitions, partitionEntry{
|
||||
key: key,
|
||||
value: value,
|
||||
})
|
||||
sort.Sort(l.partitions)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *PartitionLookup) Delete(key query.PartitionKey) (v interface{}, found bool) {
|
||||
if key == nil {
|
||||
return
|
||||
}
|
||||
i := l.findIdx(key)
|
||||
found = i >= 0
|
||||
if found {
|
||||
if i <= l.rangeIdx {
|
||||
l.rangeIdx--
|
||||
}
|
||||
v = l.partitions[i].value
|
||||
l.partitions = append(l.partitions[:i], l.partitions[i+1:]...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Range will iterate over all partitions keys in sorted order.
|
||||
// Range must not be called within another call to Range.
|
||||
// It is safe to call Set/Delete while ranging.
|
||||
func (l *PartitionLookup) Range(f func(key query.PartitionKey, value interface{})) {
|
||||
for l.rangeIdx = 0; l.rangeIdx < len(l.partitions); l.rangeIdx++ {
|
||||
entry := l.partitions[l.rangeIdx]
|
||||
f(entry.key, entry.value)
|
||||
}
|
||||
}
|
||||
|
||||
type partitionEntries []partitionEntry
|
||||
|
||||
func (p partitionEntries) Len() int { return len(p) }
|
||||
func (p partitionEntries) Less(i int, j int) bool { return p[i].key.Less(p[j].key) }
|
||||
func (p partitionEntries) Swap(i int, j int) { p[i], p[j] = p[j], p[i] }
|
|
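With the old file removed, the renamed type presumably keeps the same behavior. A minimal usage sketch, assuming `GroupLookup` preserves the deleted `PartitionLookup` API and that the `execute` and `values` packages live under `query/execute` and `query/values` (import paths are assumptions, not confirmed by this diff):

	package main

	import (
		"fmt"

		"github.com/influxdata/platform/query"
		"github.com/influxdata/platform/query/execute"
		"github.com/influxdata/platform/query/values"
	)

	func main() {
		// Build a group key the same way the tests above do.
		key := execute.NewGroupKey(
			[]query.ColMeta{{Label: "host", Type: query.TString}},
			[]values.Value{values.NewStringValue("host.local")},
		)

		l := execute.NewGroupLookup()
		l.Set(key, 1)

		if v, ok := l.Lookup(key); ok {
			fmt.Println("value:", v) // prints: value: 1
		}

		// Range visits keys in sorted order; Set/Delete are safe while ranging.
		l.Range(func(k query.GroupKey, v interface{}) {
			l.Delete(k)
		})
	}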
@@ -37,7 +37,7 @@ func newResult(name string, spec plan.YieldSpec) *result {
 func (s *result) Name() string {
 	return s.name
 }
-func (s *result) RetractBlock(DatasetID, query.PartitionKey) error {
+func (s *result) RetractBlock(DatasetID, query.GroupKey) error {
 	//TODO implement
 	return nil
 }

@@ -70,7 +70,7 @@ func newSelectorTransformation(d Dataset, c BlockBuilderCache, config SelectorCo
 	}
 }

-func (t *selectorTransformation) RetractBlock(id DatasetID, key query.PartitionKey) error {
+func (t *selectorTransformation) RetractBlock(id DatasetID, key query.GroupKey) error {
 	//TODO(nathanielc): Store intermediate state for retractions
 	return t.d.RetractBlock(key)
 }
@@ -9,7 +9,7 @@ import (
 )

 type Transformation interface {
-	RetractBlock(id DatasetID, key query.PartitionKey) error
+	RetractBlock(id DatasetID, key query.GroupKey) error
 	Process(id DatasetID, b query.Block) error
 	UpdateWatermark(id DatasetID, t Time) error
 	UpdateProcessingTime(id DatasetID, t Time) error
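For orientation, a no-op implementation satisfying the methods visible in this hunk (a sketch only; the interface may declare further methods, such as a Finish hook, that the hunk does not show):

	// noopTransformation satisfies the methods shown above and does nothing.
	type noopTransformation struct{}

	func (noopTransformation) RetractBlock(id DatasetID, key query.GroupKey) error { return nil }
	func (noopTransformation) Process(id DatasetID, b query.Block) error           { return nil }
	func (noopTransformation) UpdateWatermark(id DatasetID, t Time) error          { return nil }
	func (noopTransformation) UpdateProcessingTime(id DatasetID, t Time) error     { return nil }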
@@ -54,7 +54,7 @@ func (t *consecutiveTransport) Finished() <-chan struct{} {
 	return t.finished
 }

-func (t *consecutiveTransport) RetractBlock(id DatasetID, key query.PartitionKey) error {
+func (t *consecutiveTransport) RetractBlock(id DatasetID, key query.GroupKey) error {
 	select {
 	case <-t.finished:
 		return t.err()

@@ -232,18 +232,18 @@ func (m srcMessage) SrcDatasetID() DatasetID {

 type RetractBlockMsg interface {
 	Message
-	Key() query.PartitionKey
+	Key() query.GroupKey
 }

 type retractBlockMsg struct {
 	srcMessage
-	key query.PartitionKey
+	key query.GroupKey
 }

 func (m *retractBlockMsg) Type() MessageType {
 	return RetractBlockType
 }
-func (m *retractBlockMsg) Key() query.PartitionKey {
+func (m *retractBlockMsg) Key() query.GroupKey {
 	return m.key
 }

@@ -19,7 +19,7 @@ type TriggerContext struct {
 }

 type BlockContext struct {
-	Key   query.PartitionKey
+	Key   query.GroupKey
 	Count int
 }
@@ -151,7 +151,7 @@ func NewCovarianceTransformation(d execute.Dataset, cache execute.BlockBuilderCa
 	}
 }

-func (t *CovarianceTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *CovarianceTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -106,7 +106,7 @@ func NewCumulativeSumTransformation(d execute.Dataset, cache execute.BlockBuilde
 	}
 }

-func (t *cumulativeSumTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *cumulativeSumTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -39,7 +39,7 @@ func init() {
 	start := execute.Time(time.Date(2016, 10, 10, 0, 0, 0, 0, time.UTC).UnixNano())
 	stop := execute.Time(time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC).UnixNano())
 	t1Value := "a"
-	key := execute.NewPartitionKey(
+	key := execute.NewGroupKey(
 		[]query.ColMeta{
 			{Label: execute.DefaultStartColLabel, Type: query.TTime},
 			{Label: execute.DefaultStopColLabel, Type: query.TTime},
@@ -152,7 +152,7 @@ func NewDerivativeTransformation(d execute.Dataset, cache execute.BlockBuilderCa
 	}
 }

-func (t *derivativeTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *derivativeTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -129,7 +129,7 @@ func NewDifferenceTransformation(d execute.Dataset, cache execute.BlockBuilderCa
 	}
 }

-func (t *differenceTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *differenceTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -140,7 +140,7 @@ func NewDistinctTransformation(d execute.Dataset, cache execute.BlockBuilderCach
 	}
 }

-func (t *distinctTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *distinctTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -217,7 +217,7 @@ func NewFilterTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	}, nil
 }

-func (t *filterTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *filterTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -281,7 +281,7 @@ func NewGroupTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	return t
 }

-func (t *groupTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) (err error) {
+func (t *groupTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) (err error) {
 	//TODO(nathanielc): Investigate if this can be smarter and not retract all blocks with the same time bounds.
 	panic("not implemented")
 	//t.cache.ForEachBuilder(func(bk execute.BlockKey, builder execute.BlockBuilder) {
@@ -316,7 +316,7 @@ func (t *groupTransformation) Process(id execute.DatasetID, b query.Block) error
 	return b.Do(func(cr query.ColReader) error {
 		l := cr.Len()
 		for i := 0; i < l; i++ {
-			key := execute.PartitionKeyForRowOn(i, cr, on)
+			key := execute.GroupKeyForRowOn(i, cr, on)
 			builder, created := t.cache.BlockBuilder(key)
 			if created {
 				execute.AddBlockCols(b, builder)

@@ -113,7 +113,7 @@ func NewIntegralTransformation(d execute.Dataset, cache execute.BlockBuilderCach
 	}
 }

-func (t *integralTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *integralTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }
@@ -127,7 +127,7 @@ func createJoinOpSpec(args query.Arguments, a *query.Administration) (query.Oper
 		return
 	}
 	p := t.(*query.TableObject)
-	joinParams.add(k/*parameter*/, p/*argument*/)
+	joinParams.add(k /*parameter*/, p /*argument*/)
 	spec.tableNames[p] = k
 })
 if err != nil {

@@ -274,7 +274,7 @@ type mergeJoinParentState struct {
 	finished bool
 }

-func (t *mergeJoinTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *mergeJoinTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	panic("not implemented")
 	//t.mu.Lock()
 	//defer t.mu.Unlock()

@@ -391,11 +391,11 @@ func (t *mergeJoinTransformation) Finish(id execute.DatasetID, err error) {
 }

 type MergeJoinCache interface {
-	Tables(query.PartitionKey) *joinTables
+	Tables(query.GroupKey) *joinTables
 }

 type mergeJoinCache struct {
-	data  *execute.PartitionLookup
+	data  *execute.GroupLookup
 	alloc *execute.Allocator

 	keys []string

@@ -414,7 +414,7 @@ func NewMergeJoinCache(joinFn *joinFunc, a *execute.Allocator, leftName, rightNa
 		on[k] = true
 	}
 	return &mergeJoinCache{
-		data:   execute.NewPartitionLookup(),
+		data:   execute.NewGroupLookup(),
 		keys:   keys,
 		on:     on,
 		joinFn: joinFn,
@@ -424,7 +424,7 @@ func NewMergeJoinCache(joinFn *joinFunc, a *execute.Allocator, leftName, rightNa
 	}
 }

-func (c *mergeJoinCache) Block(key query.PartitionKey) (query.Block, error) {
+func (c *mergeJoinCache) Block(key query.GroupKey) (query.Block, error) {
 	t, ok := c.lookup(key)
 	if !ok {
 		return nil, errors.New("block not found")

@@ -432,14 +432,14 @@ func (c *mergeJoinCache) Block(key query.PartitionKey) (query.Block, error) {
 	return t.Join()
 }

-func (c *mergeJoinCache) ForEach(f func(query.PartitionKey)) {
-	c.data.Range(func(key query.PartitionKey, value interface{}) {
+func (c *mergeJoinCache) ForEach(f func(query.GroupKey)) {
+	c.data.Range(func(key query.GroupKey, value interface{}) {
 		f(key)
 	})
 }

-func (c *mergeJoinCache) ForEachWithContext(f func(query.PartitionKey, execute.Trigger, execute.BlockContext)) {
-	c.data.Range(func(key query.PartitionKey, value interface{}) {
+func (c *mergeJoinCache) ForEachWithContext(f func(query.GroupKey, execute.Trigger, execute.BlockContext)) {
+	c.data.Range(func(key query.GroupKey, value interface{}) {
 		tables := value.(*joinTables)
 		bc := execute.BlockContext{
 			Key: key,

@@ -449,14 +449,14 @@ func (c *mergeJoinCache) ForEachWithContext(f func(query.PartitionKey, execute.T
 	})
 }

-func (c *mergeJoinCache) DiscardBlock(key query.PartitionKey) {
+func (c *mergeJoinCache) DiscardBlock(key query.GroupKey) {
 	t, ok := c.lookup(key)
 	if ok {
 		t.ClearData()
 	}
 }

-func (c *mergeJoinCache) ExpireBlock(key query.PartitionKey) {
+func (c *mergeJoinCache) ExpireBlock(key query.GroupKey) {
 	v, ok := c.data.Delete(key)
 	if ok {
 		v.(*joinTables).ClearData()

@@ -467,7 +467,7 @@ func (c *mergeJoinCache) SetTriggerSpec(spec query.TriggerSpec) {
 	c.triggerSpec = spec
 }

-func (c *mergeJoinCache) lookup(key query.PartitionKey) (*joinTables, bool) {
+func (c *mergeJoinCache) lookup(key query.GroupKey) (*joinTables, bool) {
 	v, ok := c.data.Lookup(key)
 	if !ok {
 		return nil, false
@@ -475,7 +475,7 @@ func (c *mergeJoinCache) lookup(key query.PartitionKey) (*joinTables, bool) {
 	return v.(*joinTables), true
 }

-func (c *mergeJoinCache) Tables(key query.PartitionKey) *joinTables {
+func (c *mergeJoinCache) Tables(key query.GroupKey) *joinTables {
 	tables, ok := c.lookup(key)
 	if !ok {
 		tables = &joinTables{

@@ -498,7 +498,7 @@ func (c *mergeJoinCache) Tables(key query.PartitionKey) *joinTables {
 type joinTables struct {
 	keys []string
 	on   map[string]bool
-	key  query.PartitionKey
+	key  query.GroupKey

 	alloc *execute.Allocator

@@ -562,7 +562,7 @@ func (t *joinTables) Join() (query.Block, error) {

 	var (
 		leftSet, rightSet subset
-		leftKey, rightKey query.PartitionKey
+		leftKey, rightKey query.GroupKey
 	)

 	rows := map[string]int{
@@ -601,12 +601,12 @@ func (t *joinTables) Join() (query.Block, error) {
 	return builder.Block()
 }

-func (t *joinTables) advance(offset int, table *execute.ColListBlock) (subset, query.PartitionKey) {
+func (t *joinTables) advance(offset int, table *execute.ColListBlock) (subset, query.GroupKey) {
 	if n := table.NRows(); n == offset {
 		return subset{Start: n, Stop: n}, nil
 	}
 	start := offset
-	key := execute.PartitionKeyForRowOn(start, table, t.on)
+	key := execute.GroupKeyForRowOn(start, table, t.on)
 	s := subset{Start: start}
 	offset++
 	for offset < table.NRows() && equalRowKeys(start, offset, table, t.on) {
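The hunk ends inside the scan loop of `advance`. It presumably closes along these lines — a sketch inferred from the visible `subset` bookkeeping, not the file's actual tail:

	// Extend the run while successive rows share the same values in the
	// 'on' columns, then report the half-open row range plus its group key.
	for offset < table.NRows() && equalRowKeys(start, offset, table, t.on) {
		offset++
	}
	s.Stop = offset
	return s, key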
@@ -151,7 +151,7 @@ func NewKeysTransformation(d execute.Dataset, cache execute.BlockBuilderCache, s
 	}
 }

-func (t *keysTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *keysTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -140,7 +140,7 @@ func NewLimitTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	}
 }

-func (t *limitTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *limitTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -128,7 +128,7 @@ func NewMapTransformation(d execute.Dataset, cache execute.BlockBuilderCache, sp
 	}, nil
 }

-func (t *mapTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *mapTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -148,7 +148,7 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
 	}
 	sort.Strings(keys)

-	// Determine on which cols to partition
+	// Determine on which cols to group
 	on := make(map[string]bool, len(b.Key().Cols()))
 	for _, c := range b.Key().Cols() {
 		on[c.Label] = t.mergeKey || execute.ContainsStr(keys, c.Label)

@@ -162,7 +162,7 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
 			log.Printf("failed to evaluate map expression: %v", err)
 			continue
 		}
-		key := partitionKeyForObject(i, cr, m, on)
+		key := groupKeyForObject(i, cr, m, on)
 		builder, created := t.cache.BlockBuilder(key)
 		if created {
 			if t.mergeKey {
@@ -196,7 +196,7 @@ func (t *mapTransformation) Process(id execute.DatasetID, b query.Block) error {
 	})
 }

-func partitionKeyForObject(i int, cr query.ColReader, obj values.Object, on map[string]bool) query.PartitionKey {
+func groupKeyForObject(i int, cr query.ColReader, obj values.Object, on map[string]bool) query.GroupKey {
 	cols := make([]query.ColMeta, 0, len(on))
 	vs := make([]values.Value, 0, len(on))
 	for j, c := range cr.Cols() {

@@ -224,7 +224,7 @@ func partitionKeyForObject(i int, cr query.ColReader, obj values.Object, on map[
 		}
 	}
-	return execute.NewPartitionKey(cols, vs)
+	return execute.NewGroupKey(cols, vs)
 }

 func (t *mapTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
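Only the edges of `groupKeyForObject` are visible here. A sketch of what the elided middle plausibly does — prefer the mapped object's value for key columns, which is what lets a map call regroup rows; `obj.Get` and the `ValueForRow` helper are assumptions, not taken from this diff:

	for j, c := range cr.Cols() {
		if !on[c.Label] {
			continue // column is not part of the group key
		}
		cols = append(cols, c)
		if v, ok := obj.Get(c.Label); ok {
			// The map expression produced a value for this column; use it.
			vs = append(vs, v)
		} else {
			// Otherwise fall back to the row's existing value.
			vs = append(vs, execute.ValueForRow(i, j, cr)) // hypothetical helper
		}
	}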
@@ -331,7 +331,7 @@ func TestMap_Process(t *testing.T) {
 			}},
 		},
 		{
-			name: `_value+5 mergeKey=true repartition`,
+			name: `_value+5 mergeKey=true regroup`,
 			spec: &functions.MapProcedureSpec{
 				MergeKey: true,
 				Fn: &semantic.FunctionExpression{

@@ -407,7 +407,7 @@ func TestMap_Process(t *testing.T) {
 			}},
 		},
 		{
-			name: `_value+5 mergeKey=true repartition fan out`,
+			name: `_value+5 mergeKey=true regroup fan out`,
 			spec: &functions.MapProcedureSpec{
 				MergeKey: true,
 				Fn: &semantic.FunctionExpression{
@@ -145,7 +145,7 @@ func NewRangeTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	}, nil
 }

-func (t *rangeTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *rangeTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -115,7 +115,7 @@ func NewSetTransformation(
 	}
 }

-func (t *setTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *setTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	// TODO
 	return nil
 }

@@ -134,7 +134,7 @@ func (t *setTransformation) Process(id execute.DatasetID, b query.Block) error {
 				vs[j] = key.Value(j)
 			}
 		}
-		key = execute.NewPartitionKey(cols, vs)
+		key = execute.NewGroupKey(cols, vs)
 	}
 	builder, created := t.cache.BlockBuilder(key)
 	if created {
@@ -127,7 +127,7 @@ func NewShiftTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	}
 }

-func (t *shiftTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *shiftTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -148,7 +148,7 @@ func (t *shiftTransformation) Process(id execute.DatasetID, b query.Block) error
 				vs[j] = key.Value(j)
 			}
 		}
-		key = execute.NewPartitionKey(cols, vs)
+		key = execute.NewGroupKey(cols, vs)

 		builder, created := t.cache.BlockBuilder(key)
 		if !created {
@@ -125,7 +125,7 @@ func NewSortTransformation(d execute.Dataset, cache execute.BlockBuilderCache, s
 	}
 }

-func (t *sortTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *sortTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -170,7 +170,7 @@ func (t *sortTransformation) Finish(id execute.DatasetID, err error) {
 	t.d.Finish(err)
 }

-func (t *sortTransformation) sortedKey(key query.PartitionKey) query.PartitionKey {
+func (t *sortTransformation) sortedKey(key query.GroupKey) query.GroupKey {
 	cols := make([]query.ColMeta, len(key.Cols()))
 	vs := make([]values.Value, len(key.Cols()))
 	j := 0

@@ -189,5 +189,5 @@ func (t *sortTransformation) sortedKey(key query.PartitionKey) query.PartitionKe
 			j++
 		}
 	}
-	return execute.NewPartitionKey(cols, vs)
+	return execute.NewGroupKey(cols, vs)
 }
@@ -205,7 +205,7 @@ func NewStateTrackingTransformation(d execute.Dataset, cache execute.BlockBuilde
 	}, nil
 }

-func (t *stateTrackingTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *stateTrackingTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }
@@ -155,7 +155,7 @@ func (bi *bockIterator) handleRead(f func(query.Block) error, ms *mergedStreams)
 		frame := ms.next()
 		s := frame.GetSeries()
 		typ := convertDataType(s.DataType)
-		key := partitionKeyForSeries(s, &bi.readSpec, bi.bounds)
+		key := groupKeyForSeries(s, &bi.readSpec, bi.bounds)
 		cols, defs := determineBlockColsForSeries(s, typ)
 		block := newBlock(bi.bounds, key, cols, ms, &bi.readSpec, s.Tags, defs)

@@ -177,7 +177,7 @@ func (bi *bockIterator) handleGroupRead(f func(query.Block) error, ms *mergedStr
 		}
 		frame := ms.next()
 		s := frame.GetGroup()
-		key := partitionKeyForGroup(s, &bi.readSpec, bi.bounds)
+		key := groupKeyForGroup(s, &bi.readSpec, bi.bounds)

 		// try to infer type
 		// TODO(sgc): this is a hack

@@ -279,7 +279,7 @@ func determineBlockColsForSeries(s *ReadResponse_SeriesFrame, typ query.DataType
 	return cols, defs
 }

-func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
+func groupKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.GroupKey {
 	cols := make([]query.ColMeta, 2, len(s.Tags))
 	vs := make([]values.Value, 2, len(s.Tags))
 	cols[0] = query.ColMeta{

@@ -294,7 +294,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
 	vs[1] = values.NewTimeValue(bnds.Stop)
 	switch readSpec.GroupMode {
 	case storage.GroupModeBy:
-		// partition key in GroupKeys order, including tags in the GroupKeys slice
+		// group key in GroupKeys order, including tags in the GroupKeys slice
 		for _, k := range readSpec.GroupKeys {
 			if i := indexOfTag(s.Tags, k); i < len(s.Tags) {
 				cols = append(cols, query.ColMeta{

@@ -305,7 +305,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
 			}
 		}
 	case storage.GroupModeExcept:
-		// partition key in GroupKeys order, skipping tags in the GroupKeys slice
+		// group key in GroupKeys order, skipping tags in the GroupKeys slice
 		for _, k := range readSpec.GroupKeys {
 			if i := indexOfTag(s.Tags, k); i == len(s.Tags) {
 				cols = append(cols, query.ColMeta{
@@ -324,7 +324,7 @@ func partitionKeyForSeries(s *ReadResponse_SeriesFrame, readSpec *storage.ReadSp
 			vs = append(vs, values.NewStringValue(string(s.Tags[i].Value)))
 		}
 	}
-	return execute.NewPartitionKey(cols, vs)
+	return execute.NewGroupKey(cols, vs)
 }

 func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType) ([]query.ColMeta, [][]byte) {

@@ -357,7 +357,7 @@ func determineBlockColsForGroup(f *ReadResponse_GroupFrame, typ query.DataType)
 	return cols, defs
 }

-func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.PartitionKey {
+func groupKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec, bnds execute.Bounds) query.GroupKey {
 	cols := make([]query.ColMeta, 2, len(readSpec.GroupKeys)+2)
 	vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2)
 	cols[0] = query.ColMeta{

@@ -377,14 +377,14 @@ func partitionKeyForGroup(g *ReadResponse_GroupFrame, readSpec *storage.ReadSpec
 		})
 		vs = append(vs, values.NewStringValue(string(g.PartitionKeyVals[i])))
 	}
-	return execute.NewPartitionKey(cols, vs)
+	return execute.NewGroupKey(cols, vs)
 }

 // block implements OneTimeBlock as it can only be read once.
 // Since it can only be read once it is also a ValueIterator for itself.
 type block struct {
 	bounds execute.Bounds
-	key    query.PartitionKey
+	key    query.GroupKey
 	cols   []query.ColMeta

 	empty bool
@@ -421,7 +421,7 @@ type block struct {

 func newBlock(
 	bounds execute.Bounds,
-	key query.PartitionKey,
+	key query.GroupKey,
 	cols []query.ColMeta,
 	ms *mergedStreams,
 	readSpec *storage.ReadSpec,

@@ -457,7 +457,7 @@ func (b *block) wait() {
 	<-b.done
 }

-func (b *block) Key() query.PartitionKey {
+func (b *block) Key() query.GroupKey {
 	return b.key
 }
 func (b *block) Cols() []query.ColMeta {

@@ -780,7 +780,7 @@ type streamState struct {
 	bounds     execute.Bounds
 	stream     Storage_ReadClient
 	rep        ReadResponse
-	currentKey query.PartitionKey
+	currentKey query.GroupKey
 	readSpec   *storage.ReadSpec
 	finished   bool
 	group      bool
@@ -813,7 +813,7 @@ func (s *streamState) more() bool {
 	return true
 }

-func (s *streamState) key() query.PartitionKey {
+func (s *streamState) key() query.GroupKey {
 	return s.currentKey
 }

@@ -824,12 +824,12 @@ func (s *streamState) computeKey() {
 	if s.group {
 		if ft == groupType {
 			group := p.GetGroup()
-			s.currentKey = partitionKeyForGroup(group, s.readSpec, s.bounds)
+			s.currentKey = groupKeyForGroup(group, s.readSpec, s.bounds)
 		}
 	} else {
 		if ft == seriesType {
 			series := p.GetSeries()
-			s.currentKey = partitionKeyForSeries(series, s.readSpec, s.bounds)
+			s.currentKey = groupKeyForSeries(series, s.readSpec, s.bounds)
 		}
 	}
 }

@@ -845,11 +845,11 @@ func (s *streamState) next() ReadResponse_Frame {

 type mergedStreams struct {
 	streams    []*streamState
-	currentKey query.PartitionKey
+	currentKey query.GroupKey
 	i          int
 }

-func (s *mergedStreams) key() query.PartitionKey {
+func (s *mergedStreams) key() query.GroupKey {
 	if len(s.streams) == 1 {
 		return s.streams[0].key()
 	}
@@ -896,7 +896,7 @@ func (s *mergedStreams) advance() bool {

 func (s *mergedStreams) determineNewKey() bool {
 	minIdx := -1
-	var minKey query.PartitionKey
+	var minKey query.GroupKey
 	for i, stream := range s.streams {
 		if !stream.more() {
 			continue
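The remainder of `determineNewKey` is not shown. From the `minIdx`/`minKey` setup it plausibly continues like this — a sketch, not the actual file (the `s.i` and `s.currentKey` assignments are inferred from the struct fields above):

		// Track the smallest current key across the streams that still have data.
		if minIdx == -1 || stream.key().Less(minKey) {
			minIdx = i
			minKey = stream.key()
		}
	}
	s.i = minIdx
	s.currentKey = minKey
	return minIdx >= 0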
@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,34.98234271799806,used_percent,disk,disk1s1,apfs,host.local,/
@@ -7,4 +7,4 @@
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,34.982447293755506,used_percent,disk,disk1s1,apfs,host.local,/
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,34.982447293755506,used_percent,disk,disk1s1,apfs,host.local,/
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,34.98204153981662,used_percent,disk,disk1s1,apfs,host.local,/
-,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,34.982252364543626,used_percent,disk,disk1s1,apfs,host.local,/
+,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,34.982252364543626,used_percent,disk,disk1s1,apfs,host.local,/

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1,usage_guest,cpu,cpu-total,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,string,dateTime:RFC3339,long
-#partition,false,false,true,false,false
+#group,false,false,true,false,false
 #default,0,,,,
 ,result,table,_measurement,_time,io_time
 ,,0,diskio,2018-05-22T19:53:26Z,15204688

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,string,dateTime:RFC3339,long
-#partition,false,false,true,false,false
+#group,false,false,true,false,false
 #default,0,,,,
 ,result,table,_measurement,_time,io_time
 ,,0,diskio,2018-05-22T19:53:26Z,15204688

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,string,string,dateTime:RFC3339,long
-#partition,false,false,true,true,false,false
+#group,false,false,true,true,false,false
 #default,0,,,,,
 ,result,table,_measurement,name,_time,max
 ,,0,diskio,disk0,2018-05-22T19:54:16Z,15205755

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,2,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,91.7364670583823,usage_idle,cpu,cpu-total,host1

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,true,false,false,false,false
+#group,false,false,false,false,false,true,false,false,false,false
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,68.304576144036,usage_idle,cpu,cpu-total,host1

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio1,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,true,true,false,false,true,false,true,true
+#group,false,false,true,true,false,false,true,false,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,15205755,io_time,diskio2,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,long
-#partition,false,false,false,false
+#group,false,false,false,false
 #default,0,,,
 ,result,table,_time,io_time
 ,,0,2018-05-22T19:53:26Z,15204688

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1,usage_guest,cpu,cpu-total,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1,usage_guest,cpu,cpu-total,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,string,dateTime:RFC3339,double
-#partition,false,false,true,false,false
+#group,false,false,true,false,false
 #default,0,,,,
 ,result,table,_measurement,_time,used_percent
 ,,0,swap,2018-05-22T19:53:26Z,82.9833984375

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,string,dateTime:RFC3339,double
-#partition,false,false,true,false,false
+#group,false,false,true,false,false
 #default,0,,,,
 ,result,table,_measurement,_time,load1
 ,,0,system,2018-05-22T19:53:26Z,1.83

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,140,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,36.946678161621094,available_percent,mem,host1

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,false,false,true
+#group,false,false,false,false,false,false,false,false,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,37.28463649749756,available_percent,mem,host2

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
-#partition,false,false,true,true,false,true,true,false
+#group,false,false,true,true,false,true,true,false
 #default,_result,,,,,,,
 ,result,table,_start,_stop,_time,_measurement,_field,_value
 ,,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,2018-04-17T00:00:00Z,m1,f1,42.0

@@ -1,5 +1,5 @@
 #datatype,string,long,string,dateTime:RFC3339,double
-#partition,false,false,true,false,false
+#group,false,false,true,false,false
 #default,0,,,,
 ,result,table,_measurement,_time,max
 ,,0,m1,2018-04-17T00:00:01Z,43

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.63,load1,system,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,13F2,used_percent,disk,disk1,apfs,host.local,/

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,13F2,used_percent,disk,disk1,apfs,host.local,/

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,13F2,used_percent,disk,disk1,apfs,host.local,/
@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
 ,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,usage_guest,cpu,cpu-total,host.local
@@ -544,7 +544,7 @@
 ,,89,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,2.2022022022022023,usage_user,cpu,cpu7,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,90,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,318324641792,free,disk,disk1s1,apfs,host.local,/
@@ -585,7 +585,7 @@
 ,,95,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,171272134656,used,disk,disk1s1,apfs,host.local,/

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,34.98234271799806,used_percent,disk,disk1s1,apfs,host.local,/
@@ -596,7 +596,7 @@
 ,,96,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,34.982252364543626,used_percent,disk,disk1s1,apfs,host.local,/

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,97,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,318324641792,free,disk,disk1s4,apfs,host.local,/private/var/vm
@@ -637,7 +637,7 @@
 ,,102,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,9663721472,used,disk,disk1s4,apfs,host.local,/private/var/vm

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,103,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,2.946361076908575,used_percent,disk,disk1s4,apfs,host.local,/private/var/vm
@@ -648,7 +648,7 @@
 ,,103,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,2.9463571030660702,used_percent,disk,disk1s4,apfs,host.local,/private/var/vm

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,104,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,249991168,free,disk,disk2s1,hfs,host.local,/Volumes/Command Line Developer Tools
@@ -689,7 +689,7 @@
 ,,109,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,200724480,used,disk,disk2s1,hfs,host.local,/Volumes/Command Line Developer Tools

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true,true,true
 #default,_result,,,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,device,fstype,host,path
 ,,110,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,44.53461531470946,used_percent,disk,disk2s1,hfs,host.local,/Volumes/Command Line Developer Tools
@@ -700,7 +700,7 @@
 ,,110,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,44.53461531470946,used_percent,disk,disk2s1,hfs,host.local,/Volumes/Command Line Developer Tools

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,111,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0
@@ -867,7 +867,7 @@
 ,,137,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,0,writes,diskio,host.local,disk3

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,138,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,6535598080,active,mem,host.local
@@ -884,7 +884,7 @@
 ,,139,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,6448041984,available,mem,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,140,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,36.946678161621094,available_percent,mem,host.local
@@ -895,7 +895,7 @@
 ,,140,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,37.53254413604736,available_percent,mem,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,141,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,buffered,mem,host.local
@@ -936,7 +936,7 @@
 ,,146,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,10731827200,used,mem,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,147,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,63.053321838378906,used_percent,mem,host.local
@@ -947,7 +947,7 @@
 ,,147,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,62.46745586395264,used_percent,mem,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,148,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,0,blocked,processes,host.local
@@ -1030,7 +1030,7 @@
 ,,160,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,7098859520,used,swap,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local
@@ -1059,7 +1059,7 @@
 ,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.93,load5,system,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,165,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,8,n_cpus,system,host.local
@@ -1082,7 +1082,7 @@
 ,,167,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1320680,uptime,system,host.local

 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,168,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,"15 days, 6:50",uptime_format,system,host.local
@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,dateTime:RFC3339,double
-#partition,false,false,true,true,true,false,false
+#group,false,false,true,true,true,false,false
 #default,0,,,,,,
 ,result,table,_start,_stop,_measurement,_time,mean
 ,,0,1677-09-21T00:12:43.145224192Z,2262-04-11T23:47:16.854775807Z,diskio,2018-05-22T19:53:26Z,7602668

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,147,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,63.053321838378906,used_percent,mem,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,true,true,false,false,true,true,true
+#group,false,false,true,true,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,0,2018-05-22T19:54:00Z,2018-05-22T19:55:00Z,2018-05-22T19:54:36Z,62.71536350250244,used_percent,mem,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
-#partition,false,false,false,false,false,false,true,true,true,true
+#group,false,false,false,false,false,false,true,true,true,true
 #default,_result,,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
 ,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,dateTime:RFC3339,double
-#partition,false,false,true,true,true,false,false
+#group,false,false,true,true,true,false,false
 #default,0,,,,,,
 ,result,table,_start,_stop,_measurement,_time,mean
 ,,0,1677-09-21T00:12:43.145224192Z,2262-04-11T23:47:16.854775807Z,diskio,2018-05-22T19:53:26Z,7602668

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,false,false,false,false,true,true,true
+#group,false,false,false,false,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,147,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,63.053321838378906,used_percent,mem,host.local

@@ -1,5 +1,5 @@
 #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
-#partition,false,false,true,true,false,false,true,true,true
+#group,false,false,true,true,false,false,true,true,true
 #default,_result,,,,,,,,
 ,result,table,_start,_stop,_time,_value,_field,_measurement,host
 ,,0,2018-05-22T19:53:00Z,2018-05-22T19:53:30Z,2018-05-22T19:53:26Z,63.053321838378906,used_percent,mem,host.local
@@ -258,7 +258,7 @@ type ToHTTPTransformation struct {
 	spec *ToHTTPProcedureSpec
 }

-func (t *ToHTTPTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *ToHTTPTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

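This and the next several hunks apply the same mechanical rename to each transformation's RetractBlock method. A minimal sketch of the shared delegation pattern under the new name; the import paths and the exampleTransformation type are assumptions for illustration, not code from this PR:

    package functions

    import (
        // Import paths are assumed; the actual repo layout may differ.
        "github.com/influxdata/platform/query"
        "github.com/influxdata/platform/query/execute"
    )

    // exampleTransformation is hypothetical; it stands in for the concrete
    // transformations renamed in these hunks (ToHTTP, ToKafka, unique, window).
    type exampleTransformation struct {
        d execute.Dataset
    }

    // RetractBlock forwards the retraction to the underlying dataset, keyed by
    // the renamed query.GroupKey (formerly query.PartitionKey).
    func (t *exampleTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
        return t.d.RetractBlock(key)
    }
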
@@ -221,7 +221,7 @@ type ToKafkaTransformation struct {
 	spec *ToKafkaProcedureSpec
 }

-func (t *ToKafkaTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *ToKafkaTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }
 func NewToKafkaTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *ToKafkaProcedureSpec) *ToKafkaTransformation {

@@ -104,7 +104,7 @@ func NewUniqueTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
 	}
 }

-func (t *uniqueTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
+func (t *uniqueTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) error {
 	return t.d.RetractBlock(key)
 }

@@ -241,7 +241,7 @@ func NewFixedWindowTransformation(
 	}
 }

-func (t *fixedWindowTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) (err error) {
+func (t *fixedWindowTransformation) RetractBlock(id execute.DatasetID, key query.GroupKey) (err error) {
 	panic("not implemented")
 	//tagKey := meta.Tags().Key()
 	//t.cache.ForEachBuilder(func(bk execute.BlockKey, bld execute.BlockBuilder) {

@@ -321,7 +321,7 @@ func (t *fixedWindowTransformation) Process(id execute.DatasetID, b query.Block)
 			vs[j] = b.Key().Value(keyColMap[j])
 		}
 	}
-	key := execute.NewPartitionKey(cols, vs)
+	key := execute.NewGroupKey(cols, vs)
 	builder, created := t.cache.BlockBuilder(key)
 	if created {
 		for _, c := range newCols {

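For reference, execute.NewGroupKey (formerly NewPartitionKey) pairs column metadata with the key's values, as the hunk above shows. A hedged sketch of building such a key by hand; the column set, the value constructors, and the helper name windowGroupKey are assumptions for illustration rather than code from this PR:

    // windowGroupKey is hypothetical: it builds the group key for one window,
    // assuming query.ColMeta{Label, Type}, the query.TTime/TString constants,
    // and these values constructors exist with these shapes at this revision.
    func windowGroupKey(start, stop values.Time) query.GroupKey {
        cols := []query.ColMeta{
            {Label: "_start", Type: query.TTime},
            {Label: "_stop", Type: query.TTime},
            {Label: "host", Type: query.TString},
        }
        vs := []values.Value{
            values.NewTimeValue(start),
            values.NewTimeValue(stop),
            values.NewStringValue("host.local"),
        }
        return execute.NewGroupKey(cols, vs)
    }
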
@@ -112,7 +112,7 @@ This step is skipped if there was no window function.

 If there is only one group, this does not need to be done and can be skipped.

-If there are multiple groups, as is the case when there are multiple function calls, then we perform an `outer_join` using the time and any remaining partition keys.
+If there are multiple groups, as is the case when there are multiple function calls, then we perform an `outer_join` using the time and any remaining group keys.

 ## <a name="map-and-eval"></a> Map and eval the columns

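Regarding the `outer_join` step in the hunk above: the join columns are the time column plus whatever group-key columns the two sides share. A hypothetical sketch of that column selection, using only GroupKey methods that appear later in this diff (Cols, HasCol):

    // joinColumns is hypothetical, not code from this PR: it picks the columns
    // to join on, namely _time plus the group-key columns both sides share.
    func joinColumns(a, b query.GroupKey) []string {
        on := []string{"_time"}
        for _, c := range a.Cols() {
            if b.HasCol(c.Label) {
                on = append(on, c.Label)
            }
        }
        return on
    }
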
@@ -120,9 +120,9 @@ After joining the results if a join was required, then a `map` call is used to b

     result |> map(fn: (r) => {_time: r._time, max: r.val1, usage_system: r.val2})

-This is the final result. It will also include any tags in the partition key and the time will be located in the `_time` variable.
+This is the final result. It will also include any tags in the group key and the time will be located in the `_time` variable.

-TODO(jsternberg): The `_time` variable is only needed for selectors and raw queries. We can actually drop this variable for aggregate queries and use the `_start` time from the partition key. Consider whether or not we should do this and if it is worth it.
+TODO(jsternberg): The `_time` variable is only needed for selectors and raw queries. We can actually drop this variable for aggregate queries and use the `_start` time from the group key. Consider whether or not we should do this and if it is worth it.

 ## <a name="encoding"></a> Encoding the results

@@ -158,7 +158,7 @@ The edge nodes from the query specification will be used to encode the results b
     ]
 }

-The measurement name is retrieved from the `_measurement` column in the results. For the tags, the values in the partition key that are of type string are included with both the keys and the values mapped to each other. Any values in the partition key that are not strings, like the start and stop times, are ignored and discarded. If the `_field` key is still present in the partition key, it is also discarded. For all normal fields, they are included in the array of values for each row. The `_time` field will be renamed to `time` (or whatever the time alias is set to by the query).
+The measurement name is retrieved from the `_measurement` column in the results. For the tags, the values in the group key that are of type string are included with both the keys and the values mapped to each other. Any values in the group key that are not strings, like the start and stop times, are ignored and discarded. If the `_field` key is still present in the group key, it is also discarded. For all normal fields, they are included in the array of values for each row. The `_time` field will be renamed to `time` (or whatever the time alias is set to by the query).

 The chunking options that existed in 1.x are not supported by the encoder and should not be used. To minimize the amount of breaking code, using a chunking option will be ignored and the encoder will operate as normal, but it will include a message in the result so that a user can be informed that an invalid query option was used. The 1.x format has a field for sending back informational messages in it already.

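Regarding the tag rule in the hunk above: string-typed group-key columns become tags, and everything else in the key (the start/stop times, a lingering _field) is dropped. A hedged sketch of that rule; GroupKey.ValueString and the ColMeta fields are assumed to exist with these shapes:

    // tagsFromGroupKey is a hypothetical sketch of the rule described above,
    // not the encoder itself: keep string-typed group-key columns as tags and
    // discard non-string columns and _field.
    func tagsFromGroupKey(key query.GroupKey) map[string]string {
        tags := make(map[string]string)
        for j, c := range key.Cols() {
            if c.Type != query.TString || c.Label == "_field" {
                continue // start/stop times and the field key are discarded
            }
            tags[c.Label] = key.ValueString(j)
        }
        return tags
    }
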
@@ -19,9 +19,9 @@ type MultiResultEncoder struct{}
 // Expectations/Assumptions:
 // 1. Each result will be published as a 'statement' in the top-level list of results. The result name
 // will be interpreted as an integer and used as the statement id.
-// 2. If the _measurement name is present in the partition key, it will be used as the result name instead
+// 2. If the _measurement name is present in the group key, it will be used as the result name instead
 // of as a normal tag.
-// 3. All columns in the partition key must be strings and they will be used as tags. There is no current way
+// 3. All columns in the group key must be strings and they will be used as tags. There is no current way
 // to have a tag and field be the same name in the results.
 // TODO(jsternberg): For full compatibility, the above must be possible.
 // 4. All other columns are fields and will be output in the order they are found.

|
@ -96,7 +96,7 @@ func (t *transpilerState) Transpile(ctx context.Context, id int, stmt *influxql.
|
|||
}
|
||||
|
||||
// Join the cursors together on the measurement name.
|
||||
// TODO(jsternberg): This needs to join on all remaining partition keys.
|
||||
// TODO(jsternberg): This needs to join on all remaining group keys.
|
||||
cur := Join(t, cursors, []string{"_measurement"}, nil)
|
||||
|
||||
// Map each of the fields into another cursor. This evaluates any lingering expressions.
|
||||
|
|
|
@ -18,7 +18,7 @@ type BlockIterator interface {
|
|||
}
|
||||
|
||||
type Block interface {
|
||||
Key() PartitionKey
|
||||
Key() GroupKey
|
||||
|
||||
Cols() []ColMeta
|
||||
|
||||
|
@ -76,7 +76,7 @@ func (t DataType) String() string {
|
|||
// All data the ColReader exposes is guaranteed to be in memory.
|
||||
// Once a ColReader goes out of scope all slices are considered invalid.
|
||||
type ColReader interface {
|
||||
Key() PartitionKey
|
||||
Key() GroupKey
|
||||
// Cols returns a list of column metadata.
|
||||
Cols() []ColMeta
|
||||
// Len returns the length of the slices.
|
||||
|
@ -90,7 +90,7 @@ type ColReader interface {
|
|||
Times(j int) []values.Time
|
||||
}
|
||||
|
||||
type PartitionKey interface {
|
||||
type GroupKey interface {
|
||||
Cols() []ColMeta
|
||||
|
||||
HasCol(label string) bool
|
||||
|
@@ -104,8 +104,8 @@ type PartitionKey interface {
 	ValueTime(j int) values.Time
 	Value(j int) values.Value

-	Equal(o PartitionKey) bool
-	Less(o PartitionKey) bool
+	Equal(o GroupKey) bool
+	Less(o GroupKey) bool
 	String() string
 }

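With Equal and Less now taking a GroupKey, consumers compare keys directly. A hypothetical sketch of one common use, counting table boundaries in a stream of per-row keys already ordered by Less; tableBoundaries is an assumed name, not code from this PR:

    // tableBoundaries is hypothetical: in a stream of per-row group keys
    // sorted with Less, a new table starts whenever the key stops comparing
    // Equal to the previous one.
    func tableBoundaries(keys []query.GroupKey) int {
        tables := 0
        var prev query.GroupKey
        for _, k := range keys {
            if prev == nil || !prev.Equal(k) {
                tables++
                prev = k
            }
        }
        return tables
    }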