feat: gitops adapter (#3656)

* feat: scaffolding of gitops adapter bin crate

* chore: refactor gitops adapter; calls CLI now; status update fixes

* feat: gitops adapter now calls out to CLI once per topic; improved tests

* chore: add mock failure script for gitops adapter

* chore: update workspace-hack

* chore: refactor away unnecessary to_string in gitops syncer

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Luke Bond 2022-02-08 13:27:36 +00:00 committed by GitHub
parent 53cbc549e7
commit de2a013786
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1247 additions and 6 deletions

338
Cargo.lock generated
View File

@ -841,6 +841,51 @@ dependencies = [
"syn",
]
[[package]]
name = "darling"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0d720b8683f8dd83c65155f0530560cba68cd2bf395f6513a483caee57ff7f4"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a340f241d2ceed1deb47ae36c4144b2707ec7dd0b649f894cb39bb595986324"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c41b3b7352feb3211a0d743dc5700a4e3b60f51bd2b368892d1e0f9a95f44b"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]]
name = "data_types"
version = "0.1.0"
@ -962,6 +1007,17 @@ dependencies = [
"uuid",
]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "diff"
version = "0.1.12"
@ -1428,6 +1484,36 @@ version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
name = "gitops_adapter"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"chrono",
"clap 3.0.13",
"dotenv",
"futures",
"glob",
"k8s-openapi",
"kube",
"kube-derive",
"kube-runtime",
"parking_lot 0.11.2",
"pbjson-build",
"prost",
"schemars",
"serde",
"serde_json",
"thiserror",
"tokio",
"tonic",
"tonic-build",
"tracing",
"trogging",
"workspace-hack",
]
[[package]]
name = "glob"
version = "0.3.0"
@ -1690,6 +1776,12 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.2.3"
@ -2129,6 +2221,28 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json-patch"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f995a3c8f2bc3dd52a18a583e90f9ec109c047fa1603a853e46bcda14d2e279d"
dependencies = [
"serde",
"serde_json",
"treediff",
]
[[package]]
name = "jsonpath_lib"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f"
dependencies = [
"log",
"serde",
"serde_json",
]
[[package]]
name = "jsonwebtoken"
version = "7.2.0"
@ -2136,13 +2250,126 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afabcc15e437a6484fc4f12d0fd63068fe457bf93f1c148d3d9649c60b103f32"
dependencies = [
"base64 0.12.3",
"pem",
"pem 0.8.3",
"ring",
"serde",
"serde_json",
"simple_asn1",
]
[[package]]
name = "k8s-openapi"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c"
dependencies = [
"base64 0.13.0",
"bytes",
"chrono",
"schemars",
"serde",
"serde-value",
"serde_json",
]
[[package]]
name = "kube"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84dcc2f8ca3f2427a72acc31fa9538159f6b33a97002e315a3fcd5323cf51a2b"
dependencies = [
"k8s-openapi",
"kube-client",
"kube-core",
]
[[package]]
name = "kube-client"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8957106140aa24a76de3f7d005966f381b30a4cd6a9c003b3bba6828e9617535"
dependencies = [
"base64 0.13.0",
"bytes",
"chrono",
"dirs-next",
"either",
"futures",
"http",
"http-body",
"hyper",
"hyper-timeout",
"hyper-tls",
"jsonpath_lib",
"k8s-openapi",
"kube-core",
"openssl",
"pem 1.0.2",
"pin-project",
"serde",
"serde_json",
"serde_yaml",
"thiserror",
"tokio",
"tokio-native-tls",
"tokio-util",
"tower",
"tower-http",
"tracing",
]
[[package]]
name = "kube-core"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ec73e7d8e937dd055d962af06e635e262fdb6ed341c36ecf659d4fece0a8005"
dependencies = [
"chrono",
"form_urlencoded",
"http",
"json-patch",
"k8s-openapi",
"once_cell",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "kube-derive"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6651bfae82bc23439da1099174b52bcbf68df065dc33317c912e3c5c5cea43c"
dependencies = [
"darling",
"proc-macro2",
"quote",
"serde_json",
"syn",
]
[[package]]
name = "kube-runtime"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b090d3d7b43e2d60fa93ca51b19fe9f2e05a5252c97880fe834f8fa9f2de605"
dependencies = [
"dashmap",
"derivative",
"futures",
"json-patch",
"k8s-openapi",
"kube-client",
"pin-project",
"serde",
"serde_json",
"smallvec",
"thiserror",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -2257,6 +2484,12 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "linked-hash-map"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "linux-raw-sys"
version = "0.0.37"
@ -3149,6 +3382,15 @@ dependencies = [
"regex",
]
[[package]]
name = "pem"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
dependencies = [
"base64 0.13.0",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -4083,6 +4325,30 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "schemars"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6b5a3c80cea1ab61f4260238409510e814e38b4b563c06044edf91e7dc070e3"
dependencies = [
"dyn-clone",
"schemars_derive",
"serde",
"serde_json",
]
[[package]]
name = "schemars_derive"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ae4dce13e8614c46ac3c38ef1c0d668b101df6ac39817aebdaa26642ddae9b"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -4137,6 +4403,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-value"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float 2.10.0",
"serde",
]
[[package]]
name = "serde-xml-rs"
version = "0.4.1"
@ -4170,6 +4446,17 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_derive_internals"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dbab34ca63057a1f15280bdf3c39f2b1eb1b54c17e98360e511637aef7418c6"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.78"
@ -4203,6 +4490,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a521f2940385c165a24ee286aa8599633d162077a54bdcae2a6fd5a7bfa7a0"
dependencies = [
"indexmap",
"ryu",
"serde",
"yaml-rust",
]
[[package]]
name = "server"
version = "0.1.0"
@ -4905,6 +5204,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite",
"slab",
"tokio",
]
@ -5011,6 +5311,24 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81eca72647e58054bbfa41e6f297c23436f1c60aff6e5eb38455a0f9ca420bb5"
dependencies = [
"base64 0.13.0",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"pin-project",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
@ -5171,6 +5489,15 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "treediff"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "761e8d5ad7ce14bb82b7e61ccc0ca961005a275a060b9644a2431aa11553c2ff"
dependencies = [
"serde_json",
]
[[package]]
name = "trogging"
version = "0.1.0"
@ -5669,6 +5996,15 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "yaml-rust"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "zeroize"
version = "1.5.2"

View File

@ -9,6 +9,7 @@ members = [
"db",
"dml",
"generated_types",
"gitops_adapter",
"grpc-router",
"grpc-router-test-gen",
"influxdb_iox",

44
gitops_adapter/Cargo.toml Normal file
View File

@ -0,0 +1,44 @@
[package]
name = "gitops_adapter"
version = "0.1.0"
authors = ["Luke Bond <luke.n.bond@gmail.com>"]
edition = "2021"
# Prevent this from being published to crates.io!
publish = false

[[bin]]
name = "iox-gitops-adapter"
path = "src/main.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
chrono = "0.4.15"
clap = { version = "3", features = ["derive", "env"] }
dotenv = "0.15"
futures = "0.3"
k8s-openapi = { version = "0.13.1", features = ["v1_17", "schemars"], default-features = false }
kube = "0.64"
kube-derive = { version = "0.64", default-features = false } # only needed to opt out of schema
kube-runtime = "0.64"
prost = "0.9"
schemars = "0.8.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "parking_lot"] }
tonic = "0.6"
tracing = { version = "0.1", features = ["release_max_level_debug"] }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
workspace-hack = { path = "../workspace-hack" }

[build-dependencies]
glob = "0.3.0"
pbjson-build = "0.2"
tonic-build = "0.6"

[dev-dependencies]
assert_matches = "1.5"
parking_lot = { version = "0.11.1" }

25
gitops_adapter/build.rs Normal file
View File

@ -0,0 +1,25 @@
use std::process::Command;
/// Build script entry point: exposes the current git commit to the crate via
/// the `GIT_HASH` compile-time environment variable (read with `env!`).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Populate env!(GIT_HASH) with the current git commit
    println!("cargo:rustc-env=GIT_HASH={}", get_git_hash());
    Ok(())
}
/// Return the git commit identifier to embed in the binary.
///
/// Prefers the `VERSION_HASH` environment variable (typically set by CI where
/// no `.git` directory exists) and falls back to running `git describe` in
/// the working directory. Panics if git cannot be executed, emits non-UTF-8
/// output, or the resulting hash is empty — an unidentifiable build is
/// treated as a build error.
fn get_git_hash() -> String {
    let out = match std::env::var("VERSION_HASH") {
        Ok(v) => v,
        Err(_) => {
            let output = Command::new("git")
                .args(&["describe", "--always", "--dirty", "--abbrev=64"])
                .output()
                // Fixed: previous message claimed "git rev-parse" but the
                // command actually executed is `git describe`.
                .expect("failed to execute git describe to read the current git hash");
            // NOTE(review): git's stdout retains its trailing newline here;
            // consumers appear to tolerate it — confirm if it matters.
            String::from_utf8(output.stdout).expect("non-utf8 found in git hash")
        }
    };
    assert!(!out.is_empty(), "attempting to embed empty git hash");
    out
}

View File

@ -0,0 +1,49 @@
use async_trait::async_trait;
use kube::{
api::{Patch, PatchParams},
Api,
};
use serde_json::json;
use crate::kafka_topic_list::resources::{KafkaTopicList, KafkaTopicListStatus};
/// Abstraction over the subset of the Kubernetes API this controller needs
/// for `KafkaTopicList` custom resources; lets tests substitute a mock.
#[async_trait]
pub trait KafkaTopicListApi: Send + Sync + Clone + 'static {
    /// Gets a KafkaTopicList resource by name.
    async fn get_kafka_topic_list(
        &self,
        kafka_topic_list_name: String,
    ) -> Result<KafkaTopicList, kube::Error>;

    /// Patch status block, if it exists, with the given status.
    async fn patch_resource_status(
        &self,
        kafka_topic_list_name: String,
        status: KafkaTopicListStatus,
    ) -> Result<KafkaTopicList, kube::Error>;
}
/// Production implementation backed by a real `kube` API client.
#[async_trait]
impl KafkaTopicListApi for Api<KafkaTopicList> {
    /// Fetch the named `KafkaTopicList` resource from the cluster.
    async fn get_kafka_topic_list(
        &self,
        kafka_topic_list_name: String,
    ) -> Result<KafkaTopicList, kube::Error> {
        self.get(&kafka_topic_list_name).await
    }

    /// Merge-patch the `status` subresource of the named `KafkaTopicList`.
    async fn patch_resource_status(
        &self,
        kafka_topic_list_name: String,
        status: KafkaTopicListStatus,
    ) -> Result<KafkaTopicList, kube::Error> {
        let body = json!({ "status": status });
        self.patch_status(
            &kafka_topic_list_name,
            &PatchParams::default(),
            &Patch::Merge(&body),
        )
        .await
    }
}

View File

@ -0,0 +1,129 @@
#![allow(missing_docs)]
use std::sync::{mpsc::SyncSender, Arc};
use async_trait::async_trait;
use parking_lot::Mutex;
use crate::kafka_topic_list::{
api::KafkaTopicListApi,
resources::{KafkaTopicList, KafkaTopicListStatus},
};
/// A call made against the mock API, recorded for later assertions.
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum MockKafkaTopicListApiCall {
    /// `get_kafka_topic_list` was invoked with the given resource name.
    Get(String),
    /// `patch_resource_status` was invoked with the given name and status.
    PatchStatus {
        kafka_topic_list_name: String,
        status: KafkaTopicListStatus,
    },
}

/// Shared mutable state behind a `MockKafkaTopicListApi`.
#[derive(Debug, Default)]
pub struct ClientInner {
    /// A channel to push call notifications into as they occur.
    pub notify: Option<SyncSender<MockKafkaTopicListApiCall>>,
    /// A vector of calls in call order for assertions.
    pub calls: Vec<MockKafkaTopicListApiCall>,
    // Return values: popped from the END on each call, i.e. consumed in
    // reverse order of how they appear in these vectors.
    pub get_ret: Vec<Result<KafkaTopicList, kube::Error>>,
    pub patch_status_ret: Vec<Result<KafkaTopicList, kube::Error>>,
}

impl ClientInner {
    /// Append `c` to the call log and, if a notify channel is configured,
    /// send a copy into it (send failures are deliberately ignored).
    fn record_call(&mut self, c: MockKafkaTopicListApiCall) {
        self.calls.push(c.clone());
        if let Some(ref n) = self.notify {
            let _ = n.send(c);
        }
    }
}

/// Build a mock client from pre-populated state.
impl From<ClientInner> for MockKafkaTopicListApi {
    fn from(state: ClientInner) -> Self {
        Self {
            state: Arc::new(Mutex::new(state)),
        }
    }
}

/// Mock helper to record a call and return the pre-configured value.
///
/// Pushes `$call` to call record, popping `self.$return` and returning it to
/// the caller. If no value exists, the pop attempt causes a panic.
macro_rules! record_and_return {
    ($self:ident, $call:expr, $return:ident) => {{
        let mut state = $self.state.lock();
        state.record_call($call);
        state.$return.pop().expect("no mock result to return")
    }};
}
/// Mock implementation of `KafkaTopicListApi` that records calls and returns
/// pre-configured results; state is shared across clones.
#[derive(Debug, Default)]
pub struct MockKafkaTopicListApi {
    pub state: Arc<Mutex<ClientInner>>,
}

impl MockKafkaTopicListApi {
    /// Builder: notify `s` of every recorded call as it happens.
    pub fn with_notify(self, s: SyncSender<MockKafkaTopicListApiCall>) -> Self {
        self.state.lock().notify = Some(s);
        self
    }

    /// Builder: set results for `get_kafka_topic_list` (popped from the end,
    /// so supply them in reverse call order).
    pub fn with_get_ret(self, ret: Vec<Result<KafkaTopicList, kube::Error>>) -> Self {
        self.state.lock().get_ret = ret;
        self
    }

    /// Builder: set results for `patch_resource_status` (popped from the end,
    /// so supply them in reverse call order).
    pub fn with_patch_status_ret(self, ret: Vec<Result<KafkaTopicList, kube::Error>>) -> Self {
        self.state.lock().patch_status_ret = ret;
        self
    }

    /// Snapshot of all calls recorded so far, in call order.
    pub fn get_calls(&self) -> Vec<MockKafkaTopicListApiCall> {
        self.state.lock().calls.clone()
    }
}
/// The trait is implemented on `Arc<Mock…>` so tests can keep a handle to the
/// mock for assertions while the controller holds its own clone.
#[async_trait]
impl KafkaTopicListApi for Arc<MockKafkaTopicListApi> {
    /// Gets a KafkaTopicList resource by name.
    async fn get_kafka_topic_list(
        &self,
        kafka_topic_list_name: String,
    ) -> Result<KafkaTopicList, kube::Error> {
        record_and_return!(
            self,
            MockKafkaTopicListApiCall::Get(kafka_topic_list_name,),
            get_ret
        )
    }

    /// Patch status block, if it exists, with the given status.
    async fn patch_resource_status(
        &self,
        kafka_topic_list_name: String,
        status: KafkaTopicListStatus,
    ) -> Result<KafkaTopicList, kube::Error> {
        record_and_return!(
            self,
            MockKafkaTopicListApiCall::PatchStatus {
                kafka_topic_list_name,
                status,
            },
            patch_status_ret
        )
    }
}

/// Cloning a client shares the same mock state across both client instances.
impl Clone for MockKafkaTopicListApi {
    fn clone(&self) -> Self {
        Self {
            state: Arc::clone(&self.state),
        }
    }
}

View File

@ -0,0 +1,5 @@
/// Trait-based access to the `KafkaTopicList` Kubernetes API.
pub mod api;
/// Custom resource type definitions for `KafkaTopicList`.
pub mod resources;

/// Mock API implementation for unit tests.
#[cfg(test)]
pub mod mock_api;

View File

@ -0,0 +1,108 @@
use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Spec of the `KafkaTopicList` custom resource: the desired set of Kafka
/// topic names. The `CustomResource` derive generates the `KafkaTopicList`
/// resource type from this spec.
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
    group = "iox.influxdata.com",
    version = "v1alpha1",
    kind = "KafkaTopicList",
    namespaced,
    shortname = "topics"
)]
#[kube(status = "KafkaTopicListStatus")]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicListSpec {
    topics: Vec<String>,
}

