If the first block that needs to be read was partially deleted such
that the trailing end has no values, it was possible for the query
cursor end early.
This was caused by the KeyCursor.ReadFloatBlock returning no values instead
of checking the remaing blocks.
This adds the capability to the engine to force a full compaction
to be scheduled. When called, it snapshots any data in the cache,
aborts running compactions and prevents level plans from returning
level plans.
* did not handle cached values correctly
* sort shards by time in either ascending or descending
order depending on the RPC request ordering to ensure they
are traversed in the correct order.
The DropSeries code path ended up creating a MeasurementSeriesIterator
for each dropped series, this was too expensive just to see if a
series exists.
This adds a HasSeries func and fixes and issue where TSI files were
compacted while an iterator was still in use causing a panic.
This removes the containsSeries func which ends up creating a map
sized to the slice of keys passed in. This doesn't scale well to
high cardinalities and creates a lot of garbage.
The query language min and max times are slighly different than the
values used in the engine. This allows faster codes to be used when
the whole time range is deleted.
This is a version of DeleteRange that take a func predicate to determine
whether a series key should be deleted or not. This avoids the large
slice allocations with higher cardinalities.
This adds a new v4 tombstone format that extends the v3 format
by allowing multiple batches of tombstones to be written without
having to re-read all the existing tombstones. This uses gzip
multi stream to append multiple v3 files together to create a v4
format.
The previous sha was taken from a revision on a devel branch that I
thought would continue staying in the tree after it was merged. That
revision was rebased away and the API was changed for the logger.
This updates the usage of the logger and adds a simple package for
constructing the base logger.
The 1.0 version of zap changed the format of the default console logger
so this change moves over to this new logger instead of attempting to
retain backwards compatibility with the old format.
Update support in the `toml` package for parsing human-readble byte sizes.
Supported size suffixes are "k" or "K" for kibibytes, "m" or "M" for
mebibytes, and "g" or "G" for gibibytes. If a size suffix isn't specified
then bytes are assumed.
In the config, `cache-max-memory-size` and `cache-snapshot-memory-size` are
now typed as `toml.Size` and support the new syntax.
* Fprint* functions
* No nakedness
* clarify panic messages
* spacing between case statements
* remove break in favor of return
* remove goto in favor of for { continue }
* batch cursors return slices of timestamps and values to reduce call
overhead. Significantly improved iteration.
* added CreateCursor API to Shard, Engine
* moved build*Cursor to code gen
* array has already been sized correctly
* eliminates bounds checking for each element access
* reduces decoding of 30,000,000 points via storage API from
584ms to 540ms on average
* Introduces EXPLAIN ANALYZE command, which
produces a detailed tree of operations used to
execute the query.
introduce context.Context to APIs
metrics package
* create groups of named measurements
* safe for concurrent access
tracing package
EXPLAIN ANALYZE implementation for OSS
Serialize EXPLAIN ANALYZE traces from remote nodes
use context.Background for tests
group with other stdlib packages
additional documentation and remove unused API
use influxdb/pkg/testing/assert
remove testify reference
If multiple tombstones exists for a series that ended up causing the
full data to be deleted, the blocks were not removed from the offsets
in the index. This causes the TSMReader to report that a key exist
but does not have any data.
During a compaction, every key should have at least one value. Since
this invariant was broken, the compaction aborted early and ends up
dropping all series keys that are lexigraphically greater than where
the breakage occured. This would cause data to be dropped during the
compaction.
This fixes a potential bug where the BlockIterator would skip blocks
if the underlying TSMReader had deletes on it concurrently. This
could possibly occur due to changes in 91eb9de3 that now use the
existing TSMReaders from the FileStore instead of creating new ones
during compaction.
There was a race on the WaitGroup where we could end up calling Add
while another goroutine was still waiting. The functions were confusing
so they have been simplified a bit since the compactions goroutines
have been reworked a lot already.
The scheduling logic ended up favoring more backlogged shards
too much and would starved active, less backed up shards. This
occurred because the scheduling kicks in once a second. When it
runs, it schedules as many compactions as it can. A backed up shard
would end up having more compactions to run during the loop an would
generally get to schedule them more frequently.
This now allows each shard to try and schedule one compaction at a time
which provides a more balanced approach. At some point, we'll probably
want to more directly balanc the each shards backlog vs letting it happen
somewhat randomly.
Some files seem to get orphan behind higher levels. This causes
the compactions to get blocked as the lowere level files will not
get picked up by their lower level planners. This allows the full
plan to identify them and pull them into their plans.
This check doesn't make sense for high cardinality data as the files
typically get big and sparse very quickly. This causes a lot of extra
disk space to be used which is taken up by large indexes and sparse
data.
One shard might be able to run a compaction, but could fail to
limits being hit. This loop would continue indefinitely as the
same task would continue to be rescheduled.
With higher cardinality or larger series keys, the files can roll
over early which causes them to take longer to be compacted by higher
levels. This causes larger disk usage and higher numbers of tsm files
at times.
This changes the compaction scheduling to better utilize the available
cores that are free. Previously, a level was planned in its own goroutine
and would kick off a number of compactions groups. The problem with this
model was that if there were 4 groups, and 3 completed quickly, the planning
would be blocked for that level until the last group finished. If the compactions
at the prior level are running more quickly, a large backlog could accumlate.
This now moves the planning to a single goroutine that plans each level in
succession and starts as many groups as it can. When one group finishes,
the planning will start the next group for the level.
The fysncs due to large writes when writing to TSM files and the
WAL can eventually cause large pauses. Since we already buffer
writes, using synchronous IO reduces fsync latency by ensuring
the individiual writes hit disk. This spreads out the latecncy
across multiple writes better.
With higher cardinalities, the encoder pools where become a bottleneck.
This changes the snapshot compactions ot checkout one encoder of each
type and re-use it while writing the snapshots as opposed to repeatedly
checking it out and in.
This perioically re-allocates the cache store to avoid memory
fragmentation and gradual slow down of the store after repeated
deletes and inserts into the map.
This instructs the kernel that it can release memory used by mmap'd
TSM files when they are not actively being used. It the mappings are
use, the kernel will fault the pages back in. On linux, this causes
RES memory to drop immediately when run.
Compactions would create their own TSMReaders for simplicity. With
very high cardinality compactions, creating the reader and indirectIndex
can start to use a significant amount of memory.
This changes the compactions to use a reader that is already allocated
and managed by the FileStore.
These are already sorted during compaction, so switch to sorting lazily
to avoid the CPU and allocations. This would only occur when using if
using the writer directly.
The directIndex used by the TSMWriter maintained a map of series keys
to index entries. When the index is written to the TSM file, the keys
are sorted and then written out in order.
The reason for this is because directIndex used to be the only index
and it was optimized more for reading. The reading has been replaced
by the indirectIndex so the map of keys ends up wasting space.
During compactions, the series keys (and index entries) are already sorted
so this change uses the sorting to avoid the map and sort when writing the
index. This reduces allocations and CPU usage quite a bit for larger cardinality
TSM files.
This leaves the slower compactions that create full blocks to only
the full compaction. This helps reduce CPU usage and memory while shards
are hot, but increases disk usage (reduced compression) slightly.
Deleting high cardinality series could take a very long time, cause
write timeouts as well as dead lock the process. This fixes these
issue to by changing the approach for cleaning up the indexes and
reducing lock contention.
The prior approach delete each series and updated every index (inmem)
during the delete. This was very slow and cause the index to be locked
while it items in a slice were removed one by one. This has been changed
to mark series as deleted and then rebuild the index asynchronously which
speeds up the process.
There was also a dead lock that could occur when deleing the field set.
Deleting the field set held a write lock and the function it invoked under
the lock could try to take a read lock on the field set. This would then
deadlock. This approach was also very slow and caused time out for writes.
It now uses faster approach that checks for the existing of the measurment
in the cache and filestore which does not take write locks.
It prints the statistics of each iterator that will access the storage
engine. For each access of the storage engine, it will print the number
of shards that will potentially be accessed, the number of files that
may be accessed, the number of series that will be created, the number
of blocks, and the size of those blocks.
This is used quite a bit to determine which fields are needed in a
condition. When the condition gets large, the memory usage begins to
slow it down considerably and it doesn't take care of duplicates.
There are several places in the code where comma-ok map retrieval was
being used poorly. Some were benign, like checking existence before
issuing an unconditional delete with no cleanup. Others were potentially
far more serious: assuming that if 'ok' was true, then the resulting
pointer retrieved from the map would be non-nil. `nil` is a perfectly
valid value to store in a map of pointers, and the comma-ok syntax is
meant for when membership is distinct from having a non-zero value.
There was only one or two cases that I saw that being used correctly for
maps of pointers.
If a large compaction was running and was aborted. It could would leave
some tmp files around for files that it had fully written. The current
active file was cleaned up, but already completed ones would not. This
would occur when a TSM file needed to rollover due to size.
* <type>FinalizerIterator sets a runtime finalizer and calls Close
when garbage collected. This will ensure any associated cursors
are closed and the associated TSM files released
* `query.Iterators#Merge` call could return an error and the inputs
would not be closed, causing a cursor leak
The OnReplace func ends up trying to acquire locks on MeasurementFields. When
its called via snapshotting, this can deadlock because the snapshotting goroutine
also holds an RLock on the engine. If a delete measurement calls is run at the
right time, it will lock the MeasurementFields and try to acquire a lock on the engine
to disable compactions. This creates a deadlock.
To fix this, the OnReplace callback is moved to a function param to allow only Replace
calls as part of a compaction to invoke it as opposed to both snapshotting and compactions.
Fixes#8713
This change provides a clear separation between the query engine
mechanics and the query language so that the language can be parsed and
dealt with separate from the query engine itself.
Previously pseudo iterators could be created for meta data such
as series, measurement, and tag data. These iterators were created
at a higher level and lacked a lot of the power of the query engine.
This commit moves system iterators down to the series level and
supports the following:
- _name
- _seriesKey
- _tagKey
- _tagValue
- _fieldKey
These can be used as normal fields such as:
SELECT _seriesKey FROM cpu
This will return all the series keys for `cpu`.
If there were multiple shards, drop measurement could update the index
and remove the measurement before the other shards ran their deletes.
This causes the later shards to not see any series to delete.
The fix is to all deleteSeries to handle the index delete which already
accounts for removing the measurement when it is fully removed from the
index.
The partiallyRead func didn't account for the initial values and would
return true for blocks that had not been read at all. This causes a
slower path during compactions that forces a block to be decoded when
it could just be merged as is without decoded. This causes compactions
to consume more CPU and run slower at times.
This switches all the interfaces that take string series key to
take a []byte. This eliminates many small allocations where we
convert between to two repeatedly. Eventually, this change should
propogate futher up the stack.
The refs map was to increment the file references one time each.
It doesn't hurt to increment them multiple times though.
We also do not need to copy the files slice as we are accessing it
under a read lock so it can't be changed.
When snapshots and compactions are disabled, the check to see if
the compaction should be aborted occurs in between writing to the
next TSM file. If a large compaction is running, it might take
a while for the file to be finished writing causing long delays.
This now interrupts compactions while iterating over the blocks to
write which allows them to abort immediately.
* introduced UnsignedValue type
* leveraged existing int64 compression algorithms (RLE, Simple 8B)
* tsm and WAL can read and write UnsignedValue
* compaction is aware of UnsignedValue
* unsigned support to model, cursors and write points
NOTE: there is no support to create unsigned points, as the line
protocol has not been modified.
There was a race in the WAL writeToLog and scheduleSync which could
lead to a writing goroutine blocking indefinitely on its syncErr channel.
The issue was that the clearing of the syncCount happenend after the
wal was unlock. If a goroutine was able to lock, write and call scheduleSync
before the existing scheduleSync goroutine returns and ran the defer to
clear the syncCount, then a new scheduleSync goroutine would not get started.
This left the writing goroutine block with nothing to signal it.
While in this state, a RLock on the engine was held. If a Lock was requested
on the engine during this time, all future writes and queries would block waiting
on the blocked wal writer.
The fix is to move the atomic clearing of syncCount before the Lock is released.
The min key was not used in OverlapsKeyRange which caused it to return
false when it should be true. This causes a bug where deletes would not
write tombstones for files that actually contained the data it was supposed
to delete.
The in-memory index can get out of sync when deletes and writes
to the same measurement are running concurrently. The index is
updated independently from data on disk and it's possible for the
index to unassign a shard when data still exists on disk. What happens
is that there are TSM files on disk, but the index does not know that
the series that exist in those files still are in the shard. Restarting
the server reloads the index and the data is visible again. From and
end user perspective, this can look like more data is deleted than should
have been or that deleted data re-appears after a restart or writes to the
shard occur again.
There isn't an easy way to resolve this since the index and storage
are not transactional resources and we cannot atomically commit or
rollback changes to both at once.
As a workaround, after new TSM files are installed, we refresh the
index with series keys that exist in the new tsm files as well as
any lingering data still in the cache. There is a small window of time
when the index may be missing series, but it will re-appear after the refresh
completes.
This adds a v3 format that is a gzip compressed version of the v2
format. It reduces the size of tombstone files substantially without
having to support a more feature rich file format for tombstones.
The monitor goroutine calls enable compactions every 10s to spin down
(or start up) goroutines for cold shards. This frequent Lock may be
causing lock contention for writes and queries which get blocked trying
to acquire an RLock.
The go RWMutex says that new RLock calls will block if there is a
pending Lock call that is blocked. Switching the common path to use
an RLock should avoid the Lock and reduce lock contention for writes
and queries.
WriteBlock was missing the check for the max series keys which allowed
series keys to be written that were larger than the 2 bytes allocated
to store their length. When this occurred, the TSM can fail to load.
The defer was never executed because the planning happens in a
long running goroutine that loops. The plans need to be released
immediately after applying them.
TMP files could leak when compactions failed for various reasons. They
were also being deleted inadvertently when compactions were disabled causing
other errors to be reported in the logs.
This changes full compactions within a shard to run sequentially
instead of running all the compaction groups in parallel. Normally,
there is only 1 full compaction group to run. At times, there could
be several which causes instability if they are all running concurrently
as they tie up a cpu for long periods of time.
Level compactions are also capped to a max of 4 concurrently running for each level
in a shard. This prevents sudden spikes in CPU and disk usage due to a large backlog
of tsm files at a given level.
Measurement name and field were converted between []byte and string
repetively causing lots of garbage. This switches the code to use
[]byte in the write path.
This pool was previously a pool.Bytes to avoid repetitive allocations.
It was recently switchted to a sync.Pool because pool.Bytes held onto
very larger buffers at times which were never released. sync.Pool is
showing up in allocation profiles quite frequently.
This switches the pool to a new pool that limits how many buffers are
in the pool as well as the max size of each buffer in the pool. This
provides better bounds on allocations.
This speeds up time encoding and decoding by skipping the divisor
scaling if scaling by 1. Since division and multiplication are expensive
cpu and scaling by 1 has no effect, this just slows encoding and decoding
down.
Tombstone files would be written to all TSM files even if the deleted
keys or timerange did not exist in the TSM file. This had the side
effect of causing shards to get recompacted back to the same state. If
any shards or large numbers of TSM files existed, disk usage and CPU
utilization would spike causing issues.
This prevents tombstones being written for TSM files that could not
possiby contain the series keys being deleted or if the delted time
range is outside the range of the file.
Since this is called more frequently now, the cleanup func was invoked
quite a bit which makes several syscalls per shard. This should only
be called the first time compactions are disabled.
This was causing a shard to appear idle when in fact a snapshot compaction
was running. If the time was write, the compactions would be disabled and
the snapshot compaction would be aborted.
The monitor goroutine ran for each shard and updated disk stats
as well as logged cardinality warnings. This goroutine has been
removed by making the disks stats more lightweight and callable
direclty from Statisics and move the logging to the tsdb.Store. The
latter allows one goroutine to handle all shards.
Each shard has a number of goroutines for compacting different levels
of TSM files. When a shard goes cold and is fully compacted, these
goroutines are still running.
This change will stop background shard goroutines when the shard goes
cold and start them back up if new writes arrive.
The compactor prevents the same file from being compacted by different
compaction runs, but it can result in warning errors in the logs that
are confusing.
This adds compaction plan tracking to the planner so that files are
only part of one plan at a given time.
This limit allows the number of concurrent level and full compactions
to be throttled. Snapshot compactions are not affected by this limit
as then need to run continously.
This limit can be used to control how much CPU is consumed by compactions.
The default is to limit to the number of CPU available.
The current bytes.Pool will hold onto byte slices indefinitely. Large
writes can cause the pool to hold onto very large buffers over time.
Testing w/ sync/pool seems to perform similarly now so using a sync/pool
will allow these buffers to be GC'd when necessary.
Under high write load, the sync goroutine would startup, and end
very frequently. Starting a new goroutine so frequently adds a small
amount of latency which causes writes to take long and sometimes timeout.
This changes the goroutine to loop until there are no more waiters which
reduce the churn and latency.
If the sync waiters channel was full, it would block sending to the
channel while holding a the wal write lock. The sync goroutine would
then be stuck acquiring the write lock and could not drain the channel.
This increases the buffer to 1024 which would require a very high write
load to fill as well as retuns and error if the channel is full to prevent
the blocking.
The Point is intended to be immutable after being parsed since it
is shared by several goroutines. When dropping a field (e.g. time),
corrupted data can result if one goroutine is delete the field
while another is marshaling the underlying byte slices.
To avoid this, the shard will just skip invalid fields and series
instead of trying to mutate them by deleting them.
This reworks drop measurement to use a sorted list of series keys
instead of creating an intermediate map. It remove allocations
and some extra garbage that is created during drop measurement.
WalkKeys serially walked each TSM file and invoked fn for each key.
Caller needed to handle duplicate calls to fn with the same key
because the same key could exist in multiple TSM files. The serial
execution was also slower.
Since the series keys are already sorted, we can iterate over all
files in parallel and skip duplicates using a sorted merge. This
fixes the duplicate invocation issue as well as speeds up walking
all keys.
This can significant improve startup performance when many TSM files
exists that may not have been fully compacted. This also has benefits
for deletes (measurements/series) since duplicates are removed saving
extra allocations and work. This may also allow for the optimize
compaction to be removed provided startup times are fast enough.
The previous version was very innefficient due to the benchmarks used
to optimize it having a bug. This version always allocates a new
slice, but is O(n).
This switches compactions to use type values (FloatValues) from the
generic Values type. It avoids a bunch of allocations where each value
much be converted from a specific type to an interface{}.
This code was added to address some slow startup issues. It is believed
to be the cause of some segfault panic's that occur at query time when
the underlying MMAP array has been unmapped. The current structure of
code makes this change unnecessary now.
If a bad query is run, kill query and limits would not kick in until
after it started executing. Some bad queries that involve high
cardinality can cause the server to OOM just from planning which
defeats the purpose of the max-select-series limit.
This change primarily fixes max-select-series limit so that the query
is killed earlier and has the side effect that kill query now can kill
a query while it's being planned.
The limit waited until all the iterators had been created which still
allows problem queries to be planned. This allows the queries to be
aborted much earlier in some cases.
Fsyncs to the WAL can cause higher IO with lots of small writes or
slower disks. This reworks the previous wal fsyncing to remove the
extra goroutine and remove the hard-coded 100ms delay. Writes to
the wal still maintain the invariant that they do not return to the
caller until the write is fsync'd.
This also adds a new config options wal-fsync-delay (default 0s)
which can be increased if a delay is desired. This is somewhat useful
for system with slower disks, but the current default works well as
is.
The previous version was very innefficient due to the benchmarks used
to optimize it having a bug. This version always allocates a new
slice, but is O(n).
This switches compactions to use type values (FloatValues) from the
generic Values type. It avoids a bunch of allocations where each value
much be converted from a specific type to an interface{}.
Still seeing the panic that switching this logic around was supposed
to fix. We now delete the bulk of data outside of the fields lock
and then again, under the write lock, to ensure that the field mapping
is accurate. We don't do the full delete under the lock because it
can block writes and queries that require a read lock.
If blocks containing overlapping ranges of time where partially
recombined, it was possible for the some points to get dropped
during compactions. This occurred because the window of time of
the points we need to merge did not account for the partial blocks
created from a prior merge.
Fixes#8084
There is a race where the field type can be deleted while a new type
is written and during a query. When this happens, an iterator for
the new type is created but old data make still exist in the cache
for TSM files causing a panic.
Under high query load, a race exists in the cache and the WAL. Since
writes currently hit the cache first, they are availble for query before
they hit the WAL. If the WAL is writing and accessign the Value slice
at the same time that a query is run that needs to dedup the same slice,
a race occurs.
To fix this, the cache now just copies the values instead of storing the
slice passed in. Another way to fix this might be to have the writes go
to the wal before the cache. I think the latter would be better, but it
introduces some larger write path issues that we'd need to also address.
e.g. if the cache was full, writes to the WAL would need to be rejected
to avoid filling the disk.
Copying the slice in the cache is simpler for now and does not appear to
dramatically affect performance.
Previously, tags had a `shouldCopy` flag to indicate if those tags
referenced an underlying buffer and should be copied to allow GC.
Unfortunately, this prevented tags from being copied that were
created and referenced the mmap which caused segfaults.
This change removes the `shouldCopy` flag and replaces it with a
`forceCopy` argument in `CreateSeriesIfNotExists()`. This allows
the write path to indicate that tags must be cloned on insert.
They rebased a revision we were previously relying upon that allowed us
to use the vanity name so we are reverting back to an older version with
the old import path.
The order of series keys is in ascending alphabetical order, not
descending alphabetical order, when it is ordered by descending time.
This fixes the ordering so points are returned in descending order. The
emitter also had the conditions for choosing which iterator to use in
the wrong direction (which only affects aggregates with `FILL(none)`).
This fixes LIMIT and OFFSET when they are used in a subquery where the
grouping of the inner query is different than the grouping of the outer
query. When organizing tag sets, the grouping of the outer query is
used so the final result is in the correct order. But, unfortunately,
the optimization incorrectly limited the number of points based on the
grouping in the outer query rather than the grouping in the inner query.
The ideal solution would be to use the outer grouping to further
organize it by the grouping for the inner subquery, but that's more
difficult to do at the moment. As an easier fix, the query engine now
limits the output of each series. This may result in these types of
queries being slower in some situations like this one:
SELECT mean(value) FROM (SELECT value FROM cpu GROUP BY host LIMIT 1)
This will be slower in a situation where the `cpu` measurement has a
high cardinality and many different tags.
This also fixes `last()` and `first()` when they are used in a subquery
because those functions use `LIMIT 1` as an internal optimization.
Every write to the WAL current runs and fsync before returning. When
there are lot of concurrent writes, this can cause the WAL to bottleneck
write throughput since fsyncs are very expensive.
This changes the writeToLog to fsync on an interval to allow multiple fsyncs
calls to be batched up into one. The writeToLog behavior is the same in that
it won't return until an fsync has been performed.
I ran into an issue where the cache snapshotting seemed to stop
completely causing the cache to fill up and never recover. I believe
this is due to the the Timer being reused incorrectly. Instead,
use a Ticker that will fire more regularly and not require the resetting
logic (which was wrong).
The memory stats as well as the size of the cache were not accurate.
There was also a problem where the cache size would be increased
optimisitically, but if the cache size limit was hit, it would not
be decreased. This would cause the cache size to grow without
bounds with every failed write.
The CacheKeyIterator (used for snapshot compactions), iterated over
each key and serially encoded the values for that key as the TSM
file is written. With many series, this can be slow and will only
use 1 CPU core even if more are available.
This changes it so that the key space is split amongst a number of
goroutines that start encoding all keys in parallel to improve
throughput.
This simplifies the cache.Snapshot func to swap the hot cache to
the snapshot cache instead of copy and appending entries. This
reduces the amount of time the cache is write locked which should
reduce cache contention for the read only code paths.
Also, fix the `Iterators.Merge(IteratorOptions)` function so it consults
the `Ordered` attribute to determine which iterator it should use to
merge the input iterators.
The backup command can fail if a snapshot is running which silently
closes the connection. This causes the backup shard command to continue
on as if nothing failed.
This adds query syntax support for subqueries and adds support to the
query engine to execute queries on subqueries.
Subqueries act as a source for another query. It is the equivalent of
writing the results of a query to a temporary database, executing
a query on that temporary database, and then deleting the database
(except this is all performed in-memory).
The syntax is like this:
SELECT sum(derivative) FROM (SELECT derivative(mean(value)) FROM cpu GROUP BY *)
This will execute derivative and then sum the result of those derivatives.
Another example:
SELECT max(min) FROM (SELECT min(value) FROM cpu GROUP BY host)
This would let you find the maximum minimum value of each host.
There is complete freedom to mix subqueries with auxiliary fields. The only
caveat is that the following two queries:
SELECT mean(value) FROM cpu
SELECT mean(value) FROM (SELECT value FROM cpu)
Have different performance characteristics. The first will calculate
`mean(value)` at the shard level and will be faster, especially when it comes to
clustered setups. The second will process the mean at the top level and will not
include that optimization.