Merge pull request #3441 from influxdata/pd/iox_catalog

feat: Add initial iox_catalog skeleton
kodiakhq[bot] 2022-01-17 15:15:37 +00:00 committed by GitHub
commit 450b324bc3
10 changed files with 1847 additions and 67 deletions

.circleci/config.yml

@@ -192,6 +192,9 @@ jobs:
- image: mcr.microsoft.com/azure-storage/azurite
- image: vectorized/redpanda:v21.9.2
command: redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M
- image: postgres
environment:
POSTGRES_HOST_AUTH_METHOD: trust
resource_class: xlarge # use of a smaller executor tends to crash during linking
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@@ -212,6 +215,8 @@ jobs:
AWS_ENDPOINT: http://127.0.0.1:4566
AZURE_USE_EMULATOR: "1"
INFLUXDB_IOX_BUCKET: iox-test
POSTGRES_USER: postgres
DATABASE_URL: "postgres://postgres@localhost/iox_shared"
steps:
- run:
name: Setup localstack (AWS emulation)
@@ -230,6 +235,9 @@ jobs:
- checkout
- rust_components
- cache_restore
- run: cargo install sqlx-cli
- run: sqlx database create
- run: cd iox_catalog && sqlx migrate run && cd ..
- run:
name: Cargo test
command: cargo test --workspace --features=aws,azure,azure_test,kafka

Cargo.lock generated

@@ -151,9 +151,9 @@ dependencies = [
[[package]]
name = "assert_cmd"
version = "2.0.2"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e996dc7940838b7ef1096b882e29ec30a3149a3a443cdc8dba19ed382eca1fe2"
checksum = "93ae1ddd39efd67689deb1979d80bad3bf7f2b09c6e6117c8d1f2443b5e2f83e"
dependencies = [
"bstr",
"doc-comment",
@@ -201,6 +201,15 @@ dependencies = [
"syn",
]
[[package]]
name = "atoi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
dependencies = [
"num-traits",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -320,9 +329,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
@@ -356,9 +365,9 @@ dependencies = [
[[package]]
name = "brotli"
version = "3.3.2"
version = "3.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71cb90ade945043d3d53597b2fc359bb063db8ade2bcffe7997351d0756e9d50"
checksum = "f838e47a451d5a8fa552371f80024dd6ace9b7acdf25c4c3d0f9bc6816fb1c39"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -513,9 +522,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.6"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1957aa4a5fb388f0a0a73ce7556c5b42025b874e5cdc2c670775e346e97adec0"
checksum = "12e8611f9ae4e068fa3e56931fded356ff745e70987ff76924a6e0ab1c8ef2e3"
dependencies = [
"atty",
"bitflags",
@@ -643,6 +652,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403"
[[package]]
name = "crc32fast"
version = "1.3.0"
@@ -746,6 +770,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b979d76c9fcb84dffc80a73f7290da0f83e4c95773494674cb44b76d13a7a110"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.6"
@@ -937,6 +971,15 @@ dependencies = [
"generic-array 0.14.5",
]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
@@ -947,6 +990,17 @@ dependencies = [
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03d86534ed367a67548dc68113a0f5db55432fdfbb6e6f9d77704397d95d5780"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
@@ -1192,6 +1246,17 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-intrusive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e"
dependencies = [
"futures-core",
"lock_api",
"parking_lot",
]
[[package]]
name = "futures-io"
version = "0.3.19"
@@ -1298,9 +1363,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [
"cfg-if",
"js-sys",
@@ -1381,9 +1446,9 @@ checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "handlebars"
version = "4.2.0"
version = "4.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2483bce82dd3ed52509d0117e4a30a488bd608be250ed7a0185301314239ed31"
checksum = "25546a65e5cf1f471f3438796fc634650b31d7fcde01d444c309aeb28b92e3a8"
dependencies = [
"log",
"pest",
@@ -1402,6 +1467,15 @@ dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
dependencies = [
"hashbrown",
]
[[package]]
name = "heappy"
version = "0.1.0"
@@ -1631,7 +1705,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 3.0.6",
"clap 3.0.7",
"comfy-table",
"csv",
"data_types",
@@ -1810,13 +1884,29 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "iox_catalog"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"dotenv",
"futures",
"influxdb_line_protocol",
"observability_deps",
"snafu",
"sqlx",
"tokio",
"workspace-hack",
]
[[package]]
name = "iox_data_generator"
version = "0.1.0"
dependencies = [
"chrono",
"chrono-english",
"clap 3.0.6",
"clap 3.0.7",
"criterion",
"data_types",
"futures",
@@ -2011,9 +2101,9 @@ checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]]
name = "libloading"
version = "0.7.2"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52"
checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd"
dependencies = [
"cfg-if",
"winapi",
@@ -2377,19 +2467,6 @@ dependencies = [
"smallvec",
]
[[package]]
name = "nix"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2675,9 +2752,9 @@ dependencies = [
[[package]]
name = "openssl-probe"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
@@ -2996,7 +3073,7 @@ checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d"
dependencies = [
"maplit",
"pest",
"sha-1",
"sha-1 0.8.2",
]
[[package]]
@@ -3088,7 +3165,7 @@ dependencies = [
"lazy_static",
"libc",
"log",
"nix 0.23.1",
"nix",
"parking_lot",
"prost",
"prost-build",
@@ -3128,9 +3205,9 @@ dependencies = [
[[package]]
name = "predicates"
version = "2.1.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95e5a7689e456ab905c22c2b48225bb921aba7c8dfa58440d68ba13f6222a715"
checksum = "a5aab5be6e4732b473071984b3164dbbfb7a3674d30ea5ff44410b6bcd960c3c"
dependencies = [
"difflib",
"float-cmp",
@@ -3142,15 +3219,15 @@ dependencies = [
[[package]]
name = "predicates-core"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
checksum = "da1c2388b1513e1b605fcec39a95e0a9e8ef088f71443ef37099fa9ae6673fcb"
[[package]]
name = "predicates-tree"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338c7be2905b732ae3984a2f40032b5e94fd8f52505b186c7d4d68d193445df7"
checksum = "4d86de6de25020a36c6d3643a86d9a6a9f552107c0559c60ea03551b5e16c032"
dependencies = [
"predicates-core",
"termtree",
@@ -3337,9 +3414,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.10"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
checksum = "47aa80447ce4daf1717500037052af176af5d38cc3e571d9ec1c7353fc10c87d"
dependencies = [
"proc-macro2",
]
@@ -3785,9 +3862,9 @@ dependencies = [
[[package]]
name = "rustyline"
version = "9.0.0"
version = "9.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "790487c3881a63489ae77126f57048b42d62d3b2bafbf37453ea19eedb6340d6"
checksum = "db7826789c0e25614b03e5a54a0717a86f9ff6e6e5247f92b369472869320039"
dependencies = [
"bitflags",
"cfg-if",
@@ -3796,7 +3873,7 @@ dependencies = [
"libc",
"log",
"memchr",
"nix 0.22.2",
"nix",
"radix_trie",
"scopeguard",
"smallvec",
@@ -3867,9 +3944,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.3.1"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467"
checksum = "d09d3c15d814eda1d6a836f2f2b56a6abc1446c8a34351cb3180d3db92ffe4ce"
dependencies = [
"bitflags",
"core-foundation",
@@ -3880,9 +3957,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.4.2"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e"
checksum = "e90dd10c41c6bfc633da6e0c659bd25d31e0791e5974ac42970267d59eba87f7"
dependencies = [
"core-foundation-sys",
"libc",
@@ -3959,12 +4036,12 @@ dependencies = [
[[package]]
name = "serde_urlencoded"
version = "0.7.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa 0.4.8",
"itoa 1.0.1",
"ryu",
"serde",
]
@@ -4051,6 +4128,19 @@ dependencies = [
"opaque-debug 0.2.3",
]
[[package]]
name = "sha-1"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug 0.3.0",
]
[[package]]
name = "sha2"
version = "0.9.9"
@@ -4101,9 +4191,9 @@ dependencies = [
[[package]]
name = "siphasher"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b"
checksum = "ba1eead9e94aa5a2e02de9e7839f96a007f686ae7a1d57c7797774810d24908a"
[[package]]
name = "slab"
@@ -4113,9 +4203,9 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "smallvec"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "snafu"
@@ -4170,6 +4260,17 @@ dependencies = [
"lock_api",
]
[[package]]
name = "sqlformat"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4"
dependencies = [
"itertools",
"nom",
"unicode_categories",
]
[[package]]
name = "sqlparser"
version = "0.13.0"
@@ -4179,6 +4280,96 @@ dependencies = [
"log",
]
[[package]]
name = "sqlx"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692749de69603d81e016212199d73a2e14ee20e2def7d7914919e8db5d4d48b9"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "518be6f6fff5ca76f985d434f9c37f3662af279642acf730388f271dff7b9016"
dependencies = [
"ahash",
"atoi",
"base64 0.13.0",
"bitflags",
"byteorder",
"bytes",
"crc",
"crossbeam-channel",
"crossbeam-queue",
"crossbeam-utils",
"dirs",
"either",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util",
"hashlink",
"hex",
"hmac",
"indexmap",
"itoa 1.0.1",
"libc",
"log",
"md-5",
"memchr",
"once_cell",
"parking_lot",
"percent-encoding",
"rand",
"serde",
"serde_json",
"sha-1 0.9.8",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"tokio-stream",
"url",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38e45140529cf1f90a5e1c2e561500ca345821a1c513652c8f486bbf07407cc8"
dependencies = [
"dotenv",
"either",
"heck 0.3.3",
"once_cell",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8061cbaa91ee75041514f67a09398c65a64efed72c90151ecd47593bad53da99"
dependencies = [
"native-tls",
"once_cell",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@@ -4203,6 +4394,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.8.0"
@@ -4264,9 +4465,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.80"
version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d010a1623fbd906d51d650a9916aaefc05ffa0e4053ff7fe601167f3e715d194"
checksum = "a684ac3dcd8913827e18cd09a68384ee66c1de24157e3c556c9ab16d85695fb7"
dependencies = [
"proc-macro2",
"quote",
@@ -4670,7 +4871,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"clap 3.0.6",
"clap 3.0.7",
"futures",
"observability_deps",
"snafu",
@@ -4765,9 +4966,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d81bfa81424cc98cb034b837c985b7a290f592e5b4322f353f94a0ab0f9f594"
checksum = "77be66445c4eeebb934a7340f227bfe7b338173d3f8c00a60a5a58005c9faecf"
dependencies = [
"ansi_term",
"lazy_static",
@@ -4805,7 +5006,7 @@ dependencies = [
name = "trogging"
version = "0.1.0"
dependencies = [
"clap 3.0.6",
"clap 3.0.7",
"logfmt",
"observability_deps",
"regex",
@@ -4866,6 +5067,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "untrusted"
version = "0.7.1"
@@ -5060,6 +5267,16 @@ dependencies = [
"libc",
]
[[package]]
name = "whoami"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524b58fa5a20a2fb3014dd6358b70e6579692a56ef6fce928834e488f42f65e8"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -5147,9 +5364,14 @@ dependencies = [
name = "workspace-hack"
version = "0.1.0"
dependencies = [
"ahash",
"base64 0.13.0",
"bitflags",
"byteorder",
"bytes",
"cc",
"chrono",
"digest 0.9.0",
"either",
"futures-channel",
"futures-core",
@@ -5162,6 +5384,7 @@ dependencies = [
"indexmap",
"log",
"memchr",
"nom",
"num-bigint 0.4.3",
"num-integer",
"num-traits",
@@ -5173,6 +5396,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"sha2",
"smallvec",
"syn",
"tokio",
@@ -5222,9 +5446,9 @@ checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "zeroize"
version = "1.4.3"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68d9dcec5f9b43a30d38c49f91dfedfaac384cb8f085faca366c26207dd1619"
checksum = "cc222aec311c323c717f56060324f32b82da1ce1dd81d9a09aa6a9030bfe08db"
[[package]]
name = "zstd"

Cargo.toml

@@ -18,6 +18,7 @@ members = [
"influxdb_tsm",
"influxdb2_client",
"internal_types",
"iox_catalog",
"iox_data_generator",
"iox_object_store",
"job_registry",

iox_catalog/Cargo.toml Normal file

@@ -0,0 +1,19 @@
[package]
name = "iox_catalog"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
[dependencies] # In alphabetical order
async-trait = "0.1.42"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
observability_deps = { path = "../observability_deps" }
snafu = "0.7"
sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres"] }
tokio = { version = "1.13", features = ["full", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
workspace-hack = { path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"

iox_catalog/README.md Normal file

@@ -0,0 +1,33 @@
# IOx Catalog
This crate contains the code for the IOx Catalog. This includes the definitions of namespaces, their tables,
the columns of those tables and their types, what Parquet files are in object storage, and delete tombstones.
There's also some configuration information that the overall distributed system uses for operation.
To run this crate's tests you'll need Postgres installed and running locally. You'll also need to set the
`DATABASE_URL` environment variable so that sqlx can connect to your local DB. For example, with
user and password filled in:
```
DATABASE_URL=postgres://<postgres user>:<postgres password>@localhost/iox_shared
```
You'll then need to create the database and run the migrations. You can do this via the sqlx command line.
```
cargo install sqlx-cli
sqlx database create
sqlx migrate run
```
This will set up the database based on the files in `./migrations` in this crate. SQLx also creates a table
to keep track of which migrations have been run.
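If you'd rather create the pool and apply the migrations from Rust (say, in a test harness) instead of
the CLI, a minimal sketch looks like the following. This assumes the default `./migrations` location and
sqlx's `migrate` feature (enabled by default):
```
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read the same DATABASE_URL the sqlx CLI uses.
    let dsn = std::env::var("DATABASE_URL")?;

    // A small pool is plenty for local testing.
    let pool = PgPoolOptions::new().max_connections(5).connect(&dsn).await?;

    // Apply ./migrations, recording each one in sqlx's bookkeeping table.
    sqlx::migrate!().run(&pool).await?;
    Ok(())
}
```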
## Tests
To run the Postgres integration tests, ensure the above setup is complete first.
* Set `DATABASE_URL=<dsn>` env (see above)
* Set `TEST_INTEGRATION=1`
* Run `cargo test`
**CAUTION:** existing data in the database is dropped when tests are run

iox_catalog/migrations/… Normal file

@@ -0,0 +1,235 @@
-- Add migration script here
-- iox_shared schema
BEGIN;
CREATE SCHEMA IF NOT EXISTS iox_catalog;
CREATE TABLE IF NOT EXISTS iox_catalog.kafka_topic
(
id INT GENERATED ALWAYS AS IDENTITY,
name VARCHAR NOT NULL,
PRIMARY KEY (id),
CONSTRAINT kafka_topic_name_unique UNIQUE (name)
);
CREATE TABLE IF NOT EXISTS iox_catalog.query_pool
(
id SMALLINT GENERATED ALWAYS AS IDENTITY,
name VARCHAR NOT NULL,
PRIMARY KEY (id),
CONSTRAINT query_pool_name_unique UNIQUE (name)
);
CREATE TABLE IF NOT EXISTS iox_catalog.namespace
(
id INT GENERATED ALWAYS AS IDENTITY,
name VARCHAR NOT NULL,
retention_duration VARCHAR,
kafka_topic_id integer NOT NULL,
query_pool_id SMALLINT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT namespace_name_unique UNIQUE (name)
);
CREATE TABLE IF NOT EXISTS iox_catalog.table_name
(
id INT GENERATED ALWAYS AS IDENTITY,
namespace_id integer NOT NULL,
name VARCHAR NOT NULL,
PRIMARY KEY (id),
CONSTRAINT table_name_unique UNIQUE (namespace_id, name)
);
CREATE TABLE IF NOT EXISTS iox_catalog.column_name
(
id INT GENERATED ALWAYS AS IDENTITY,
table_id INT NOT NULL,
name VARCHAR NOT NULL,
column_type SMALLINT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT column_name_unique UNIQUE (table_id, name)
);
CREATE TABLE IF NOT EXISTS iox_catalog.sequencer
(
id SMALLINT GENERATED ALWAYS AS IDENTITY,
kafka_topic_id INT NOT NULL,
kafka_partition INT NOT NULL,
min_unpersisted_sequence_number BIGINT,
PRIMARY KEY (id),
CONSTRAINT sequencer_unique UNIQUE (kafka_topic_id, kafka_partition)
);
CREATE TABLE IF NOT EXISTS iox_catalog.sharding_rule_override
(
id INT GENERATED ALWAYS AS IDENTITY,
namespace_id INT NOT NULL,
table_id INT NOT NULL,
column_id INT NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE IF NOT EXISTS iox_catalog.partition
(
id BIGINT GENERATED ALWAYS AS IDENTITY,
sequencer_id SMALLINT NOT NULL,
table_id INT NOT NULL,
partition_key VARCHAR NOT NULL,
PRIMARY KEY (id),
CONSTRAINT partition_key_unique UNIQUE (table_id, partition_key)
);
CREATE TABLE IF NOT EXISTS iox_catalog.parquet_file
(
id BIGINT GENERATED ALWAYS AS IDENTITY,
sequencer_id SMALLINT NOT NULL,
table_id INT NOT NULL,
partition_id INT NOT NULL,
file_location VARCHAR NOT NULL,
min_sequence_number BIGINT,
max_sequence_number BIGINT,
min_time BIGINT,
max_time BIGINT,
to_delete BOOLEAN,
PRIMARY KEY (id),
CONSTRAINT parquet_location_unique UNIQUE (file_location)
);
CREATE TABLE IF NOT EXISTS iox_catalog.tombstone
(
id BIGINT GENERATED ALWAYS AS IDENTITY,
table_id INT NOT NULL,
sequencer_id SMALLINT NOT NULL,
sequence_number BIGINT NOT NULL,
min_time BIGINT NOT NULL,
max_time BIGINT NOT NULL,
serialized_predicate TEXT NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE IF NOT EXISTS iox_catalog.processed_tombstone
(
tombstone_id BIGINT NOT NULL,
parquet_file_id BIGINT NOT NULL,
PRIMARY KEY (tombstone_id, parquet_file_id)
);
ALTER TABLE IF EXISTS iox_catalog.namespace
ADD FOREIGN KEY (kafka_topic_id)
REFERENCES iox_catalog.kafka_topic (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.namespace
ADD FOREIGN KEY (query_pool_id)
REFERENCES iox_catalog.query_pool (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.table_name
ADD FOREIGN KEY (namespace_id)
REFERENCES iox_catalog.namespace (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.column_name
ADD FOREIGN KEY (table_id)
REFERENCES iox_catalog.table_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.sequencer
ADD FOREIGN KEY (kafka_topic_id)
REFERENCES iox_catalog.kafka_topic (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.sharding_rule_override
ADD FOREIGN KEY (namespace_id)
REFERENCES iox_catalog.namespace (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.sharding_rule_override
ADD FOREIGN KEY (table_id)
REFERENCES iox_catalog.table_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.sharding_rule_override
ADD FOREIGN KEY (column_id)
REFERENCES iox_catalog.column_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.partition
ADD FOREIGN KEY (sequencer_id)
REFERENCES iox_catalog.sequencer (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.partition
ADD FOREIGN KEY (table_id)
REFERENCES iox_catalog.table_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.parquet_file
ADD FOREIGN KEY (sequencer_id)
REFERENCES iox_catalog.sequencer (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.parquet_file
ADD FOREIGN KEY (table_id)
REFERENCES iox_catalog.table_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.parquet_file
ADD FOREIGN KEY (partition_id)
REFERENCES iox_catalog.partition (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.tombstone
ADD FOREIGN KEY (sequencer_id)
REFERENCES iox_catalog.sequencer (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.tombstone
ADD FOREIGN KEY (table_id)
REFERENCES iox_catalog.table_name (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.processed_tombstone
ADD FOREIGN KEY (tombstone_id)
REFERENCES iox_catalog.tombstone (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
ALTER TABLE IF EXISTS iox_catalog.processed_tombstone
ADD FOREIGN KEY (parquet_file_id)
REFERENCES iox_catalog.parquet_file (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
END;

iox_catalog/src/interface.rs Normal file

@@ -0,0 +1,392 @@
//! This module contains the traits and data objects for the Catalog API.
use async_trait::async_trait;
use influxdb_line_protocol::FieldValue;
use snafu::Snafu;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::Arc;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Name {} already exists", name))]
NameExists { name: String },
#[snafu(display("Unhandled sqlx error: {}", source))]
SqlxError { source: sqlx::Error },
#[snafu(display("Foreign key violation: {}", source))]
ForeignKeyViolation { source: sqlx::Error },
#[snafu(display("Column {} is type {} but write has type {}", name, existing, new))]
ColumnTypeMismatch {
name: String,
existing: String,
new: String,
},
#[snafu(display(
"Column type {} is in the db for column {}, which is unknown",
data_type,
name
))]
UnknownColumnType { data_type: i16, name: String },
}
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Container that can return repos for each of the catalog data types.
#[async_trait]
pub trait RepoCollection {
/// repo for kafka topics
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send>;
/// repo for query pools
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send>;
/// repo for namespaces
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send>;
/// repo for tables
fn table(&self) -> Arc<dyn TableRepo + Sync + Send>;
/// repo for columns
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send>;
/// repo for sequencers
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send>;
}
/// Functions for working with Kafka topics in the catalog.
#[async_trait]
pub trait KafkaTopicRepo {
/// Creates the kafka topic in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic>;
}
/// Functions for working with query pools in the catalog.
#[async_trait]
pub trait QueryPoolRepo {
/// Creates the query pool in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<QueryPool>;
}
/// Functions for working with namespaces in the catalog
#[async_trait]
pub trait NamespaceRepo {
/// Creates the namespace in the catalog and returns an empty `NamespaceSchema` for it.
/// Returns `Error::NameExists` if a namespace with the same name is already present.
async fn create(
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: i32,
query_pool_id: i16,
) -> Result<NamespaceSchema>;
/// Gets the namespace schema including all tables and columns.
async fn get_by_name(&self, name: &str) -> Result<Option<NamespaceSchema>>;
}
/// Functions for working with tables in the catalog
#[async_trait]
pub trait TableRepo {
/// Creates the table in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result<Table>;
/// Lists all tables in the catalog for the given namespace id.
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Table>>;
}
/// Functions for working with columns in the catalog
#[async_trait]
pub trait ColumnRepo {
/// Creates the column in the catalog or returns the existing column. Will return a
/// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type
/// the caller is attempting to create.
async fn create_or_get(
&self,
name: &str,
table_id: i32,
column_type: ColumnType,
) -> Result<Column>;
/// Lists all columns for all tables in the given namespace id.
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Column>>;
}
/// Functions for working with sequencers in the catalog
#[async_trait]
pub trait SequencerRepo {
/// create a sequencer record for the kafka topic and partition or return the existing record
async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result<Sequencer>;
/// list all sequencers
async fn list(&self) -> Result<Vec<Sequencer>>;
}
/// Data object for a kafka topic
#[derive(Debug, Eq, PartialEq, sqlx::FromRow)]
pub struct KafkaTopic {
/// The id of the topic
pub id: i32,
/// The unique name of the topic
pub name: String,
}
/// Data object for a query pool
#[derive(Debug, Eq, PartialEq, sqlx::FromRow)]
pub struct QueryPool {
/// The id of the pool
pub id: i16,
/// The unique name of the pool
pub name: String,
}
/// Data object for a namespace
#[derive(Debug, sqlx::FromRow)]
pub struct Namespace {
/// The id of the namespace
pub id: i32,
/// The unique name of the namespace
pub name: String,
/// The retention duration as a string. 'inf' or not present represents infinite duration (i.e. never drop data).
#[sqlx(default)]
pub retention_duration: Option<String>,
/// The kafka topic that writes to this namespace will land in
pub kafka_topic_id: i32,
/// The query pool assigned to answer queries for this namespace
pub query_pool_id: i16,
}
/// Schema collection for a namespace
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NamespaceSchema {
/// the namespace id
pub id: i32,
/// the kafka topic this namespace gets data written to
pub kafka_topic_id: i32,
/// the query pool assigned to answer queries for this namespace
pub query_pool_id: i16,
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
}
impl NamespaceSchema {
/// Create a new `NamespaceSchema`
pub fn new(id: i32, kafka_topic_id: i32, query_pool_id: i16) -> Self {
Self {
id,
tables: BTreeMap::new(),
kafka_topic_id,
query_pool_id,
}
}
/// Adds tables and columns to the `NamespaceSchema`. These are created
/// incrementally while validating the schema for a write; this helper
/// merges them into the schema.
pub fn add_tables_and_columns(
&mut self,
new_tables: BTreeMap<String, i32>,
new_columns: BTreeMap<i32, BTreeMap<String, ColumnSchema>>,
) {
for (table_name, table_id) in new_tables {
self.tables
.entry(table_name)
.or_insert_with(|| TableSchema::new(table_id));
}
for (table_id, new_columns) in new_columns {
let table = self
.get_table_mut(table_id)
.expect("table must be in namespace to add columns");
table.add_columns(new_columns);
}
}
fn get_table_mut(&mut self, table_id: i32) -> Option<&mut TableSchema> {
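// tables are keyed by name, so looking a table up by id is a linear scan over the map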
for table in self.tables.values_mut() {
if table.id == table_id {
return Some(table);
}
}
None
}
}
/// Data object for a table
#[derive(Debug, sqlx::FromRow, Eq, PartialEq)]
pub struct Table {
/// The id of the table
pub id: i32,
/// The namespace id that the table is in
pub namespace_id: i32,
/// The name of the table, which is unique within the associated namespace
pub name: String,
}
/// Column definitions for a table
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TableSchema {
/// the table id
pub id: i32,
/// the table's columns by their name
pub columns: BTreeMap<String, ColumnSchema>,
}
impl TableSchema {
/// Initialize new `TableSchema`
pub fn new(id: i32) -> Self {
Self {
id,
columns: BTreeMap::new(),
}
}
/// Add the map of columns to the `TableSchema`
pub fn add_columns(&mut self, columns: BTreeMap<String, ColumnSchema>) {
for (name, column) in columns {
self.columns.insert(name, column);
}
}
}
/// Data object for a column
#[derive(Debug, sqlx::FromRow, Eq, PartialEq)]
pub struct Column {
/// the column id
pub id: i32,
/// the table id the column is in
pub table_id: i32,
/// the name of the column, which is unique in the table
pub name: String,
/// the logical type of the column
pub column_type: i16,
}
impl Column {
/// returns true if the column type is a tag
pub fn is_tag(&self) -> bool {
self.column_type == ColumnType::Tag as i16
}
/// returns true if the column type matches the line protocol field value type
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
match field_value {
FieldValue::I64(_) => self.column_type == ColumnType::I64 as i16,
FieldValue::U64(_) => self.column_type == ColumnType::U64 as i16,
FieldValue::F64(_) => self.column_type == ColumnType::F64 as i16,
FieldValue::String(_) => self.column_type == ColumnType::String as i16,
FieldValue::Boolean(_) => self.column_type == ColumnType::Bool as i16,
}
}
}
/// The column id and its type for a column
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct ColumnSchema {
/// the column id
pub id: i32,
/// the column type
pub column_type: ColumnType,
}
impl ColumnSchema {
/// returns true if the column is a tag
pub fn is_tag(&self) -> bool {
self.column_type == ColumnType::Tag
}
/// returns true if the column matches the line protocol field value type
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
matches!(
(field_value, self.column_type),
(FieldValue::I64(_), ColumnType::I64)
| (FieldValue::U64(_), ColumnType::U64)
| (FieldValue::F64(_), ColumnType::F64)
| (FieldValue::String(_), ColumnType::String)
| (FieldValue::Boolean(_), ColumnType::Bool)
)
}
}
/// The column data type
#[allow(missing_docs)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ColumnType {
I64 = 1,
U64 = 2,
F64 = 3,
Bool = 4,
String = 5,
Time = 6,
Tag = 7,
}
impl ColumnType {
/// the short string description of the type
pub fn as_str(&self) -> &'static str {
match self {
ColumnType::I64 => "i64",
ColumnType::U64 => "u64",
ColumnType::F64 => "f64",
ColumnType::Bool => "bool",
ColumnType::String => "string",
ColumnType::Time => "time",
ColumnType::Tag => "tag",
}
}
}
impl std::fmt::Display for ColumnType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = self.as_str();
write!(f, "{}", s)
}
}
impl TryFrom<i16> for ColumnType {
type Error = Box<dyn std::error::Error>;
fn try_from(value: i16) -> std::result::Result<Self, Self::Error> {
match value {
x if x == Self::I64 as i16 => Ok(Self::I64),
x if x == Self::U64 as i16 => Ok(Self::U64),
x if x == Self::F64 as i16 => Ok(Self::F64),
x if x == Self::Bool as i16 => Ok(Self::Bool),
x if x == Self::String as i16 => Ok(Self::String),
x if x == Self::Time as i16 => Ok(Self::Time),
x if x == Self::Tag as i16 => Ok(Self::Tag),
_ => Err("invalid column value".into()),
}
}
}
/// Returns the `ColumnType` for the passed in line protocol `FieldValue` type
pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType {
match field_value {
FieldValue::I64(_) => ColumnType::I64,
FieldValue::U64(_) => ColumnType::U64,
FieldValue::F64(_) => ColumnType::F64,
FieldValue::String(_) => ColumnType::String,
FieldValue::Boolean(_) => ColumnType::Bool,
}
}
/// Data object for a sequencer. Only one sequencer record can exist for a given
/// kafka topic and partition (enforced via uniqueness constraint).
#[derive(Debug, Copy, Clone, PartialEq, sqlx::FromRow)]
pub struct Sequencer {
/// the id of the sequencer
pub id: i16,
/// the topic the sequencer is reading from
pub kafka_topic_id: i32,
/// the kafka partition the sequencer is reading from
pub kafka_partition: i32,
/// The minimum unpersisted sequence number. Because different tables
/// can be persisted at different times, it is possible some data has been persisted
/// with a higher sequence number than this. However, all data with a sequence number
/// lower than this must have been persisted to Parquet.
pub min_unpersisted_sequence_number: i64,
}
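
`ColumnType` doubles as the stored representation (the `column_type SMALLINT` column in the migration
above), so values round-trip through `i16`. A small illustrative sketch of that conversion (the
`column_type_round_trip` function is hypothetical, not part of the crate), using only the types defined
in this module:
```
use std::convert::TryFrom;

fn column_type_round_trip() {
    // Every variant survives the i16 round trip used for storage.
    let stored = ColumnType::Tag as i16; // 7, as written to column_name.column_type
    assert_eq!(ColumnType::try_from(stored).unwrap(), ColumnType::Tag);
    assert_eq!(ColumnType::Tag.to_string(), "tag");

    // Unknown codes surface as errors; postgres.rs maps these to Error::UnknownColumnType.
    assert!(ColumnType::try_from(42_i16).is_err());
}
```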

iox_catalog/src/lib.rs Normal file

@@ -0,0 +1,197 @@
//! The IOx catalog which keeps track of what namespaces, tables, columns, parquet files,
//! and deletes are in the system. Configuration information for distributing ingest, query
//! and compaction is also stored here.
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use crate::interface::{
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaTopic, NamespaceSchema,
QueryPool, RepoCollection, Result, Sequencer,
};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::ParsedLine;
use std::collections::BTreeMap;
#[allow(dead_code)]
const SHARED_KAFKA_TOPIC: &str = "iox_shared";
const SHARED_QUERY_POOL: &str = SHARED_KAFKA_TOPIC;
const TIME_COLUMN: &str = "time";
pub mod interface;
pub mod postgres;
/// Given the lines of a write request and an in-memory schema, this validates the write
/// against the schema and, where the write defines new schema, attempts to insert it into
/// the catalog. If any new schema is created or found, this function returns a new
/// `NamespaceSchema` that can replace the passed-in one in cache.
///
/// If another writer attempts to create a column of the same name with a different
/// type at the same time and beats this caller to it, an error is returned. If another
/// writer adds the same schema before this one, that schema is loaded and returned here.
pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
lines: Vec<ParsedLine<'_>>,
schema: &NamespaceSchema,
repo: &T,
) -> Result<Option<NamespaceSchema>> {
// table name to table_id
let mut new_tables: BTreeMap<String, i32> = BTreeMap::new();
// table_id to map of column name to column
let mut new_columns: BTreeMap<i32, BTreeMap<String, ColumnSchema>> = BTreeMap::new();
for line in &lines {
let table_name = line.series.measurement.as_str();
match schema.tables.get(table_name) {
Some(table) => {
// validate existing tags or insert in new
if let Some(tagset) = &line.series.tag_set {
for (key, _) in tagset {
match table.columns.get(key.as_str()) {
Some(c) => {
if !c.is_tag() {
return Err(Error::ColumnTypeMismatch {
name: key.to_string(),
existing: c.column_type.to_string(),
new: ColumnType::Tag.to_string(),
});
};
}
None => {
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let column_repo = repo.column();
let column = column_repo
.create_or_get(key.as_str(), table.id, ColumnType::Tag)
.await?;
entry.insert(
column.name,
ColumnSchema {
id: column.id,
column_type: ColumnType::Tag,
},
);
}
}
}
}
}
// validate existing fields or insert
for (key, value) in &line.field_set {
if let Some(column) = table.columns.get(key.as_str()) {
if !column.matches_field_type(value) {
return Err(Error::ColumnTypeMismatch {
name: key.to_string(),
existing: column.column_type.to_string(),
new: column_type_from_field(value).to_string(),
});
}
} else {
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let data_type = column_type_from_field(value);
let column_repo = repo.column();
let column = column_repo
.create_or_get(key.as_str(), table.id, data_type)
.await?;
entry.insert(
column.name,
ColumnSchema {
id: column.id,
column_type: data_type,
},
);
}
}
}
}
None => {
let table_repo = repo.table();
let new_table = table_repo.create_or_get(table_name, schema.id).await?;
let new_table_columns = new_columns.entry(new_table.id).or_default();
let column_repo = repo.column();
if let Some(tagset) = &line.series.tag_set {
for (key, _) in tagset {
let new_column = column_repo
.create_or_get(key.as_str(), new_table.id, ColumnType::Tag)
.await?;
new_table_columns.insert(
new_column.name,
ColumnSchema {
id: new_column.id,
column_type: ColumnType::Tag,
},
);
}
}
for (key, value) in &line.field_set {
let data_type = column_type_from_field(value);
let new_column = column_repo
.create_or_get(key.as_str(), new_table.id, data_type)
.await?;
new_table_columns.insert(
new_column.name,
ColumnSchema {
id: new_column.id,
column_type: data_type,
},
);
}
let time_column = column_repo
.create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time)
.await?;
new_table_columns.insert(
time_column.name,
ColumnSchema {
id: time_column.id,
column_type: ColumnType::Time,
},
);
new_tables.insert(new_table.name, new_table.id);
}
};
}
if !new_tables.is_empty() || !new_columns.is_empty() {
let mut new_schema = schema.clone();
new_schema.add_tables_and_columns(new_tables, new_columns);
return Ok(Some(new_schema));
}
Ok(None)
}
/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for
/// each of the partitions.
pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
kafka_partition_count: i32,
repo: &T,
) -> Result<(KafkaTopic, QueryPool, BTreeMap<i16, Sequencer>)> {
let kafka_repo = repo.kafka_topic();
let query_repo = repo.query_pool();
let sequencer_repo = repo.sequencer();
let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?;
let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?;
let sequencers = (1..=kafka_partition_count)
.map(|partition| sequencer_repo.create_or_get(&kafka_topic, partition))
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
(v.id, v)
})
.collect::<BTreeMap<_, _>>()
.await;
Ok((kafka_topic, query_pool, sequencers))
}
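
Putting the two entry points together: a writer validates incoming line protocol against a cached
`NamespaceSchema` and swaps in the returned schema whenever new tables or columns were created. A
minimal sketch (the `write_example` function is illustrative and mirrors the integration test in
`postgres.rs`):
```
use influxdb_line_protocol::parse_lines;

async fn write_example<T: RepoCollection + Sync + Send>(
    repo: &T,
    schema: &NamespaceSchema,
) -> Result<()> {
    let lp = "m1,t1=a f1=2i 1";
    let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();

    // None: the write matched the cached schema exactly.
    // Some(new_schema): tables/columns were created; replace the cached schema with it.
    if let Some(new_schema) = validate_or_insert_schema(lines, schema, repo).await? {
        let _ = new_schema; // e.g. store it back into the namespace cache
    }
    Ok(())
}
```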

iox_catalog/src/postgres.rs Normal file

@@ -0,0 +1,644 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::{
Column, ColumnRepo, ColumnSchema, ColumnType, Error, KafkaTopic, KafkaTopicRepo, Namespace,
NamespaceRepo, NamespaceSchema, QueryPool, QueryPoolRepo, RepoCollection, Result, Sequencer,
SequencerRepo, Table, TableRepo, TableSchema,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
const MAX_CONNECTIONS: u32 = 5;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
#[allow(dead_code)]
const SCHEMA_NAME: &str = "iox_catalog";
/// Connect to the catalog store.
pub async fn connect_catalog_store(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Pool<Postgres>, sqlx::Error> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(pool)
}
struct PostgresCatalog {
pool: Pool<Postgres>,
}
impl RepoCollection for Arc<PostgresCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
}
}
#[async_trait]
impl KafkaTopicRepo for PostgresCatalog {
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic> {
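// Upsert: the DO UPDATE below is a deliberate no-op (name = itself) so that
// RETURNING yields the existing row on conflict instead of failing the insert.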
let rec = sqlx::query_as::<_, KafkaTopic>(
r#"
INSERT INTO kafka_topic ( name )
VALUES ( $1 )
ON CONFLICT ON CONSTRAINT kafka_topic_name_unique
DO UPDATE SET name = kafka_topic.name RETURNING *;
"#,
)
.bind(&name) // $1
.fetch_one(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl QueryPoolRepo for PostgresCatalog {
async fn create_or_get(&self, name: &str) -> Result<QueryPool> {
let rec = sqlx::query_as::<_, QueryPool>(
r#"
INSERT INTO query_pool ( name )
VALUES ( $1 )
ON CONFLICT ON CONSTRAINT query_pool_name_unique
DO UPDATE SET name = query_pool.name RETURNING *;
"#,
)
.bind(&name) // $1
.fetch_one(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl NamespaceRepo for PostgresCatalog {
async fn create(
&self,
name: &str,
retention_duration: &str,
kafka_topic_id: i32,
query_pool_id: i16,
) -> Result<NamespaceSchema> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
INSERT INTO namespace ( name, retention_duration, kafka_topic_id, query_pool_id )
VALUES ( $1, $2, $3, $4 )
RETURNING *
"#,
)
.bind(&name) // $1
.bind(&retention_duration) // $2
.bind(kafka_topic_id) // $3
.bind(query_pool_id) // $4
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_unique_violation(&e) {
Error::NameExists {
name: name.to_string(),
}
} else if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(NamespaceSchema::new(rec.id, kafka_topic_id, query_pool_id))
}
async fn get_by_name(&self, name: &str) -> Result<Option<NamespaceSchema>> {
// TODO: maybe get all the data in a single call to Postgres?
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT * FROM namespace WHERE name = $1;
"#,
)
.bind(&name) // $1
.fetch_one(&self.pool)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let namespace = rec.map_err(|e| Error::SqlxError { source: e })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = ColumnRepo::list_by_namespace_id(self, namespace.id).await?;
let tables = TableRepo::list_by_namespace_id(self, namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
);
let mut table_id_to_schema = BTreeMap::new();
for t in tables {
table_id_to_schema.insert(t.id, (t.name, TableSchema::new(t.id)));
}
for c in columns {
let (_, t) = table_id_to_schema.get_mut(&c.table_id).unwrap();
match ColumnType::try_from(c.column_type) {
Ok(column_type) => {
t.columns.insert(
c.name,
ColumnSchema {
id: c.id,
column_type,
},
);
}
_ => {
return Err(Error::UnknownColumnType {
data_type: c.column_type,
name: c.name.to_string(),
});
}
}
}
for (_, (table_name, schema)) in table_id_to_schema {
namespace.tables.insert(table_name, schema);
}
Ok(Some(namespace))
}
}
#[async_trait]
impl TableRepo for PostgresCatalog {
async fn create_or_get(&self, name: &str, namespace_id: i32) -> Result<Table> {
let rec = sqlx::query_as::<_, Table>(
r#"
INSERT INTO table_name ( name, namespace_id )
VALUES ( $1, $2 )
ON CONFLICT ON CONSTRAINT table_name_unique
DO UPDATE SET name = table_name.name RETURNING *;
"#,
)
.bind(&name) // $1
.bind(&namespace_id) // $2
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Table>> {
let rec = sqlx::query_as::<_, Table>(
r#"
SELECT * FROM table_name
WHERE namespace_id = $1;
"#,
)
.bind(&namespace_id)
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl ColumnRepo for PostgresCatalog {
async fn create_or_get(
&self,
name: &str,
table_id: i32,
column_type: ColumnType,
) -> Result<Column> {
let ct = column_type as i16;
let rec = sqlx::query_as::<_, Column>(
r#"
INSERT INTO column_name ( name, table_id, column_type )
VALUES ( $1, $2, $3 )
ON CONFLICT ON CONSTRAINT column_name_unique
DO UPDATE SET name = column_name.name RETURNING *;
"#,
)
.bind(&name) // $1
.bind(&table_id) // $2
.bind(&ct) // $3
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})?;
if rec.column_type != ct {
return Err(Error::ColumnTypeMismatch {
name: name.to_string(),
// report the existing column's *type*, not its name
existing: ColumnType::try_from(rec.column_type)
.map(|t| t.to_string())
.unwrap_or_else(|_| rec.column_type.to_string()),
new: column_type.to_string(),
});
}
Ok(rec)
}
async fn list_by_namespace_id(&self, namespace_id: i32) -> Result<Vec<Column>> {
let rec = sqlx::query_as::<_, Column>(
r#"
SELECT column_name.* FROM table_name
INNER JOIN column_name on column_name.table_id = table_name.id
WHERE table_name.namespace_id = $1;
"#,
)
.bind(&namespace_id)
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rec)
}
}
#[async_trait]
impl SequencerRepo for PostgresCatalog {
async fn create_or_get(&self, topic: &KafkaTopic, partition: i32) -> Result<Sequencer> {
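// New sequencers start tracking from sequence number 0; on conflict the
// no-op DO UPDATE makes RETURNING yield the existing row.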
sqlx::query_as::<_, Sequencer>(
r#"
INSERT INTO sequencer
( kafka_topic_id, kafka_partition, min_unpersisted_sequence_number )
VALUES
( $1, $2, 0 )
ON CONFLICT ON CONSTRAINT sequencer_unique
DO UPDATE SET kafka_topic_id = sequencer.kafka_topic_id RETURNING *;
"#,
)
.bind(&topic.id) // $1
.bind(&partition) // $2
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})
}
async fn list(&self) -> Result<Vec<Sequencer>> {
sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#)
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
/// The error code returned by Postgres for a unique constraint violation.
///
/// See <https://www.postgresql.org/docs/9.2/errcodes-appendix.html>
const PG_UNIQUE_VIOLATION: &str = "23505";
/// Returns true if `e` is a unique constraint violation error.
fn is_unique_violation(e: &sqlx::Error) -> bool {
if let sqlx::Error::Database(inner) = e {
if let Some(code) = inner.code() {
if code == PG_UNIQUE_VIOLATION {
return true;
}
}
}
false
}
/// Error code returned by Postgres for a foreign key constraint violation.
const PG_FK_VIOLATION: &str = "23503";
fn is_fk_violation(e: &sqlx::Error) -> bool {
if let sqlx::Error::Database(inner) = e {
if let Some(code) = inner.code() {
if code == PG_FK_VIOLATION {
return true;
}
}
}
false
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{create_or_get_default_records, validate_or_insert_schema};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::parse_lines;
use std::env;
// Helper macro to skip tests if TEST_INTEGRATION and DATABASE_URL are not set.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let required_vars = ["DATABASE_URL"];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
);
} else if force.is_err() {
eprintln!(
"skipping Postgres integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
}
}};
}
async fn setup_db() -> (Arc<PostgresCatalog>, KafkaTopic, QueryPool) {
let dsn = std::env::var("DATABASE_URL").unwrap();
let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn)
.await
.unwrap();
let postgres_catalog = Arc::new(PostgresCatalog { pool });
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog)
.await
.unwrap();
(postgres_catalog, kafka_topic, query_pool)
}
#[tokio::test]
async fn test_catalog() {
// If running an integration test on your laptop, this requires that you have Postgres
// running and that you've done the sqlx migrations. See the README in this crate for
// info to set it up.
maybe_skip_integration!();
let (postgres, kafka_topic, query_pool) = setup_db().await;
clear_schema(&postgres.pool).await;
let namespace = NamespaceRepo::create(postgres.as_ref(), "foo", "inf", 0, 0).await;
assert!(matches!(
namespace.unwrap_err(),
Error::ForeignKeyViolation { source: _ }
));
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"foo",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
assert!(namespace.id > 0);
assert_eq!(namespace.kafka_topic_id, kafka_topic.id);
assert_eq!(namespace.query_pool_id, query_pool.id);
// test that we can create or get a table
let t = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
let tt = TableRepo::create_or_get(postgres.as_ref(), "foo", namespace.id)
.await
.unwrap();
assert!(t.id > 0);
assert_eq!(t, tt);
// test that we can create or get a column
let c = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
let cc = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::I64)
.await
.unwrap();
assert!(c.id > 0);
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = ColumnRepo::create_or_get(postgres.as_ref(), "foo", t.id, ColumnType::F64)
.await
.expect_err("should error with wrong column type");
assert!(matches!(
err,
Error::ColumnTypeMismatch {
name: _,
existing: _,
new: _
}
));
// now test with a new namespace
let namespace = NamespaceRepo::create(
postgres.as_ref(),
"asdf",
"inf",
kafka_topic.id,
query_pool.id,
)
.await
.unwrap();
let data = r#"
m1,t1=a,t2=b f1=2i,f2=2.0 1
m1,t1=a f1=3i 2
m2,t3=b f1=true 1
"#;
// test that new schema gets returned
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let schema = Arc::new(NamespaceSchema::new(
namespace.id,
namespace.kafka_topic_id,
namespace.query_pool_id,
));
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap();
let new_schema = new_schema.unwrap();
// ensure new schema is in the db
let schema_from_db = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema_from_db);
// test that a new table will be created
let data = r#"
m1,t1=c f1=1i 2
new_measurement,t9=a f10=true 1
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema_from_db, &postgres)
.await
.unwrap()
.unwrap();
let new_table = new_schema.tables.get("new_measurement").unwrap();
assert_eq!(
ColumnType::Bool,
new_table.columns.get("f10").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
new_table.columns.get("t9").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
// test that a new column for an existing table will be created
let data = r#"
m1,new_tag=c new_field=1i 2
"#;
let lines: Vec<_> = parse_lines(data).map(|l| l.unwrap()).collect();
let new_schema = validate_or_insert_schema(lines, &schema, &postgres)
.await
.unwrap()
.unwrap();
let table = new_schema.tables.get("m1").unwrap();
assert_eq!(
ColumnType::I64,
table.columns.get("new_field").unwrap().column_type
);
assert_eq!(
ColumnType::Tag,
table.columns.get("new_tag").unwrap().column_type
);
let schema = NamespaceRepo::get_by_name(postgres.as_ref(), "asdf")
.await
.unwrap()
.unwrap();
assert_eq!(new_schema, schema);
}
#[tokio::test]
async fn test_sequencers() {
maybe_skip_integration!();
let (postgres, kafka_topic, _query_pool) = setup_db().await;
clear_schema(&postgres.pool).await;
// Create 10 sequencers
let created = (1..=10)
.map(|partition| {
SequencerRepo::create_or_get(postgres.as_ref(), &kafka_topic, partition)
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
(v.id, v)
})
.collect::<BTreeMap<_, _>>()
.await;
// List them and assert they match
let listed = SequencerRepo::list(postgres.as_ref())
.await
.expect("failed to list sequencers")
.into_iter()
.map(|v| (v.id, v))
.collect::<BTreeMap<_, _>>();
assert_eq!(created, listed);
}
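// remove all rows written by the tests; children are deleted before their
// parents (column_name, then table_name, then namespace) so the deletes
// don't trip foreign key constraints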
async fn clear_schema(pool: &Pool<Postgres>) {
sqlx::query("delete from column_name;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from table_name;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from namespace;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from sequencer;")
.execute(pool)
.await
.unwrap();
}
}

View File

@ -13,8 +13,13 @@ publish = false
### BEGIN HAKARI SECTION
[dependencies]
ahash = { version = "0.7", features = ["std"] }
base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] }
bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "libc", "std", "winapi"] }
digest = { version = "0.9", default-features = false, features = ["alloc", "std"] }
either = { version = "1", features = ["use_std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
@ -27,6 +32,7 @@ hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2"
indexmap = { version = "1", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
@ -36,11 +42,12 @@ regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cac
regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", features = ["__tls", "default-tls", "hyper-tls", "json", "native-tls-crate", "serde_json", "tokio-native-tls"] }
serde = { version = "1", features = ["derive", "serde_derive", "std"] }
serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "std"] }
serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] }
sha2 = { version = "0.9", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] }
tokio-stream = { version = "0.1", features = ["net", "time"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
tokio-util = { version = "0.6", features = ["codec", "io"] }
tower = { version = "0.4", features = ["balance", "buffer", "discover", "futures-util", "indexmap", "limit", "load", "log", "make", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-stream", "tokio-util", "tracing", "util"] }
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "std", "tracing-attributes"] }
@ -48,16 +55,36 @@ tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] }
[build-dependencies]
ahash = { version = "0.7", features = ["std"] }
base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] }
bytes = { version = "1", features = ["std"] }
cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] }
digest = { version = "0.9", default-features = false, features = ["alloc", "std"] }
either = { version = "1", features = ["use_std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
serde = { version = "1", features = ["derive", "serde_derive", "std"] }
serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] }
sha2 = { version = "0.9", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
### END HAKARI SECTION