SELECT mean(value) FROM cpu
WHERE service = 'redis'
GROUP BY region, time(10m)

based on the group by, get the unique tag sets for region:

cpu region=uswest -> get series ids from cpu where <tagset> and <where cond>
cpu region=useast -> get series ids from cpu where <tagset> and <where cond>

for each shard group in time range {
    for each group by tagset {
        shardItrs := map[shard]itr
        for id := range seriesIds {
            shard := group.shardForId(id)
            shardItrs[shard].addId(id)
        }
        for _, itr := range shardItrs {
            itr.tags = tagset
            itr.name = cpu
        }
    }
}

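rough Go sketch of that bucketing step; seriesIterator, shardForID, and the
modulo placement are assumptions for illustration, not the real shard group logic:

package main

import "fmt"

// seriesIterator collects the series IDs one shard will scan for a tagset.
type seriesIterator struct {
    name string
    tags map[string]string
    ids  []uint32
}

// shardForID picks a shard for a series ID; a real engine would consult
// shard group metadata instead of a modulo (hypothetical).
func shardForID(id uint32, shardCount int) int {
    return int(id) % shardCount
}

// groupByShard buckets the series IDs for one tagset into per-shard iterators.
func groupByShard(name string, tagset map[string]string, seriesIDs []uint32, shardCount int) map[int]*seriesIterator {
    itrs := make(map[int]*seriesIterator)
    for _, id := range seriesIDs {
        s := shardForID(id, shardCount)
        itr, ok := itrs[s]
        if !ok {
            itr = &seriesIterator{name: name, tags: tagset}
            itrs[s] = itr
        }
        itr.ids = append(itr.ids, id)
    }
    return itrs
}

func main() {
    itrs := groupByShard("cpu", map[string]string{"region": "uswest"}, []uint32{1, 2, 3, 4}, 2)
    for shard, itr := range itrs {
        fmt.Println(shard, itr.name, itr.tags, itr.ids)
    }
}
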
(host = 'serverA' AND value > 100) OR (region = 'uswest' AND value < 10)

value > 100 OR value < 10   (host=serverA, region=uswest)
value < 10                  (host!=serverA, region=uswest)
value > 100                 (host=serverA, region!=uswest)

filters := make(map[whereCond]seriesIds)
filters := make(map[uint32]whereCond) // seriesID -> remaining where condition

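a sketch of the second shape: each series ID maps to the residual condition it
still has to evaluate per point. whereCond as a plain string and the IDs are
assumptions of this sketch (the real engine would keep a parsed expression):

package main

import "fmt"

// whereCond is the leftover per-point filter for one series (hypothetical).
type whereCond string

func main() {
    // series 1 matched both tag conditions, series 2 only the region
    // clause, series 3 only the host clause (IDs are made up)
    filters := map[uint32]whereCond{
        1: "value > 100 OR value < 10",
        2: "value < 10",
        3: "value > 100",
    }
    for id, cond := range filters {
        fmt.Printf("series %d -> %s\n", id, cond)
    }
}
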
select mean(value) from foo WHERE someField = 'important' group by time(5m)

===================

select derivative(mean(value))
from cpu
group by time(5m)

select mean(value) from cpu group by time(5m)
select top(10, value) from cpu group by host where time > now() - 1h

this query uses this type of cycle:

-------REMOTE HOST-------------          -----HOST THAT GOT QUERY---
map -> reduce -> combine -> map -> reduce -> combine -> user

select mean(value) from cpu group by time(5m), host where time > now() - 4h

map -> reduce -> combine -> user
map -> reduce -> map -> reduce -> combine -> user
map -> reduce -> combine -> map -> reduce -> combine -> user

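a toy sketch of the two-pass shape (e.g. derivative(mean(value))): a first pass
per 5m bucket, then a second pass over the first pass's combined output. the
bucket data here is made up:

package main

import "fmt"

func main() {
    // toy input: three 5m buckets of raw values
    buckets := [][]float64{{1, 2, 3}, {4, 6}, {10}}

    // first pass: mean(value) per 5m bucket (the inner map/reduce)
    means := make([]float64, len(buckets))
    for i, b := range buckets {
        var sum float64
        for _, v := range b {
            sum += v
        }
        means[i] = sum / float64(len(b))
    }

    // second pass: derivative over the first pass's combined output
    deriv := make([]float64, 0, len(means)-1)
    for i := 1; i < len(means); i++ {
        deriv = append(deriv, means[i]-means[i-1])
    }
    fmt.Println(means, deriv) // [2 5 10] [3 5]
}
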
select value from
(
    select mean(value) AS value FROM cpu GROUP BY time(5m)
)

[
    {
        name: cpu,
        tags: {
            host: servera,
        },
        columns: [time, mean],
        values: [
            [23423423, 88.8]
        ]
    },
    {
        name: cpu,
        tags: {
            host: serverb,
        }
    }
]

================================================================================
// list series ->

/*
[
    {
        "name": "cpu",
        "columns": ["id", "region", "host"],
        "values": [
            [1, "uswest", "servera"],
            [2, "uswest", "serverb"]
        ]
    },
    {
        ""
    }
]

list series where region = 'uswest'

list tags where name = 'cpu'

list tagKeys where name = 'cpu'

list series where name = 'cpu' and region = 'uswest'

select distinct(region) from cpu

list names
list tagKeys

list tagValues where tagKey = 'region' and time > now() - 1h

select a.value, b.value from a join b where a.user_id == 100
select a.value from a where a.user_id == 100
select b.value from b

3 1 2
select sum(a.value) + (sum(b.value) / min(b.value)) from a join b group by region

select sum(a.value) from a group by time(5m)
select sum(b.value) from b group by time(5m)

execute sum MR on series [23, 65, 88, 99, 101, 232]

map -> 1 tick per 5m
reduce -> combines ticks per 5m interval -> outputs

planner -> take reduce output per 5m interval from the two reducers
           and combine with the join function, which is +

[1,/,2,+,3]

for v := s[0].Next(); v != nil; v = s[0].Next() {
    var result interface{}
    for i := 1; i < len(s); i += 2 {
        // it's an operator
        if i % 2 == 1 {

        }
    }
}

select count(distinct(host)) from cpu where time > now() - 5m

type mapper interface {
    Map(iterator)
}

type floatCountMapper struct {}
func (m *floatCountMapper) Map(i Iterator) {
    itr := i.(*floatIterator)
}

type Iterator interface {
    itr()
}

type iterator struct {
    cursor     *bolt.Cursor
    timeBucket time.Time
    name       string
    seriesID   uint32
    tags       map[string]string
    fieldID    uint8
    where      *WhereClause
}

func (i *intIterator) itr() {}
func (i *intIterator) Next() (k int64, v float64) {
    // loop through bolt cursor applying where clause and yield next point
    // if cursor is at end or time is out of range, yield nil
}

*/
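
a runnable sketch of that alternating operand/operator walk over [1,/,2,+,3];
strict left-to-right evaluation with no precedence is an assumption here:

package main

import "fmt"

// eval walks an alternating [operand, operator, operand, ...] slice
// left to right, the way the loop in the notes above indexes it.
func eval(tokens []interface{}) float64 {
    result := tokens[0].(float64)
    for i := 1; i < len(tokens); i += 2 {
        op := tokens[i].(string)     // odd index -> operator
        rhs := tokens[i+1].(float64) // even index -> operand
        switch op {
        case "+":
            result += rhs
        case "/":
            result /= rhs
        }
    }
    return result
}

func main() {
    fmt.Println(eval([]interface{}{1.0, "/", 2.0, "+", 3.0})) // 3.5
}
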
field: ipaddress

select top(10, count, ipaddress) from hits group by time(5m), host

map -> 10 records, <key(time,host)>, <value(count,ipaddresses)>

reducer -> take in all map outputs for each 5m bucket
           combine them, sort, take out the top 10
           output -> 10 records, count, ipaddresses, time

==========
select top(10, count, host) from hits group by time(5m)

select host, value from cpu where time > now() - 1h

select last(value) from cpu group by time(auto), host fill(previous) where time > now() - 1h

select sum(value) from cpu group by host where time > now() - 1h

select sum(value) from cpu where time > now() - 1h

select * from a;

[
    {
        "name": "cpu",
        "tags": {
            "host": "servera"
        },
        "fields": [
            "time",
            "count",
            "ipaddress"
        ],
        "values": [
            [t, v, "123.23.22.2"],
            [t, v, "192.232.2.2"]
        ]
    },
    {
        "name": "cpu",
        "tags": {
            "host": "serverb"
        },
        "values": [
            [t, v],
            [t + 1, v]
        ]
    }
]

[t, v, "servera"]
[t, v, "serverb"]
[t+1, v, "servera"]
[t+1, v, "serverb"]

======

a INNER JOIN b

- planner always has "group by"

select count(errors.value) / count(requests.value) as error_rate
from errors join requests as "mysuperseries"
group by time(5m)
fill(previous)
where time > now() - 4h

select mean(value) as cpu_mean from cpu group by time(5m) where host = 'servera'

select count(value) from errors group by time(5m) fill(previous) where..
select count(value) from requests group by time(5m) fill(previ...

{
    "name": "errors.requests",
    "tags": {},
    "fields": ["time", "errors.count", "requests.count"],
    "values": [
        [t, n, m]
    ]
}

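a rough sketch of zipping the two per-series counts into that joined row
shape; the column layout mirrors the example above, but the code itself is an
assumption:

package main

import (
    "fmt"
    "sort"
)

// joinOnTime pairs up two count series by timestamp, producing one row
// [time, errors.count, requests.count] per shared 5m bucket.
func joinOnTime(errors, requests map[int64]float64) [][3]float64 {
    times := make([]int64, 0, len(errors))
    for t := range errors {
        if _, ok := requests[t]; ok {
            times = append(times, t)
        }
    }
    sort.Slice(times, func(i, j int) bool { return times[i] < times[j] })
    rows := make([][3]float64, 0, len(times))
    for _, t := range times {
        rows = append(rows, [3]float64{float64(t), errors[t], requests[t]})
    }
    return rows
}

func main() {
    errs := map[int64]float64{300: 2, 600: 5}
    reqs := map[int64]float64{300: 100, 600: 250, 900: 80}
    fmt.Println(joinOnTime(errs, reqs)) // [[300 2 100] [600 5 250]]
}
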
a MERGE b

a - t
b - t
a - t + 1
b - t + 1
b - t + 2
a - t + 3

<cpu, host>

select value from cpu
select mean(value) from cpu group by time(5m)

select first(value) from cpu

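minimal sketch of the MERGE interleave above: take the next point from
whichever input has the smaller timestamp. the point struct and inputs are
assumptions:

package main

import "fmt"

type point struct {
    series string
    t      int64
}

// merge interleaves two time-sorted streams into one time-sorted stream.
func merge(a, b []point) []point {
    out := make([]point, 0, len(a)+len(b))
    for len(a) > 0 && len(b) > 0 {
        if a[0].t <= b[0].t {
            out, a = append(out, a[0]), a[1:]
        } else {
            out, b = append(out, b[0]), b[1:]
        }
    }
    out = append(out, a...)
    return append(out, b...)
}

func main() {
    a := []point{{"a", 0}, {"a", 1}, {"a", 3}}
    b := []point{{"b", 0}, {"b", 1}, {"b", 2}}
    for _, p := range merge(a, b) {
        fmt.Printf("%s - t+%d\n", p.series, p.t)
    }
}
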
=====

1. Group by time
2. Group by
3. Raw

======

SELECT sum(value) FROM myseries

host=servera
host=serverb

{"host":"servera", "value":100}
{"host":"serverb", "value":"hello!"}

series = <name, tags>
series = seriesID

seriesID -> name

name has_many seriesIDs
name has_many fields

field -> (type, id)

<seriesName,fieldID> -> (type, id)

<seriesID, time> -> fieldValues

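a sketch of those lookups as in-memory maps; the names, the string field key,
and the types are illustrative assumptions, not the real meta store:

package main

import "fmt"

type fieldMeta struct {
    typ string // e.g. "float", "string"
    id  uint8
}

// metaIndex holds the series/field lookups sketched above.
type metaIndex struct {
    seriesIDToName  map[uint32]string               // seriesID -> name
    nameToSeriesIDs map[string][]uint32             // name has_many seriesIDs
    fields          map[string]map[string]fieldMeta // <seriesName, field> -> (type, id)
}

func main() {
    idx := metaIndex{
        seriesIDToName:  map[uint32]string{1: "cpu", 2: "cpu"},
        nameToSeriesIDs: map[string][]uint32{"cpu": {1, 2}},
        fields: map[string]map[string]fieldMeta{
            "cpu": {"value": {typ: "float", id: 1}},
        },
    }
    fmt.Println(idx.nameToSeriesIDs["cpu"], idx.fields["cpu"]["value"])
}
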
type topCountMapper struct {
    count int
}

func newTopCountMapper(count int) {

}

func (t *topCountMapper) Map(i Iterator) {
    topValues := make(map[string]int)
    for p := i.Next(); p != nil; p = i.Next() {
        topValues[p.String()] += 1
    }
    for k, v := range topValues {
        t.job.Emit(k, v)
    }
}

type topCountReducer struct {
    count int
}

func (r *topCountReducer) Reduce(i Iterator) {
    realzTop10 := make(map[string]int)
    for v := i.Next(); v != nil; v = i.Next() {
        top10 := v.(map[string]int)
        for k, n := range top10 {
            realzTop10[k] += n
        }
    }
    realyrealTop10 := make(map[string]int)
    // do sorting magic on realzTop10 and set realyrealTop10
    r.job.Emit(realyrealTop10)
}

type Transformer interface {
    Transform(interface{}) Series
}

type ReduceOutput struct {
    values   [][]interface{}
    fieldIDs []uint8
}

// for topCountReducer ReduceOutput would look like
// values = [t, c, "some string"]
// fieldIDs = [0, 0, 3]

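the "sorting magic" left open in the reducer above might look like this; topN
and its inputs are hypothetical:

package main

import (
    "fmt"
    "sort"
)

// topN keeps the n keys with the highest counts from the merged map.
func topN(counts map[string]int, n int) map[string]int {
    keys := make([]string, 0, len(counts))
    for k := range counts {
        keys = append(keys, k)
    }
    sort.Slice(keys, func(i, j int) bool { return counts[keys[i]] > counts[keys[j]] })
    if len(keys) > n {
        keys = keys[:n]
    }
    out := make(map[string]int, len(keys))
    for _, k := range keys {
        out[k] = counts[k]
    }
    return out
}

func main() {
    merged := map[string]int{"10.0.0.1": 42, "10.0.0.2": 7, "10.0.0.3": 99}
    fmt.Println(topN(merged, 2)) // map[10.0.0.1:42 10.0.0.3:99]
}
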
SELECT val1, val2 FROM abc

select mean(value) from cpu where region='uswest' group by time(5m), host

2000 series
200 series to each machine

================================================================================

type Mapper interface {
    Map(Iterator)
}

type countMapper struct {}

// Iterator is the entire series if not an aggregate query
// or iterator is the entire time bucket if an aggregate query
func (m *sumMapper) Map(i Iterator) {
    var sum int
    for p := i.Next(); p != nil; p = i.Next() {
        sum += p.Float()
    }
    m.Emitter.Emit(k, sum) // k is the key for this series/time bucket
}

type Point interface {
    String(name)
    Int(name)
}

type cursorIterator struct {
    Cursor  *bolt.Cursor
    FieldID uint8
    Value   []byte
}

// pointer receiver so the cached Value survives the call
func (i *cursorIterator) Next() Point {
    _, i.Value = i.Cursor.Next()
    return byteSlicePoint(i.Value)
}

type byteSlicePoint []byte

func (p byteSlicePoint) String() string {
    // unmarshal from byte slice.
}

/*
{
    "name": "foo",
    "fields": {
        "value": 23.2,
        "user_id": 23
    },
    "tags": {

    }
}
*/

CNT  ID0  VALUEVALUEVALUEVALUEVALUEVALUEVALUEVALU
0001 0000 0000 0000 0000 0000 0000 0000 0000 0000

CNT  ID0  ID1  ID2  FLOATFLOA STRINGSTR STRINGSTR
0002 0001 0002 0003 0000 0000 0000 0000 0000 0000

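a guess at that row layout in code: a field-count header, then the field IDs,
then the packed values. the widths, order, and lack of string length prefixes
are all assumptions for illustration:

package main

import (
    "encoding/binary"
    "fmt"
)

// encodeRow packs a field-count header, the field IDs, then the raw
// values, loosely following the CNT/ID/VALUE sketch above (hypothetical).
func encodeRow(fieldIDs []uint8, values [][]byte) []byte {
    buf := make([]byte, 2, 64)
    binary.BigEndian.PutUint16(buf, uint16(len(fieldIDs))) // CNT
    for _, id := range fieldIDs {
        buf = append(buf, id) // ID0, ID1, ...
    }
    for _, v := range values {
        buf = append(buf, v...) // packed values
    }
    return buf
}

func main() {
    row := encodeRow(
        []uint8{1, 2, 3},
        [][]byte{{0x40, 0x45, 0, 0, 0, 0, 0, 0}, []byte("uswest"), []byte("servera")},
    )
    fmt.Printf("% x\n", row)
}
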
// SELECT count() FROM cpu GROUP BY host

// SELECT mean(value) from cpu where region = 'uswest'

// SELECT derivative(value) from redis_key_count GROUP BY time(5m)

// SELECT host, mean(value)
// FROM cpu
// GROUP BY host
// HAVING top(20, mean)
// WHERE time > now() - 1h
// AND region = 'uswest'

// SELECT ipaddress, count(ipaddress)
// FROM hits
// GROUP BY ipaddress
// HAVING top(10, count)
// WHERE time > now() - 1h

series := meta.DistinctTagValues("cpu", "host")

type Series struct {
    name   string
    fields map[uint8]string
}

type SeriesData struct {
    ID
    tags map[string]string
}

<id, time, value>

mrJobs := make([]*MRJob, 0, len(series))
for _, s := range series {
    j := NewMRJob(s)
    mrJobs = append(mrJobs, j)
    j.Execute()
}

for _, j := range mrJobs {
    // pull in results
    // construct series object with same tags as series
}

================================================================================
type Iterator interface {
    Next() (interface{}, bool)
}

type iteratorCounter struct {
    iterator Iterator
}

func (c *iteratorCounter) Next() (interface{}, bool) {
    return c.iterator.Next()
}

SELECT max(a.value), min(a.value), max(b.value)
FROM a, b
WHERE a.host = 'influxdb.org'

grouper {
    []Iterator
}

SELECT max(a.value) FROM a WHERE a.host = 'influxdb.org' --> 1 value
SELECT min(a.value) FROM a WHERE a.host = 'influxdb.org' --> 1 value
SELECT max(b.value) FROM b                               --> 1 value

SELECT max(a.value) FROM a GROUP BY time WHERE a.host = 'influxdb.org' --> key,value

timeGrouper {
    []Iterator
}

type maxMapper struct {
}

IntervalIterator {
}

maxMapper.Map(Iterator)

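a minimal sketch of what maxMapper.Map could do over one interval; the
Next()-until-exhausted loop matches the notes' iterator style, the rest is
assumed:

package main

import "fmt"

// Iterator yields float values until exhausted.
type Iterator interface {
    Next() (float64, bool)
}

type sliceIterator struct {
    values []float64
    pos    int
}

func (s *sliceIterator) Next() (float64, bool) {
    if s.pos >= len(s.values) {
        return 0, false
    }
    v := s.values[s.pos]
    s.pos++
    return v, true
}

// maxMapper emits the maximum value seen in one interval iterator.
type maxMapper struct{}

func (m *maxMapper) Map(i Iterator) (float64, bool) {
    max, any := 0.0, false
    for v, ok := i.Next(); ok; v, ok = i.Next() {
        if !any || v > max {
            max, any = v, true
        }
    }
    return max, any
}

func main() {
    itr := &sliceIterator{values: []float64{1.5, 9.2, 3.3}}
    var m maxMapper
    fmt.Println(m.Map(itr)) // 9.2 true
}
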
- GROUP BY time
- GROUP BY time, <tag>
- GROUP BY <tag>

COUNT(field)
MIN(field)
MAX(field)
MEAN(field)
MODE(field)
MEDIAN(field)
COUNT(DISTINCT field)
PERCENTILE(field, N)
HISTOGRAM(field [, bucketSize])
DERIVATIVE(field)
SUM(field)
STDDEV(field)
FIRST(field)
LAST(field)
DIFFERENCE(field)
TOP(field, N)
BOTTOM(field, N) <----- multivalue

================================================================================