/// Status subresource of `KafkaTopicList`, tracking reconciliation outcome.
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicListStatus {
    // The controller only ever maintains the condition at index 0.
    conditions: Vec<KafkaTopicListStatusCondition>,
    observed_generation: i64, // type matches that of metadata.generation
}

/// A single status condition, modelled on the Kubernetes condition pattern
/// (type/status/message plus transition and update timestamps).
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicListStatusCondition {
    // `type_` avoids the Rust `type` keyword; presumably serialized as
    // "type" by the camelCase rename — NOTE(review): confirm against the CRD.
    type_: String,
    status: String,
    message: String,
    last_transition_time: String,
    last_update_time: String,
}
impl KafkaTopicListSpec {
    /// Create a spec from the desired list of topic names.
    pub fn new(topics: Vec<String>) -> Self {
        Self { topics }
    }

    /// The desired list of Kafka topic names.
    pub fn topics(&self) -> &Vec<String> {
        &self.topics
    }
}

impl KafkaTopicListStatus {
    /// Read-only view of the status conditions.
    pub fn conditions(&self) -> &Vec<KafkaTopicListStatusCondition> {
        &self.conditions
    }

    /// Mutable access to the status conditions.
    pub fn conditions_mut(&mut self) -> &mut Vec<KafkaTopicListStatusCondition> {
        &mut self.conditions
    }

