From 10511ae08b3ad87829b037d7d2716893e211db3b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 25 Sep 2020 10:03:56 +0100 Subject: [PATCH] refactor: restore tsm ingest --- delorean_ingest/src/lib.rs | 132 +++---------------------------------- 1 file changed, 9 insertions(+), 123 deletions(-) diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs index 45a9374d9e..16bf2d7773 100644 --- a/delorean_ingest/src/lib.rs +++ b/delorean_ingest/src/lib.rs @@ -719,132 +719,18 @@ impl TSMFileConverter { match next_measurement { Some(mut table) => { - if table.name != "http_api_requests_total" { - continue; - } // convert (potentially merged) measurement.. - let (schema, mut packed_columns) = + let (schema, packed_columns) = Self::process_measurement_table(&mut block_reader, &mut table)?; + let mut table_writer = self + .table_writer_source + .next_writer(&schema) + .context(WriterCreation)?; - // println!("col def {:?}", schema.get_col_defs()); - // // cardinality - // for (i, col) in packed_columns.iter().enumerate() { - // println!("processing column {:?}", i); - // if let Packers::String(p) = col { - // let mut set: std::collections::BTreeSet<_> = BTreeSet::new(); - // for v in p.iter() { - // if let Some(v) = v { - // set.insert(String::from(v.as_utf8().unwrap())); - // } - // } - // println!("Cardinality for col is {:?}", set.len()); - // } - // } - // col def [ColumnDefinition { name: "env", index: 0, data_type: String }, - // ColumnDefinition { name: "handler", index: 1, data_type: String }, - // ColumnDefinition { name: "host", index: 2, data_type: String }, - // ColumnDefinition { name: "hostname", index: 3, data_type: String }, - // ColumnDefinition { name: "method", index: 4, data_type: String }, - // ColumnDefinition { name: "nodename", index: 5, data_type: String }, - // ColumnDefinition { name: "path", index: 6, data_type: String }, - // ColumnDefinition { name: "role", index: 7, data_type: String }, - // ColumnDefinition { name: "status", index: 8, data_type: String }, - // ColumnDefinition { name: "url", index: 9, data_type: String }, - // ColumnDefinition { name: "user_agent", index: 10, data_type: String }, - // ColumnDefinition { name: "counter", index: 11, data_type: Float }, - // ColumnDefinition { name: "time", index: 12, data_type: Timestamp }] - // processing column 0 - // Cardinality for col is 8 - // processing column 1 - // Cardinality for col is 8 - // processing column 2 - // Cardinality for col is 3005 - // processing column 3 - // Cardinality for col is 3005 - // processing column 4 - // Cardinality for col is 6 - // processing column 5 - // Cardinality for col is 148 - // processing column 6 - // Cardinality for col is 78 - // processing column 7 - // Cardinality for col is 14 - // processing column 8 - // Cardinality for col is 4 - // processing column 9 - // Cardinality for col is 6 - // processing column 10 - // Cardinality for col is 71 - // processing column 11 - // processing column 12 - // got all card - // println!("got all card"); - - // sort low to high == - // - // status 8 (4) - // method 4 (6) - // url 9 (6) - // env 0 (8) - // handler 1 (8) - // role 7 (14) - // user_agent 10 (71) - // path 6 (78) - // nodename 5 (148) - // host 2 (3005) - // hostname 3 (3005) - // - // time 12 - - if packed_columns.len() < 13 { - continue; - } - - println!("length of column s is {:?}", packed_columns.len()); - // let sort = [0, 7, 6, 12]; - // let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12]; - // let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12]; - let sort = [12]; - println!("Starting sort with {:?}", sort); - let now = std::time::Instant::now(); - - delorean_table::sorter::sort(&mut packed_columns, &sort).unwrap(); - - println!("verifying order"); - let values = packed_columns[12].i64_packer_mut().values(); - let mut last = values[0]; - for &v in values.iter().skip(1) { - assert!(v >= last); - last = v; - } - println!("finished sort in {:?}", now.elapsed()); - - println!("Writing to arrow file!"); - write_arrow_file(schema, packed_columns).unwrap(); - println!("Done!"); - - let _ = self.table_writer_source; - // if packed_columns.len() < 13 { - // continue; - // } - // println!("length of column s is {:?}", packed_columns.len()); - // // let sort = [0, 7, 6, 12]; - // // let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12]; - // let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12]; - // println!("Starting sort with {:?}", sort); - // let now = std::time::Instant::now(); - // delorean_table::sorter::sort(&mut packed_columns, &sort).unwrap(); - // println!("finished sort in {:?}", now.elapsed()); - - // let mut table_writer = self - // .table_writer_source - // .next_writer(&schema) - // .context(WriterCreation)?; - - // table_writer - // .write_batch(&packed_columns) - // .context(WriterCreation)?; - // table_writer.close().context(WriterCreation)?; + table_writer + .write_batch(&packed_columns) + .context(WriterCreation)?; + table_writer.close().context(WriterCreation)?; } None => break, }