refactor: restore tsm ingest

pull/24376/head
Edd Robinson 2020-09-25 10:03:56 +01:00
parent 9eee0c2852
commit 10511ae08b
1 changed files with 9 additions and 123 deletions

View File

@ -719,132 +719,18 @@ impl TSMFileConverter {
match next_measurement {
Some(mut table) => {
if table.name != "http_api_requests_total" {
continue;
}
// convert (potentially merged) measurement..
let (schema, mut packed_columns) =
let (schema, packed_columns) =
Self::process_measurement_table(&mut block_reader, &mut table)?;
let mut table_writer = self
.table_writer_source
.next_writer(&schema)
.context(WriterCreation)?;
// println!("col def {:?}", schema.get_col_defs());
// // cardinality
// for (i, col) in packed_columns.iter().enumerate() {
// println!("processing column {:?}", i);
// if let Packers::String(p) = col {
// let mut set: std::collections::BTreeSet<_> = BTreeSet::new();
// for v in p.iter() {
// if let Some(v) = v {
// set.insert(String::from(v.as_utf8().unwrap()));
// }
// }
// println!("Cardinality for col is {:?}", set.len());
// }
// }
// col def [ColumnDefinition { name: "env", index: 0, data_type: String },
// ColumnDefinition { name: "handler", index: 1, data_type: String },
// ColumnDefinition { name: "host", index: 2, data_type: String },
// ColumnDefinition { name: "hostname", index: 3, data_type: String },
// ColumnDefinition { name: "method", index: 4, data_type: String },
// ColumnDefinition { name: "nodename", index: 5, data_type: String },
// ColumnDefinition { name: "path", index: 6, data_type: String },
// ColumnDefinition { name: "role", index: 7, data_type: String },
// ColumnDefinition { name: "status", index: 8, data_type: String },
// ColumnDefinition { name: "url", index: 9, data_type: String },
// ColumnDefinition { name: "user_agent", index: 10, data_type: String },
// ColumnDefinition { name: "counter", index: 11, data_type: Float },
// ColumnDefinition { name: "time", index: 12, data_type: Timestamp }]
// processing column 0
// Cardinality for col is 8
// processing column 1
// Cardinality for col is 8
// processing column 2
// Cardinality for col is 3005
// processing column 3
// Cardinality for col is 3005
// processing column 4
// Cardinality for col is 6
// processing column 5
// Cardinality for col is 148
// processing column 6
// Cardinality for col is 78
// processing column 7
// Cardinality for col is 14
// processing column 8
// Cardinality for col is 4
// processing column 9
// Cardinality for col is 6
// processing column 10
// Cardinality for col is 71
// processing column 11
// processing column 12
// got all card
// println!("got all card");
// sort low to high ==
//
// status 8 (4)
// method 4 (6)
// url 9 (6)
// env 0 (8)
// handler 1 (8)
// role 7 (14)
// user_agent 10 (71)
// path 6 (78)
// nodename 5 (148)
// host 2 (3005)
// hostname 3 (3005)
//
// time 12
if packed_columns.len() < 13 {
continue;
}
println!("length of column s is {:?}", packed_columns.len());
// let sort = [0, 7, 6, 12];
// let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12];
// let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12];
let sort = [12];
println!("Starting sort with {:?}", sort);
let now = std::time::Instant::now();
delorean_table::sorter::sort(&mut packed_columns, &sort).unwrap();
println!("verifying order");
let values = packed_columns[12].i64_packer_mut().values();
let mut last = values[0];
for &v in values.iter().skip(1) {
assert!(v >= last);
last = v;
}
println!("finished sort in {:?}", now.elapsed());
println!("Writing to arrow file!");
write_arrow_file(schema, packed_columns).unwrap();
println!("Done!");
let _ = self.table_writer_source;
// if packed_columns.len() < 13 {
// continue;
// }
// println!("length of column s is {:?}", packed_columns.len());
// // let sort = [0, 7, 6, 12];
// // let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12];
// let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12];
// println!("Starting sort with {:?}", sort);
// let now = std::time::Instant::now();
// delorean_table::sorter::sort(&mut packed_columns, &sort).unwrap();
// println!("finished sort in {:?}", now.elapsed());
// let mut table_writer = self
// .table_writer_source
// .next_writer(&schema)
// .context(WriterCreation)?;
// table_writer
// .write_batch(&packed_columns)
// .context(WriterCreation)?;
// table_writer.close().context(WriterCreation)?;
table_writer
.write_batch(&packed_columns)
.context(WriterCreation)?;
table_writer.close().context(WriterCreation)?;
}
None => break,
}