    /// The `metadata.generation` most recently reconciled.
    pub fn observed_generation(&self) -> i64 {
        self.observed_generation
    }

    /// Record the `metadata.generation` that has been reconciled.
    pub fn set_observed_generation(&mut self, observed_generation: i64) {
        self.observed_generation = observed_generation;
    }
}

impl KafkaTopicListStatusCondition {
    /// Construct a condition; timestamps are pre-formatted RFC 3339 strings.
    pub fn new(
        type_: String,
        status: String,
        message: String,
        last_transition_time: String,
        last_update_time: String,
    ) -> Self {
        Self {
            type_,
            status,
            message,
            last_transition_time,
            last_update_time,
        }
    }

    /// The condition type, e.g. "Reconciled".
    pub fn type_(&self) -> &String {
        &self.type_
    }

    /// The condition status, "True" or "False".
    pub fn status(&self) -> &String {
        &self.status
    }

    /// Human-readable detail; empty on success.
    pub fn message(&self) -> &String {
        &self.message
    }

    /// When the condition status last changed.
    pub fn last_transition_time(&self) -> &String {
        &self.last_transition_time
    }

    /// When the condition was last written.
    pub fn last_update_time(&self) -> &String {
        &self.last_update_time
    }
}

impl PartialEq for KafkaTopicListStatusCondition {
    // Equality deliberately ignores the two timestamp fields: this exists for
    // test assertions, where requiring identical timestamps would be too
    // tedious.
    fn eq(&self, other: &Self) -> bool {
        self.type_ == other.type_ && self.status == other.status && self.message == other.message
    }
}

537
gitops_adapter/src/main.rs Normal file
View File

@ -0,0 +1,537 @@
use std::{
io::ErrorKind,
sync::Arc,
time::{Duration, SystemTime},
};
use chrono::{DateTime, Utc};
use dotenv::dotenv;
use futures::StreamExt;
use kube::{api::ListParams, Api, Client as K8sClient};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use std::process::Command as Cmd;
use thiserror::Error;
use tracing::*;
use trogging::{cli::LoggingConfig, LogFormat};
use crate::kafka_topic_list::{
api::KafkaTopicListApi,
resources::{KafkaTopicList, KafkaTopicListStatus, KafkaTopicListStatusCondition},
};
pub mod kafka_topic_list;
/// Condition type written to the KafkaTopicList status subresource.
static CONDITION_TYPE_RECONCILED: &str = "Reconciled";
/// Condition status values, following the Kubernetes string-boolean style.
static CONDITION_STATUS_TRUE: &str = "True";
static CONDITION_STATUS_FALSE: &str = "False";

/// Errors that can occur while reconciling a `KafkaTopicList` resource.
#[derive(Debug, Error)]
enum CatalogError {
    /// The resource is missing a required metadata field.
    #[error("Malformed KafkaTopicList resource: {message}")]
    MalformedKafkaTopicListResource { message: String },

    /// Patching the status subresource via the k8s API failed.
    #[error("Request to patch status of k8s custom resource failed: {0}")]
    PatchStatusError(#[from] kube::Error),

    /// The IOx CLI process could not be spawned at all.
    #[error("Failed to execute iox binary to update catalog: {0}")]
    IOxBinaryExecFailed(#[from] std::io::Error),

    /// The IOx CLI ran but exited non-zero.
    #[error("Request to update catalog with topic failed: {stderr}")]
    UpdateTopicError { stderr: String },

    /// The IOx CLI's stdout was not a numeric catalog ID.
    #[error("Failed to parse stdout of catalog update command to ID: {0}")]
    TopicIdParseError(#[from] std::num::ParseIntError),
}
// Config defines the runtime configuration variables settable on the command
// line.
//
// These fields are automatically converted into a [Clap] CLI.
//
// This has an `allow(missing_docs)` annotation as otherwise the comment is
// added to the CLI help text.
//
// [Clap]: https://github.com/clap-rs/clap
#[derive(Debug, clap::Parser)]
#[clap(
    name = "iox-gitops-adapter",
    about = "Adapter to configure IOx Catalog from Kubernetes Custom Resources",
    long_about = r#"Kubernetes controller responsible for synchronising the IOx Catalog to cluster configuration in a Kubernetes Custom Resource.
Examples:
# Run the gitops adapter server:
iox-gitops-adapter
# See all configuration options
iox-gitops-adapter --help
"#,
    version = concat!(env!("CARGO_PKG_VERSION"), " - ", env!("GIT_HASH"))
)]
#[allow(missing_docs)]
pub struct Config {
    /// Configure the log level & filter.
    ///
    /// Example values:
    ///     iox_gitops_adapter=debug
    #[clap(flatten)]
    logging_config: LoggingConfig,

    /// Configure the Kubernetes namespace where custom resources are found.
    ///
    /// Example values:
    ///     namespace=conductor
    // NOTE(review): `long = "--namespace"` with leading dashes is
    // unconventional for clap; confirm clap strips them as intended.
    #[clap(long = "--namespace", env = "GITOPS_ADAPTER_NAMESPACE")]
    namespace: String,

    /// Configure the Catalog's Postgres DSN.
    ///
    /// Example values:
    ///     catalog-dsn=postgres://postgres:postgres@localhost:5432/iox_shared
    #[clap(long = "--catalog-dsn", env = "GITOPS_ADAPTER_CATALOG_DSN")]
    catalog_dsn: String,

    /// Configure the path to the IOx CLI.
    ///
    /// Example values:
    ///     iox-cli=/usr/bin/influxdb_iox
    #[clap(long = "--iox-cli", env = "GITOPS_ADAPTER_IOX_CLI")]
    iox_cli: String,
}
// NOTE(review): this `Command` enum is not referenced anywhere in this file;
// possibly a leftover scaffold — confirm before removing.
#[derive(Debug, clap::Parser)]
enum Command {
    Config,
}

impl Config {
    /// Returns the (possibly invalid) log filter string.
    pub fn log_filter(&self) -> &Option<String> {
        &self.logging_config.log_filter
    }

    /// Returns the (possibly invalid) log format string.
    pub fn log_format(&self) -> &LogFormat {
        &self.logging_config.log_format
    }
}
/// Load the config.
///
/// This pulls in config from the following sources, in order of precedence:
///
///     - command line arguments
///     - user set environment variables
///     - .env file contents
///     - pre-configured default values
pub fn load_config() -> Result<Config, Box<dyn std::error::Error>> {
    // Source the .env file before initialising the Config struct - this sets
    // any envs in the file, which the Config struct then uses.
    //
    // Precedence is given to existing env variables.
    match dotenv() {
        Ok(_) => {}
        Err(dotenv::Error::Io(err)) if err.kind() == ErrorKind::NotFound => {
            // Ignore this - a missing env file is not an error,
            // defaults will be applied when initialising the Config struct.
        }
        // Any other .env failure (e.g. malformed file) is fatal.
        Err(e) => return Err(Box::new(e)),
    };

    // Load the Config struct - this pulls in any envs set by the user or
    // sourced above, and applies any defaults.
    Ok(clap::Parser::parse())
}
/// Initialise the tracing subscribers.
///
/// Returns a guard that must stay alive for the lifetime of the program;
/// dropping it tears down the logging pipeline.
fn setup_tracing(
    logging_config: &LoggingConfig,
    log_env_var: Option<String>,
) -> Result<trogging::TroggingGuard, trogging::Error> {
    // Default to the `info` level when no filter was supplied.
    let drop_handle = logging_config
        .to_builder()
        .with_default_log_filter(log_env_var.unwrap_or_else(|| "info".to_string()))
        .install_global()?;
    trace!("logging initialised!");
    Ok(drop_handle)
}
/// Update the IOx catalog by invoking the IOx CLI once per topic, returning
/// the catalog IDs the CLI printed to stdout.
///
/// Fails fast: the first invocation that cannot be spawned, exits non-zero,
/// or prints a non-numeric ID aborts with the corresponding `CatalogError`.
async fn reconcile_topics(
    path_to_iox_binary: &str,
    catalog_dsn: &str,
    topics: &[String],
) -> Result<Vec<u32>, CatalogError> {
    trace!(
        "calling out to {} for topics {:?}",
        path_to_iox_binary,
        topics
    );
    let mut topic_ids = Vec::with_capacity(topics.len());
    for topic in topics {
        let output = Cmd::new(path_to_iox_binary)
            .arg("catalog")
            .arg("topic")
            .arg("update")
            .arg("--catalog-dsn")
            .arg(catalog_dsn)
            .arg(topic)
            .output()
            .map_err(CatalogError::IOxBinaryExecFailed)?;
        if !output.status.success() {
            return Err(CatalogError::UpdateTopicError {
                stderr: String::from_utf8_lossy(&output.stderr).into(),
            });
        }
        let raw_stdout = String::from_utf8_lossy(&output.stdout);
        let trimmed = raw_stdout.trim();
        trace!(
            "Updated catalog with kafka topic {}. stdout: {}",
            topic,
            trimmed
        );
        // The CLI returns an ID on success; try to parse it here to ensure it
        // worked; not sure that return zero is enough? e.g. --help will return 0.
        // also, we'd like to print the IDs out later
        let id = trimmed
            .parse::<u32>()
            .map_err(CatalogError::TopicIdParseError)?;
        topic_ids.push(id);
    }
    Ok(topic_ids)
}
/// Controller triggers this whenever our main object or our children changed
///
/// Syncs the catalog via the IOx CLI (one call per topic), then writes the
/// outcome and the observed generation back to the resource's status
/// subresource.
async fn reconcile<T>(
    topics: KafkaTopicList,
    ctx: Context<Data<T>>,
) -> Result<ReconcilerAction, CatalogError>
where
    T: KafkaTopicListApi,
{
    debug!(
        "got a change to the kafka topic list custom resource: {:?}",
        topics.spec
    );
    let kafka_topic_list_api = ctx.get_ref().kafka_topic_list_api.clone();
    let topics = Arc::new(topics);

    // if CR doesn't contain status field, add it
    let mut topics_status = match &topics.status {
        Some(status) => status.clone(),
        None => KafkaTopicListStatus::default(),
    };
    let kafka_topic_list_name = match &topics.metadata.name {
        Some(n) => n.clone(),
        None => {
            return Err(CatalogError::MalformedKafkaTopicListResource {
                message: "Missing metadata.name field".to_string(),
            })
        }
    };

    // have we seen this update before?
    // NOTE: we may find that we'd prefer to do the reconcile anyway, if it's cheap.
    // for now this seems okay
    let generation = match topics.metadata.generation {
        Some(gen) => {
            // Already reconciled this generation: nothing to do, no requeue.
            if topics_status.observed_generation() == gen {
                info!("Nothing to reconcile; observedGeneration == generation");
                return Ok(ReconcilerAction {
                    requeue_after: None,
                });
            }
            gen
        }
        _ => {
            return Err(CatalogError::MalformedKafkaTopicListResource {
                message: "Missing metadata.generation field".to_string(),
            })
        }
    };

    // make a note that we've seen this update
    topics_status.set_observed_generation(generation);

    // call out to the iox CLI to update the catalog for each topic name in the list
    let reconcile_result = reconcile_topics(
        &ctx.get_ref().path_to_iox_binary,
        &ctx.get_ref().catalog_dsn,
        topics.spec.topics(),
    )
    .await;

    // update status subresource based on outcome of reconcile
    let now: DateTime<Utc> = SystemTime::now().into();
    let now_str = now.to_rfc3339();
    let prev_condition = topics_status.conditions().get(0);
    // Carry the previous transition time forward only when the previous
    // condition status was "True"; otherwise stamp the current time.
    let last_transition_time = match prev_condition {
        Some(c) if c.status() == CONDITION_STATUS_TRUE => c.last_transition_time().clone(),
        _ => now_str.clone(),
    };
    let new_status = match &reconcile_result {
        Ok(v) => {
            debug!(
                "Updated catalog with kafka topic list: {:?}. IDs returned: {:?}.",
                topics.spec.topics(),
                v
            );
            KafkaTopicListStatusCondition::new(
                CONDITION_TYPE_RECONCILED.to_string(),
                CONDITION_STATUS_TRUE.to_string(),
                "".to_string(),
                last_transition_time,
                now_str.clone(),
            )
        }
        // On failure the error's Display text becomes the condition message.
        Err(e) => KafkaTopicListStatusCondition::new(
            CONDITION_TYPE_RECONCILED.to_string(),
            CONDITION_STATUS_FALSE.to_string(),
            e.to_string(),
            last_transition_time,
            now_str.clone(),
        ),
    };
    // Only a single condition (index 0) is ever maintained.
    if topics_status.conditions().is_empty() {
        topics_status.conditions_mut().insert(0, new_status);
    } else {
        topics_status.conditions_mut()[0] = new_status;
    }

    // patch the status field with the updated condition and observed generation
    match kafka_topic_list_api
        .patch_resource_status(kafka_topic_list_name.clone(), topics_status)
        .await
    {
        Ok(_) => {}
        Err(e) => {
            // Not great to silently swallow the error here but doesn't feel warranted to requeue
            // just because the status wasn't updated
            error!("Failed to patch KafkaTopicList status subresource: {}", e);
        }
    }

    // Propagate the CLI outcome; success never requeues — the controller's
    // watch delivers the next change.
    reconcile_result.map(|_| ReconcilerAction {
        requeue_after: None,
    })
}
/// an error handler that will be called when the reconciler fails
fn error_policy<T>(error: &CatalogError, _ctx: Context<Data<T>>) -> ReconcilerAction
where
    T: KafkaTopicListApi,
{
    error!(%error, "reconciliation error");
    ReconcilerAction {
        // if a sync fails we want to retry- it could simply be in the process of
        // doing another redeploy. there may be a deeper problem, in which case it'll keep trying
        // and we'll see errors and investigate. arbitrary duration chosen ¯\_(ツ)_/¯
        requeue_after: Some(Duration::from_secs(5)),
    }
}
// Data we want access to in error/reconcile calls
struct Data<T>
where
    T: KafkaTopicListApi,
{
    /// Filesystem path of the IOx CLI binary invoked once per topic.
    path_to_iox_binary: String,
    /// Postgres DSN passed to the CLI via `--catalog-dsn`.
    catalog_dsn: String,
    /// Client used to patch the KafkaTopicList status subresource.
    kafka_topic_list_api: T,
}
/// Entry point: wires config, logging, and the k8s client together, then
/// runs the controller loop until the process is stopped.
#[tokio::main]
async fn main() {
    let config = load_config().expect("failed to load config");
    let _drop_handle = setup_tracing(&config.logging_config, None).unwrap();
    debug!(?config, "loaded config");

    info!(git_hash = env!("GIT_HASH"), "starting iox-gitops-adapter");

    let k8s_client = K8sClient::try_default()
        .await
        .expect("couldn't create k8s client");
    // Watch KafkaTopicList resources only in the configured namespace.
    let topics = Api::<KafkaTopicList>::namespaced(k8s_client.clone(), config.namespace.as_str());
    info!("initialised Kubernetes API client");

    info!("starting IOx GitOps Adapter");
    Controller::new(topics.clone(), ListParams::default())
        .run(
            reconcile,
            error_policy,
            Context::new(Data {
                path_to_iox_binary: config.iox_cli.clone(),
                catalog_dsn: config.catalog_dsn.clone(),
                kafka_topic_list_api: topics,
            }),
        )
        .for_each(|res| async move {
            match res {
                Ok(o) => info!("reconciled {:?}", o),
                // Errors are logged, not fatal: error_policy schedules a retry.
                Err(e) => info!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use kafka_topic_list::{
mock_api::{MockKafkaTopicListApi, MockKafkaTopicListApiCall},
resources::KafkaTopicListSpec,
};
use super::*;
/// Test helper: build a `KafkaTopicList` with the given name, spec,
/// metadata generation, and status.
fn create_topics(
    name: &str,
    spec: KafkaTopicListSpec,
    generation: i64,
    status: KafkaTopicListStatus,
) -> KafkaTopicList {
    let mut c = KafkaTopicList::new(name, spec);
    c.metadata.generation = Some(generation);
    c.status = Some(status);
    c
}
/// Test helper: build a status with a single Reconciled condition
/// (True/False per `reconciled`), both timestamps set from `t`.
/// Note that condition equality ignores timestamps (see `PartialEq` on
/// `KafkaTopicListStatusCondition`).
fn create_topics_status(
    observed_generation: i64,
    reconciled: bool,
    message: String,
    t: SystemTime,
) -> KafkaTopicListStatus {
    let now: DateTime<Utc> = t.into();
    let now_str = now.to_rfc3339();
    let mut status = KafkaTopicListStatus::default();
    status
        .conditions_mut()
        .push(KafkaTopicListStatusCondition::new(
            CONDITION_TYPE_RECONCILED.to_string(),
            if reconciled {
                CONDITION_STATUS_TRUE.to_string()
            } else {
                CONDITION_STATUS_FALSE.to_string()
            },
            message,
            now_str.clone(),
            now_str,
        ));
    status.set_observed_generation(observed_generation);
    status
}
#[tokio::test]
async fn test_single_topic_success() {
    let now = SystemTime::now();
    // Mock API pre-loaded with one successful patch-status response.
    let mock_topics_api = Arc::new(MockKafkaTopicListApi::default().with_patch_status_ret(
        vec![Ok(create_topics(
            "iox",
            KafkaTopicListSpec::new(vec!["iox_shared".to_string()]),
            1,
            create_topics_status(0, true, "".to_string(), now),
        ))],
    ));
    let data = Data {
        // Script that prints a single catalog ID, simulating CLI success.
        path_to_iox_binary: "test/mock-iox-single-topic.sh".to_string(),
        catalog_dsn: "unused".to_string(),
        kafka_topic_list_api: Arc::clone(&mock_topics_api),
    };
    // Resource at generation 1, previously observed at generation 0, so the
    // reconcile is not skipped.
    let c = create_topics(
        "iox",
        KafkaTopicListSpec::new(vec!["iox_shared".to_string()]),
        1,
        create_topics_status(0, true, "".to_string(), now),
    );
    let result = reconcile(c, Context::new(data)).await;

    // whole operation returns a successful result.
    assert_matches!(result, Ok(ReconcilerAction { .. }));

    // ensure status was updated accordingly: one PatchStatus call recording
    // observed generation 1 and Reconciled=True (condition equality ignores
    // the timestamp fields).
    assert_eq!(
        mock_topics_api.get_calls(),
        vec![MockKafkaTopicListApiCall::PatchStatus {
            kafka_topic_list_name: "iox".to_string(),
            status: create_topics_status(1, true, "".to_string(), now),
        }]
    );
}
#[tokio::test]
async fn test_multi_topic_success() {
    let now = SystemTime::now();
    // Mock API pre-loaded with one successful patch-status response.
    let mock_topics_api = Arc::new(MockKafkaTopicListApi::default().with_patch_status_ret(
        vec![Ok(create_topics(
            "iox",
            KafkaTopicListSpec::new(vec!["one".to_string(), "two".to_string()]),
            1,
            create_topics_status(0, true, "".to_string(), now),
        ))],
    ));
    let data = Data {
        // The single-topic script works here too: the adapter invokes the
        // CLI once per topic, and each invocation prints one ID.
        path_to_iox_binary: "test/mock-iox-single-topic.sh".to_string(),
        catalog_dsn: "unused".to_string(),
        kafka_topic_list_api: Arc::clone(&mock_topics_api),
    };
    let c = create_topics(
        "iox",
        KafkaTopicListSpec::new(vec!["one".to_string(), "two".to_string()]),
        1,
        create_topics_status(0, true, "".to_string(), now),
    );
    let result = reconcile(c, Context::new(data)).await;

    // whole operation returns a successful result.
    assert_matches!(result, Ok(ReconcilerAction { .. }));

    // ensure status was updated accordingly.
    assert_eq!(
        mock_topics_api.get_calls(),
        vec![MockKafkaTopicListApiCall::PatchStatus {
            kafka_topic_list_name: "iox".to_string(),
            status: create_topics_status(1, true, "".to_string(), now),
        }]
    );
}
#[tokio::test]
async fn test_single_topic_error() {
// Failure path: the mock CLI script exits non-zero, so reconcile should
// return an UpdateTopicError and patch the status with ready == false.
let now = SystemTime::now();
let mock_topics_api = Arc::new(MockKafkaTopicListApi::default().with_patch_status_ret(
vec![Ok(create_topics(
"iox",
KafkaTopicListSpec::new(vec!["iox_shared".to_string()]),
1,
create_topics_status(0, true, "".to_string(), now),
))],
));
let data = Data {
// This script always exits 1, forcing the per-topic CLI invocation to fail.
path_to_iox_binary: "test/mock-iox-failure.sh".to_string(),
catalog_dsn: "unused".to_string(),
kafka_topic_list_api: Arc::clone(&mock_topics_api),
};
let c = create_topics(
"iox",
KafkaTopicListSpec::new(vec!["iox_shared".to_string()]),
1,
create_topics_status(0, false, "".to_string(), now),
);
let result = reconcile(c, Context::new(data)).await;
// The CLI failure surfaces as an UpdateTopicError from the reconciler.
// (The comment previously here claimed a successful result — a copy-paste
// leftover from the success tests; the assertion below is the truth.)
assert_matches!(result, Err(CatalogError::UpdateTopicError { .. }));
// Ensure the status patch records the failure: generation bumped to 1,
// ready == false, and the error-message prefix from the failed update.
assert_eq!(
mock_topics_api.get_calls(),
vec![MockKafkaTopicListApiCall::PatchStatus {
kafka_topic_list_name: "iox".to_string(),
status: create_topics_status(
1,
false,
"Request to update catalog with topic failed: ".to_string(),
now
),
}]
);
}
}

View File

@ -0,0 +1,2 @@
#!/bin/bash
# Mock IOx CLI used by the gitops-adapter tests: always exits non-zero so
# the reconciler's error path can be exercised. Presumably this is the
# `test/mock-iox-failure.sh` referenced by test_single_topic_error — confirm
# against the test fixtures.
exit 1

View File

@ -0,0 +1,3 @@
#!/bin/bash
# Mock IOx CLI for the gitops-adapter tests: prints two numbers, one per
# line, simulating successful CLI output. The values are presumably ids
# returned per topic — TODO confirm what the adapter parses from stdout.
echo 42
echo 93

View File

@ -0,0 +1,2 @@
#!/bin/bash
# Mock IOx CLI for the gitops-adapter tests: prints a single number,
# simulating successful output for one invocation (the adapter calls the CLI
# once per topic). Presumably this is `test/mock-iox-single-topic.sh` used by
# the success tests — confirm against the test fixtures.
echo 42

View File

@ -19,7 +19,7 @@ base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] }
bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "libc", "std", "winapi"] }
chrono = { version = "0.4", features = ["alloc", "clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
digest = { version = "0.9", default-features = false, features = ["alloc", "std"] }
either = { version = "1", features = ["use_std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
@ -52,7 +52,7 @@ sha2 = { version = "0.9", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
tokio-util = { version = "0.6", features = ["codec", "io"] }
tokio-util = { version = "0.6", features = ["codec", "io", "slab", "time"] }
tower = { version = "0.4", features = ["balance", "buffer", "discover", "futures-util", "indexmap", "limit", "load", "log", "make", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-stream", "tokio-util", "tracing", "util"] }
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
@ -66,6 +66,7 @@ base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] }
bytes = { version = "1", features = ["std"] }
cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] }
digest = { version = "0.9", default-features = false, features = ["alloc", "std"] }
either = { version = "1", features = ["use_std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
@ -98,7 +99,6 @@ uuid = { version = "0.8", features = ["getrandom", "std", "v4"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
[target.x86_64-unknown-linux-gnu.build-dependencies]
cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
[target.x86_64-apple-darwin.dependencies]
@ -115,10 +115,10 @@ libc = { version = "0.2", features = ["extra_traits", "std"] }
[target.x86_64-pc-windows-msvc.dependencies]
scopeguard = { version = "1", features = ["use_std"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "cfg", "consoleapi", "errhandlingapi", "evntrace", "fileapi", "handleapi", "impl-debug", "impl-default", "in6addr", "inaddr", "ioapiset", "knownfolders", "libloaderapi", "lmcons", "minschannel", "minwinbase", "minwindef", "mstcpip", "mswsock", "namedpipeapi", "ntdef", "ntsecapi", "ntstatus", "objbase", "processenv", "schannel", "securitybaseapi", "shellapi", "shlobj", "sspi", "std", "stringapiset", "synchapi", "sysinfoapi", "threadpoollegacyapiset", "timezoneapi", "winbase", "wincon", "wincrypt", "windef", "winerror", "winioctl", "winnt", "winreg", "winsock2", "winuser", "ws2def", "ws2ipdef", "ws2tcpip"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "cfg", "consoleapi", "errhandlingapi", "evntrace", "fileapi", "handleapi", "impl-debug", "impl-default", "in6addr", "inaddr", "ioapiset", "knownfolders", "libloaderapi", "lmcons", "minschannel", "minwinbase", "minwindef", "mstcpip", "mswsock", "namedpipeapi", "ntdef", "ntsecapi", "ntstatus", "objbase", "processenv", "profileapi", "schannel", "securitybaseapi", "shellapi", "shlobj", "sspi", "std", "stringapiset", "synchapi", "sysinfoapi", "threadpoollegacyapiset", "timezoneapi", "winbase", "wincon", "wincrypt", "windef", "winerror", "winioctl", "winnt", "winreg", "winsock2", "winuser", "ws2def", "ws2ipdef", "ws2tcpip"] }
[target.x86_64-pc-windows-msvc.build-dependencies]
scopeguard = { version = "1", features = ["use_std"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "cfg", "consoleapi", "errhandlingapi", "evntrace", "fileapi", "handleapi", "impl-debug", "impl-default", "in6addr", "inaddr", "ioapiset", "knownfolders", "libloaderapi", "lmcons", "minschannel", "minwinbase", "minwindef", "mstcpip", "mswsock", "namedpipeapi", "ntdef", "ntsecapi", "ntstatus", "objbase", "processenv", "schannel", "securitybaseapi", "shellapi", "shlobj", "sspi", "std", "stringapiset", "synchapi", "sysinfoapi", "threadpoollegacyapiset", "timezoneapi", "winbase", "wincon", "wincrypt", "windef", "winerror", "winioctl", "winnt", "winreg", "winsock2", "winuser", "ws2def", "ws2ipdef", "ws2tcpip"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "cfg", "consoleapi", "errhandlingapi", "evntrace", "fileapi", "handleapi", "impl-debug", "impl-default", "in6addr", "inaddr", "ioapiset", "knownfolders", "libloaderapi", "lmcons", "minschannel", "minwinbase", "minwindef", "mstcpip", "mswsock", "namedpipeapi", "ntdef", "ntsecapi", "ntstatus", "objbase", "processenv", "profileapi", "schannel", "securitybaseapi", "shellapi", "shlobj", "sspi", "std", "stringapiset", "synchapi", "sysinfoapi", "threadpoollegacyapiset", "timezoneapi", "winbase", "wincon", "wincrypt", "windef", "winerror", "winioctl", "winnt", "winreg", "winsock2", "winuser", "ws2def", "ws2ipdef", "ws2tcpip"] }
### END HAKARI SECTION