diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 3092efebc79be99288ab0e613eed6d4b40db7222..0000000000000000000000000000000000000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.gitignore b/.gitignore index bd89261d5e3eb34beb4c819ebdfd1389deddc910..0954d4ac8344fd158cb0975518c80cb45b430be0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,10 @@ venv *.log *.key -*.partitions \ No newline at end of file +*.partitions + +.DS_Store + +out.csv +metadata +btree_* diff --git a/Cargo.lock b/Cargo.lock index 1249656ad408a8c8d47292f14997c9c0e26c5091..a983d45e713ef4fe4720b969cbe959b3afc1c5a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.30" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92b7b18d71fad5313a1e320fa9897994228ce274b60faa4d694fe0ea89cd9e6d" +checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83" dependencies = [ "clap_builder", "clap_derive", @@ -298,9 +298,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.30" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a35db2071778a7344791a4fb4f95308b5673d219dee3ae348b86642574ecc90c" +checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8" dependencies = [ "anstream", "anstyle", @@ -310,9 +310,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.28" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" dependencies = [ "heck", "proc-macro2", @@ -363,6 +363,50 @@ dependencies = [ "url", ] +[[package]] +name = "client-distributed" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "comfy-table", + "crypto-utils", + "csv", + "env_logger", + "futures", + "indexmap", + "itertools", + "json-writer", + "log", + "regex", + "reqwest", + "serde", + "serde_json", + "tokio", + "url", +] + +[[package]] +name = "client-with-insert" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "comfy-table", + "crypto-utils", + "csv", + "env_logger", + "indexmap", + "itertools", + "json-writer", + "log", + "regex", + "reqwest", + "serde", + "serde_json", + "url", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -493,8 +537,10 @@ dependencies = [ "hex", "hmac", "log", + "once_cell", "rand 0.9.0", "rand_chacha 0.9.0", + "regex", "sha2", "tiger", ] @@ -1204,6 +1250,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -1220,6 +1275,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-writer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279046e6427c19c86f93df06fe9dc90c32b43f4a2a85bb3083d579e4a1e7ef03" +dependencies = [ + "itoa", + "ryu", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1412,9 +1477,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.2" +version = "1.21.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" [[package]] name = "opaque-debug" @@ -1882,18 +1947,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -1902,9 +1967,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.138" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "itoa", "memchr", @@ -1951,6 +2016,35 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "server-distributed" +version = "0.1.0" +dependencies = [ + "axum", + "clap", + "crypto-utils", + "serde", + "serde_json", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "server-with-insert" +version = "0.1.0" +dependencies = [ + "axum", + "crypto-utils", + "serde", + "serde_json", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1988,6 +2082,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -2393,17 +2496,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a" dependencies = [ "backtrace", "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.52.0", ] diff --git a/Cargo.toml b/Cargo.toml index 20a4f2393ad8a00a10bd55bf557eb1f92aafe577..22737dda16b894f39caf1bb587004acce7ad0b29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,5 @@ resolver = "2" members = [ "client", "server", - "crypto-utils", "client-brc", + "crypto-utils", "client-brc", "server-with-insert", "client-with-insert", "client-distributed", "server-distributed", ] diff --git a/client-brc/src/brc.rs b/client-brc/src/brc.rs index 7b426e17c6512c2554a2b99b8c54cf4798ec11ed..ea22315588bca4228638cb0eae4f8f19a24cbe1f 100644 --- a/client-brc/src/brc.rs +++ b/client-brc/src/brc.rs @@ -9,6 +9,7 @@ pub mod brc_tree { min: isize, max: isize, 
contents: Vec<String>, + length: isize, left: Subtree, right: Subtree, } @@ -21,10 +22,13 @@ pub mod brc_tree { if len == 0 { return Subtree(None); } else if len == 1 { + let contents = hm.get_index(0).unwrap().1.to_vec(); + let length = contents.len() as isize; return Subtree(Some(Box::new(Node { min: *hm.get_index(0).unwrap().0, max: *hm.get_index(0).unwrap().0, - contents: hm.get_index(0).unwrap().1.to_vec(), + contents, + length, left: Subtree(None), right: Subtree(None), }))); @@ -46,9 +50,17 @@ pub mod brc_tree { None => *hm.get_index(len - 1).unwrap().0, }, }; + let length = match &left { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + } + match &right { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + }; Subtree(Some(Box::new(Node { min, max, + length, contents: vec![], left, right, @@ -82,34 +94,28 @@ pub mod brc_tree { } pub fn to_hm(&self) -> IndexMap<(isize, isize), Vec<String>> { let mut hm: IndexMap<(isize, isize), Vec<String>> = IndexMap::new(); - match &self.0 { - Some(node) => { - let left = node.left.to_hm(); - let right = node.right.to_hm(); - for (key, value) in left.iter() { - hm.insert(*key, value.clone()); - } - for (key, value) in right.iter() { - hm.insert(*key, value.clone()); - } - let value = self.get_all_child_values(); - hm.insert(self.range(), value); + if let Some(node) = &self.0 { + let left = node.left.to_hm(); + let right = node.right.to_hm(); + for (key, value) in left.iter() { + hm.insert(*key, value.clone()); + } + for (key, value) in right.iter() { + hm.insert(*key, value.clone()); } - None => {} + let value = self.get_all_child_values(); + hm.insert(self.range(), value); } hm } pub fn write_structure(&self, mut file: &std::fs::File) -> anyhow::Result<()> { - match &self.0 { - Some(node) => { - file.write_all(format!("{},{}(", node.min, node.max).as_bytes())?; - node.left.write_structure(file)?; - file.write_all(")(".as_bytes())?; - node.right.write_structure(file)?; - file.write_all(");".as_bytes())?; - } - None => {} + if let Some(node) = &self.0 { + file.write_all(format!("{},{}[{}](", node.min, node.max, node.length).as_bytes())?; + node.left.write_structure(file)?; + file.write_all(")(".as_bytes())?; + node.right.write_structure(file)?; + file.write_all(");".as_bytes())?; } Ok(()) } @@ -219,36 +225,12 @@ pub mod brc_tree { urc } - #[test] - fn test_brc_to_urc() { - let tree = from_sorted_list(indexmap::indexmap! 
{ - 0 => vec!["a".to_string()], - 1 => vec!["a".to_string()], - 2 => vec!["b".to_string()], - 3 => vec!["c".to_string()], - 4 => vec!["d".to_string()], - 5 => vec!["e".to_string()], - 6 => vec!["f".to_string()], - 7 => vec!["g".to_string()], - }); - let urc = brc_to_urc(tree.covers(2, 7)); - println!("{:?}", urc.iter().map(|x| x.range()).collect::<Vec<_>>()); - } - - impl TryInto<Subtree> for String { - type Error = anyhow::Error; - fn try_into(self) -> Result<Subtree, Self::Error> { - parse_from_string(self) - } - } - - fn parse_from_string(s: String) -> anyhow::Result<Subtree> { + pub fn parse_from_string(s: String) -> anyhow::Result<Subtree> { let s = s.trim(); if s.is_empty() { return Ok(Subtree(None)); } - - let (min_max, rest) = s.split_at(s.find('(').unwrap()); + let (min_max, rest_before_length) = s.split_at(s.find('[').unwrap()); let (min, max) = { let mut parts = min_max.split(','); ( @@ -256,33 +238,63 @@ pub mod brc_tree { parts.next().unwrap().parse::<isize>()?, ) }; + let (length_str, rest) = rest_before_length.split_at(rest_before_length.find(']').unwrap()); + let length = length_str[1..].parse::<isize>()?; - let mut balance = 0; - let mut split_index = 0; - for (i, c) in rest.chars().enumerate() { + // Find the closing parenthesis for the left subtree + let mut balance = 1; // Start at 1 because we're already inside the first parenthesis + let mut left_end = 0; + for (i, c) in rest[2..].chars().enumerate() { match c { '(' => balance += 1, ')' => balance -= 1, _ => {} } if balance == 0 { - split_index = i; + left_end = i + 2; // +2 to account for the substring offset break; } } - let left = parse_from_string(rest[1..split_index].to_string())?; - let right = parse_from_string(rest[split_index + 2..rest.len() - 2].to_string())?; + // The right subtree starts after the )( separator + let right_start = left_end + 2; // +2 to skip the ")(" + + let left = parse_from_string(rest[2..left_end].to_string())?; + let right = parse_from_string(rest[right_start..rest.len() - 2].to_string())?; Ok(Subtree(Some(Box::new(Node { min, max, + length, contents: vec![], left, right, })))) } + #[test] + fn test_brc_to_urc() { + let tree = from_sorted_list(indexmap::indexmap! 
{ + 0 => vec!["a".to_string()], + 1 => vec!["a".to_string()], + 2 => vec!["b".to_string()], + 3 => vec!["c".to_string()], + 4 => vec!["d".to_string()], + 5 => vec!["e".to_string()], + 6 => vec!["f".to_string()], + 7 => vec!["g".to_string()], + }); + let urc = brc_to_urc(tree.covers(2, 7)); + println!("{:?}", urc.iter().map(|x| x.range()).collect::<Vec<_>>()); + } + + impl TryInto<Subtree> for String { + type Error = anyhow::Error; + fn try_into(self) -> Result<Subtree, Self::Error> { + parse_from_string(self) + } + } + #[test] fn test_parse() { let tree = parse_from_file("../brc_table.partitions".as_ref()).unwrap(); diff --git a/client-brc/src/commandline.rs b/client-brc/src/commandline.rs index 8e71b5b130e7edac8623c7df68379f78352837be..929a00dafa8d7412f687b1bdc79362a322a79445 100644 --- a/client-brc/src/commandline.rs +++ b/client-brc/src/commandline.rs @@ -12,7 +12,7 @@ pub struct Cli { pub command: Commands, #[clap(global = true)] - #[arg(short, long, default_value = "http://localhost:8080/")] + #[arg(short, long, default_value = "http://localhost:8081/")] pub address: String, } @@ -38,5 +38,8 @@ pub enum Commands { /// Optional urf option #[clap(long = "urf", short = 'u', default_value_t = false)] urf: bool, + + #[arg(short, long, default_value = "out.csv")] + output_path: String, }, } diff --git a/client-brc/src/main.rs b/client-brc/src/main.rs index f20a1b17fad1d5e8d2796edad511ccd034970da9..955317a0c8e197d94afb21bbb3192db35bb60580 100644 --- a/client-brc/src/main.rs +++ b/client-brc/src/main.rs @@ -34,8 +34,19 @@ fn main() -> Result<()> { cli.address.clone(), )?; } - Commands::Search { min, max, urf } => { - search(*min, *max, cli.address.clone(), *urf)?; + Commands::Search { + min, + max, + urf, + output_path, + } => { + search( + *min, + *max, + cli.address.clone(), + *urf, + output_path.to_owned(), + )?; } } Ok(()) diff --git a/client-brc/src/search.rs b/client-brc/src/search.rs index 1402138b3f2158ece2c0ab971c708ce47ea23f98..a25224124fbfecbd92d16bfd4148110b89aa99da 100644 --- a/client-brc/src/search.rs +++ b/client-brc/src/search.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; +use std::io::prelude::*; use anyhow::Result; // This is just a current implementation of the outputs -use comfy_table::Table; use crypto_utils::chacha::rand::SeedableRng; use log::info; use url::Url; @@ -19,7 +19,13 @@ use rand::seq::SliceRandom; use crate::brc::brc_tree; /// Search function -pub fn search(min: isize, max: isize, address: String, urf: bool) -> Result<()> { +pub fn search( + min: isize, + max: isize, + address: String, + urf: bool, + output_path: String, +) -> Result<()> { let enc_key = load_from_file("enc.key")?; let prf_key = load_from_file("prf.key")?; let mut burl = Url::parse(&address)?; @@ -51,10 +57,7 @@ pub fn search(min: isize, max: isize, address: String, urf: bool) -> Result<()> } } info!("Found {} results", fnl.len()); - let mut table = Table::new(); - for decrypted in fnl.iter() { - table.add_row(decrypted.split(',')); - } - println!("{table}"); + let mut out_file = std::fs::File::create(output_path)?; + out_file.write_all(fnl.join("\n").as_bytes())?; Ok(()) } diff --git a/client-distributed/Cargo.toml b/client-distributed/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..183c211426ebf958a83a1c4d759f78f0c63e3d23 --- /dev/null +++ b/client-distributed/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "client-distributed" +version = "0.1.0" +edition = "2021" + +[dependencies] +clap = { version = "4.5.30", features = ["derive"] } +env_logger = "0.11.6" 
+log = "0.4.25" +crypto-utils = { path = "../crypto-utils" } +anyhow = "1.0.95" +csv = "1.3.1" +indexmap = "2.7.1" +reqwest = { version = "0.12.12", features = ["json"] } +comfy-table = "7.1.4" +url = "2.5.4" +json-writer = "0.4.0" +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.140" +regex = "1.11.1" +itertools = "0.14.0" +tokio = { version = "1.44.0", features = ["macros", "rt-multi-thread"] } +futures = "0.3.31" diff --git a/client-distributed/src/brc.rs b/client-distributed/src/brc.rs new file mode 100644 index 0000000000000000000000000000000000000000..bf77891338f6c237f83e4a867cda61d29994fe4f --- /dev/null +++ b/client-distributed/src/brc.rs @@ -0,0 +1,338 @@ +pub mod brc_tree { + #![allow(dead_code)] + use std::{collections::VecDeque, io::Write, path::Path, vec}; + + use crypto_utils::chacha::{ + rand::{seq::SliceRandom, SeedableRng}, + ChaCha20Rng, + }; + use indexmap::IndexMap; + + #[derive(Debug, Clone)] + pub struct Node { + min: isize, + max: isize, + contents: Vec<(String, bool)>, + length: isize, + left: Subtree, + right: Subtree, + } + + #[derive(Debug, Clone)] + pub struct Subtree(Option<Box<Node>>); + + pub fn from_sorted_list(hm: IndexMap<isize, Vec<(String, bool)>>) -> Subtree { + let len = hm.len(); + if len == 0 { + return Subtree(None); + } else if len == 1 { + let contents = hm.get_index(0).unwrap().1.to_vec(); + let length = contents.len() as isize; + return Subtree(Some(Box::new(Node { + min: *hm.get_index(0).unwrap().0, + max: *hm.get_index(0).unwrap().0, + contents, + length, + left: Subtree(None), + right: Subtree(None), + }))); + } + let mid = len / 2; + let left = from_sorted_list(hm.clone().drain(..mid).collect()); + let right = from_sorted_list(hm.clone().drain(mid..).collect()); + let min = match &left.0 { + Some(node) => node.min, + None => match &right.0 { + Some(node) => node.min, + None => *hm.get_index(0).unwrap().0, + }, + }; + let max = match &right.0 { + Some(node) => node.max, + None => match &left.0 { + Some(node) => node.max, + None => *hm.get_index(len - 1).unwrap().0, + }, + }; + let length = match &left { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + } + match &right { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + }; + Subtree(Some(Box::new(Node { + min, + max, + length, + contents: vec![], + left, + right, + }))) + } + #[test] + fn test_fsl() { + let test_data = indexmap::indexmap! 
{ + 1 => vec![("a".to_string(), true)], + 2 => vec![("b".to_string(), true)], + 3 => vec![("c".to_string(), true)], + 4 => vec![("d".to_string(), true)], + }; + let tree = from_sorted_list(test_data); + println!("{:?}", tree); + println!("{:?}", tree.to_hm()); + } + + impl Subtree { + pub fn get_all_child_values(&self) -> Vec<(String, bool)> { + match &self.0 { + Some(node) => { + let mut left = node.left.get_all_child_values(); + let right = node.right.get_all_child_values(); + left.extend(right); + left.extend(node.contents.clone()); + left + } + None => vec![], + } + } + pub fn to_hm(&self) -> IndexMap<(isize, isize), Vec<(String, bool)>> { + let mut hm: IndexMap<(isize, isize), Vec<(String, bool)>> = IndexMap::new(); + if let Some(node) = &self.0 { + let left = node.left.to_hm(); + let right = node.right.to_hm(); + for (key, value) in left.iter() { + hm.insert(*key, value.clone()); + } + for (key, value) in right.iter() { + hm.insert(*key, value.clone()); + } + let value = self.get_all_child_values(); + hm.insert(self.range(), value); + } + hm + } + + pub fn write_string(&self) -> String { + match &self.0 { + Some(node) => format!( + "{},{}[{}]({})({});", + node.min, + node.max, + node.length, + node.left.write_string(), + node.right.write_string() + ), + None => "".to_string(), + } + } + pub fn write_structure(&self, mut file: &std::fs::File) -> anyhow::Result<()> { + if self.0.is_some() { + file.write_all(self.write_string().as_bytes())? + } + Ok(()) + } + + pub fn covers(&self, min: isize, max: isize) -> Vec<Subtree> { + // Returns the set of nodes that covers the entire range + match &self.0 { + Some(node) => { + if node.min > max || node.max < min { + return vec![]; + } + if min <= node.min && max >= node.max { + return vec![self.clone()]; + } + let mut left = node.left.covers(min, max); + let right = node.right.covers(min, max); + left.extend(right); + left.shuffle(&mut ChaCha20Rng::from_os_rng()); + left + } + None => vec![], + } + } + + pub fn range(&self) -> (isize, isize) { + match &self.0 { + Some(node) => (node.min, node.max), + None => (0, 0), + } + } + + pub fn range_level(&self) -> (isize, isize, isize) { + match &self.0 { + Some(node) => (node.min, node.max, self.level()), + None => (0, 0, 0), + } + } + + pub fn level(&self) -> isize { + match &self.0 { + Some(node) => { + if node.max == node.min { + return 0; + } + let left = node.left.level(); + let right = node.right.level(); + 1 + left.max(right) + } + None => -1, + } + } + } + #[test] + fn test_possible_covers() { + let tree = parse_from_file(std::path::Path::new("../brc_table.partitions")).unwrap(); + let mut covers = tree.covers(0, 111000); + println!("{:?}", tree); + println!("{:?}", covers.iter().map(|x| x.range()).collect::<Vec<_>>()); + covers.sort_by_key(|a| a.level()); + println!( + "{:?}", + brc_to_urc(covers) + .iter() + .map(|x| x.range()) + .collect::<Vec<_>>() + ); + } + pub fn parse_from_file(file_name: &Path) -> anyhow::Result<Subtree> { + let contents = std::fs::read_to_string(file_name)?; + parse_from_string(contents) + } + + impl PartialEq for Subtree { + fn eq(&self, other: &Self) -> bool { + self.range() == other.range() + } + } + + pub fn brc_to_urc(vec: Vec<Subtree>) -> Vec<Subtree> { + // assume vec is sorted by level + let mut deq = VecDeque::from(vec); + let max_level = deq[deq.len() - 1].level(); + let mut urc: Vec<Subtree> = vec![]; + let mut seen = VecDeque::from_iter(0..=max_level + 1); + while !deq.is_empty() { + let node = deq.pop_front().unwrap().to_owned(); + if node.0.is_none() { + 
continue; + } + match node.level() { + i if i < seen[0] => { + if !urc.contains(&node) { + urc.push(node); + } + } + i if i == seen[0] => { + seen.pop_front(); + if !urc.contains(&node) { + urc.push(node); + } + } + _ => { + deq.push_front(node.clone().0.unwrap().left); + deq.push_front(node.0.unwrap().right); + } + } + } + urc + } + + pub fn parse_from_string(s: String) -> anyhow::Result<Subtree> { + let s = s.trim(); + if s.is_empty() { + return Ok(Subtree(None)); + } + let (min_max, rest_before_length) = s.split_at(s.find('[').unwrap()); + let (min, max) = { + let mut parts = min_max.split(','); + ( + parts.next().unwrap().parse::<isize>()?, + parts.next().unwrap().parse::<isize>()?, + ) + }; + let (length_str, rest) = rest_before_length.split_at(rest_before_length.find(']').unwrap()); + let length = length_str[1..].parse::<isize>()?; + + // Find the closing parenthesis for the left subtree + let mut balance = 1; // Start at 1 because we're already inside the first parenthesis + let mut left_end = 0; + for (i, c) in rest[2..].chars().enumerate() { + match c { + '(' => balance += 1, + ')' => balance -= 1, + _ => {} + } + if balance == 0 { + left_end = i + 2; // +2 to account for the substring offset + break; + } + } + + // The right subtree starts after the )( separator + let right_start = left_end + 2; // +2 to skip the ")(" + + let left = parse_from_string(rest[2..left_end].to_string())?; + let right = parse_from_string(rest[right_start..rest.len() - 2].to_string())?; + + Ok(Subtree(Some(Box::new(Node { + min, + max, + length, + contents: vec![], + left, + right, + })))) + } + + #[test] + fn test_from_hm() { + let hm = indexmap::indexmap! { + 1 => vec![("a".to_string(), true)], + 2 => vec![("a".to_string(), true)], + 3 => vec![("a".to_string(), true)], + 4 => vec![("a".to_string(), true)], + 5 => vec![("a".to_string(), true)] + }; + let tree = from_sorted_list(hm); + println!("{:?}", tree.to_hm()); + println!("{:?}", tree.covers(2, 5)); + } + + #[test] + fn test_brc_to_urc() { + let tree = from_sorted_list(indexmap::indexmap! 
{ + 0 => vec![("a".to_string(), true)], + 1 => vec![("a".to_string(), true)], + 2 => vec![("a".to_string(), true)], + 3 => vec![("a".to_string(), true)], + 4 => vec![("a".to_string(), true)], + 5 => vec![("a".to_string(), true)], + 6 => vec![("a".to_string(), true)], + 7 => vec![("a".to_string(), true)], + }); + let urc = brc_to_urc(tree.covers(2, 7)); + println!("{:?}", urc.iter().map(|x| x.range()).collect::<Vec<_>>()); + } + + impl TryInto<Subtree> for String { + type Error = anyhow::Error; + fn try_into(self) -> Result<Subtree, Self::Error> { + parse_from_string(self) + } + } + + #[test] + fn test_parse() { + let tree = parse_from_file("../brc_table.partitions".as_ref()).unwrap(); + println!("{:?}", tree); + } + + impl std::fmt::Display for Subtree { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.write_str(&self.write_string())?; + Ok(()) + } + } +} diff --git a/client-distributed/src/commandline.rs b/client-distributed/src/commandline.rs new file mode 100644 index 0000000000000000000000000000000000000000..61a1db498a08d58eb9ac30c4d12b84cab7616b23 --- /dev/null +++ b/client-distributed/src/commandline.rs @@ -0,0 +1,56 @@ +use clap::{Parser, Subcommand}; +use url::Url; + +#[derive(Parser)] +#[command(version, about, long_about=None)] +pub struct Cli { + /// Verbose level (Trace / Info) + #[clap(global = true)] + #[arg(short, long, default_value_t = false)] + pub verbose: bool, + + /// Subcommands + #[command(subcommand)] + pub command: Commands, + + #[clap(global = true)] + #[arg(short, long, num_args = 1.., default_values = ["http://localhost:8080/"])] + pub address: Vec<Url>, +} + +#[derive(Subcommand)] +pub enum Commands { + /// Initializes the remote server with file + Init { + /// File path to be inputted + filename_option: String, + /// set searchable column to key + #[clap(long = "key", short = 'k')] + key_option: Option<String>, + /// set searchable index + #[clap(long = "index", short = 'i')] + index_option: Option<usize>, + }, + /// Searches remote server with optional string + Search { + /// Minimum value in range (inclusive) + min: isize, + /// Maximum in range (inclusive) + max: isize, + /// Optional urf option + #[clap(long = "urf", short = 'u', default_value_t = false)] + urf: bool, + + #[arg(short, long, default_value = "out.csv")] + output_path: String, + }, + /// Adds to remote key-value store + Insert { + /// Value to add, must be in a csv row format that fits the given scheme + value: String, + }, + Delete { + /// Key to delete + key: String, + }, +} diff --git a/client-distributed/src/initialize.rs b/client-distributed/src/initialize.rs new file mode 100644 index 0000000000000000000000000000000000000000..7453f9369f53c208456ca7b5b6110992bcc56d34 --- /dev/null +++ b/client-distributed/src/initialize.rs @@ -0,0 +1,183 @@ +use std::collections::HashMap; +use std::time::Instant; + +use anyhow::{anyhow, Result}; +use crypto_utils::chacha::rand::Rng; +use indexmap::IndexMap; +use log::{debug, info, warn}; + +use crate::pools::{first_available_sda_server, post_metadata, MetaData}; +use crypto_utils::chacha::{ + encrypt, encrypt_with_action_and_validity, generate_nonce_string, ChaCha20Rng, +}; +use crypto_utils::hash::{hash_with_counter_and_nonce, prf}; +use crypto_utils::keys::load_from_file; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::brc::brc_tree; + +pub fn generate_one( + hm: &mut IndexMap<isize, Vec<(String, bool)>>, + rng: &mut ChaCha20Rng, +) -> Result<()> { + let mut min: i64 = 
((hm.keys().min().unwrap_or(&0).to_owned()) * 95 / 100).try_into()?;
+    let mut max: i64 = (hm.keys().max().unwrap_or(&100).to_owned() * 105 / 100).try_into()?;
+    let row_length = hm
+        .values()
+        .next()
+        .unwrap_or(&vec![("a".repeat(100), false)])[0]
+        .0
+        .len();
+    let min_row_length = (row_length * 3) / 10;
+    let max_row_length = (row_length * 20) / 10;
+    let mut key;
+    loop {
+        info!("{},{}", min, max);
+        key = rng.random_range(min..max + 1) as isize;
+        if !hm.contains_key(&key) {
+            break;
+        } else {
+            min -= 1;
+            max += 1;
+        }
+    }
+    let mut vec: Vec<(String, bool)> = Vec::new();
+    let repeat = "a".repeat(rng.random_range(min_row_length..max_row_length));
+    vec.push((repeat, false));
+    hm.insert(key, vec);
+    Ok(())
+}
+
+#[derive(Serialize, Deserialize)]
+struct Init {
+    level: usize,
+    db: HashMap<String, String>,
+    metadata: String,
+    nonce: String,
+    btree: String,
+}
+
+pub async fn init(
+    filename: String,
+    key_option: Option<String>,
+    index_option: Option<usize>,
+    database_addresses: Vec<Url>,
+    rng: &mut ChaCha20Rng,
+) -> Result<()> {
+    let start_time = Instant::now();
+    if key_option.is_none() && index_option.is_none() {
+        info!("Key not provided, will use the first available column")
+    }
+    if key_option.is_some() && index_option.is_some() {
+        warn!("Both key and index provided!");
+        return Err(anyhow!("Both key and index provided"));
+    }
+    debug!("Given file {}, checking whether it exists", filename);
+    let filepath = std::path::Path::new(&filename);
+    // verify that file exists
+    if !&filepath.exists() {
+        warn!("{:?} cannot be found", filepath);
+        return Err(anyhow!("Provided file cannot be found {:?}", filepath));
+    }
+    debug!("File exists");
+    let mut hm: IndexMap<isize, Vec<(String, bool)>> = IndexMap::new();
+    let mut reader = csv::Reader::from_path(filepath)?;
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let index = match key_option {
+        Some(s) => {
+            let headers = reader.headers()?;
+            let mut ret: usize = usize::MAX;
+            for (index, word) in headers.iter().enumerate() {
+                if word.eq_ignore_ascii_case(&s) {
+                    ret = index;
+                    break;
+                }
+            }
+            if ret == usize::MAX {
+                warn!("{} cannot be found in headers", s);
+                return Err(anyhow!("{} not found in headers", s));
+            }
+            ret
+        }
+        None => match index_option {
+            Some(i) => {
+                let headers = reader.headers()?;
+                if i >= headers.len() {
+                    warn!(
+                        "Header has length {}, so index {} is out of bounds",
+                        headers.len(),
+                        i
+                    );
+                    return Err(anyhow!("Index out of bounds"));
+                }
+                i
+            }
+            None => 0,
+        },
+    };
+    let meta: MetaData = MetaData {
+        index: index as i32,
+        header: reader.headers()?.iter().map(|x| x.to_string()).collect(),
+        row_length: reader.headers()?.len() as i32,
+    };
+    let meta_string = encrypt(&enc_key, serde_json::to_string(&meta)?)?;
+    for record in reader.records() {
+        let record = record?;
+        let key = record.get(index).unwrap().parse();
+        let record_vec = record.iter().collect::<Vec<&str>>().join(",");
+
+        if key.is_err() {
+            warn!("Could not parse key {}", record.get(index).unwrap());
+            return Err(anyhow!("Could not parse key"));
+        }
+        hm.entry(key.unwrap()).or_default().push((record_vec, true));
+    }
+    // pad the map with dummy keys until its size is a power of 2
+    let level = (hm.len() as f64).log2().ceil() as usize;
+    let diff = (2_isize.pow(level as u32) as usize - hm.len()) as isize;
+    debug!(
+        "{} distinct keys seen, we will insert {} keys",
+        hm.len(),
+        diff
+    );
+    for _ in 0..diff {
+        generate_one(&mut hm, rng)?;
+    }
+    hm.sort_keys();
+    debug!("{:?}", hm);
+    info!("{:?}", hm.len());
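+    // Build the balanced range tree over the padded key set; each record is
+    // encrypted and stored under hash(PRF(range), counter, nonce), so the
+    // server only ever indexes ciphertexts by opaque range labels.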
+ let subtree = brc_tree::from_sorted_list(hm); + let nonce = generate_nonce_string(); + let mut json: HashMap<String, String> = HashMap::new(); + for (key, value) in subtree.to_hm().iter() { + let prfd_key = prf(&prf_key, format!("{:?}", key))?; + for (counter, record) in value.iter().enumerate() { + let (pt, va) = record; + let encrypted = + encrypt_with_action_and_validity(&enc_key, pt.to_owned(), true, va.to_owned())?; + let hashed = hash_with_counter_and_nonce(prfd_key.clone(), counter, &nonce)?; + json.insert(hashed, encrypted); + } + } + info!("Took {:?}", Instant::now().duration_since(start_time)); + post_metadata(database_addresses.clone(), meta_string.clone()).await?; + let init_json = Init { + level, + db: json, + nonce, + metadata: meta_string, + btree: encrypt(&enc_key, subtree.write_string())?, + }; + let database_address = first_available_sda_server(0, database_addresses, rng).await?; + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(60 * 30)) + .build()?; + client + .post(database_address) + .json(&init_json) + .send() + .await?; + Ok(()) +} diff --git a/client-distributed/src/insert.rs b/client-distributed/src/insert.rs new file mode 100644 index 0000000000000000000000000000000000000000..b0e0398b4d542d29f816607e62b02c8f640b0970 --- /dev/null +++ b/client-distributed/src/insert.rs @@ -0,0 +1,254 @@ +use std::collections::HashMap; +use std::iter; + +use crate::brc::brc_tree; +use crate::initialize::generate_one; +use crate::pools::{first_available_sda_server, MetaData}; +use crate::search::get_metadata; +use anyhow::{anyhow, Result}; +use crypto_utils::chacha::{ + decrypt_with_action_and_validity, encrypt, encrypt_with_action_and_time, + encrypt_with_action_and_validity, generate_nonce_string, ChaCha20Rng, +}; +use crypto_utils::hash::{hash_with_counter_and_nonce, prf}; +use crypto_utils::keys::load_from_file; +use indexmap::{indexmap, IndexMap}; +use itertools::Itertools; +use log::info; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Serialize, Deserialize, Debug)] +pub struct InsertResponse { + sda: bool, + values: Vec<String>, + sda_level: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct InsertRequest { + nonce: String, + db: HashMap<String, String>, + btree: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SdaRequest { + level: usize, + nonce: String, + db: HashMap<String, String>, + btree: String, +} +pub async fn insert(value: String, address: Vec<Url>, rng: &mut ChaCha20Rng) -> Result<()> { + if address.is_empty() { + return Err(anyhow!("No addresses provided")); + } + let enc_key = load_from_file("enc.key")?; + let prf_key = load_from_file("prf.key")?; + let metadata = get_metadata(address.clone(), &enc_key, rng).await?; + let index_vec = value.split(",").collect::<Vec<&str>>(); + let key = index_vec + .get(metadata.index as usize) + .ok_or(anyhow!("Index out of bounds"))? + .to_owned() + .to_owned() + .parse()?; + let btree = brc_tree::from_sorted_list(indexmap! 
{key => vec![(value.clone(), true)]}); + // info!("Inserting into {} with values {}", key, value); + let encrypted_btree = encrypt(&enc_key, btree.to_string())?; + let mut item_map: HashMap<String, String> = HashMap::new(); + let nonce = generate_nonce_string(); + item_map.insert( + hash_with_counter_and_nonce(prf(&prf_key, format!("{:?}", (key, key)))?, 0, &nonce)?, + encrypt_with_action_and_validity(&enc_key, value, true, true)?, + ); + let insert_request_json = InsertRequest { + nonce, + db: item_map, + btree: encrypted_btree, + }; + // info!("Loading url.."); + let mut database_url = first_available_sda_server(0, address.clone(), rng).await?; + // info!("Has url: {}", database_url); + database_url.set_path("key"); + let resp = reqwest::Client::new() + .put(database_url) + .json(&insert_request_json) + .send() + .await + .unwrap() + .json::<InsertResponse>() + .await?; + info!("Got response {:?}", resp); + if resp.sda { + sda(resp, metadata, address, rng).await + } else { + Ok(()) + } +} + +pub async fn sda( + response: InsertResponse, + metadata: MetaData, + address: Vec<Url>, + rng: &mut ChaCha20Rng, +) -> Result<()> { + if !response.sda { + return Ok(()); + } + // info!( + // "Starting SDA Level {} with {} elements", + // response.sda_level, + // response.values.len() + // ); + let enc_key = load_from_file("enc.key")?; + let prf_key = load_from_file("prf.key")?; + let value_parsed: Result<Vec<(String, bool, u64)>, _> = response + .values + .iter() + .map(|x| decrypt_with_action_and_validity(&enc_key, x.to_owned())) + .collect(); + info!("Values parsed"); + let mut values = value_parsed? + .iter() + .filter(|x| x.2 != 0) + .map(|x| x.to_owned()) + .unique() + .collect::<Vec<_>>(); + // info!("Values Length: {}", values.len()); + values.sort_by_key(|x| x.2); + // info!("Sorted"); + let mut hm: IndexMap<isize, Vec<(String, bool)>> = IndexMap::new(); + let mut actions: HashMap<String, (bool, u64)> = HashMap::new(); + for (value, add, timer) in values { + // filter out nonvalid items + if timer == 0 { + continue; + } + let split: Vec<String> = value.split(",").map(|x| x.to_string()).collect(); + let parsed = split + .get(metadata.index as usize) + .ok_or(anyhow!("Index out of range"))? + .parse::<isize>()?; + hm.entry(parsed).or_default().push((value.clone(), true)); + if !add { + info!("add is false with value {}", value); + } + actions.insert(value, (add, timer)); + } + // info!( + // "Total elements: {}", + // hm.values().map(|x| x.len()).sum::<usize>() + // ); + let target_amount = 2_isize.pow(response.sda_level as u32) as usize; + let diff = target_amount - hm.len(); + // info!( + // "Generating {} dummy records (target: {}, curr: {})", + // diff, + // target_amount, + // hm.len() + // ); + for _ in 0..diff { + generate_one(&mut hm, rng)?; + // info!("{}", i); + } + hm.sort_keys(); + let subtree = brc_tree::from_sorted_list(hm); + let mut json: HashMap<String, String> = HashMap::new(); + // let mut real = 0; + // let mut fake = 0; + let nonce = generate_nonce_string(); + for (key, value) in subtree.to_hm().iter_mut() { + for (counter, record) in value.iter().enumerate() { + let (pt, va) = record; + + let encrypted = match *va { + true => { + // real += 1; + let (action, timer) = actions + .get(pt) + .ok_or(anyhow!("Action should have been registered prior"))?; + encrypt_with_action_and_time(&enc_key, pt.to_owned(), *action, *timer)? + } + false => { + // fake += 1; + encrypt_with_action_and_validity(&enc_key, pt.to_owned(), true, false)? 
+ } + }; + let prfd_key = prf(&prf_key, format!("{:?}", key))?; + let hashed = hash_with_counter_and_nonce(prfd_key, counter, &nonce)?; + json.insert(hashed, encrypted); + } + } + // info!("Real {} vs Fake {}", real, fake); + let insert = SdaRequest { + level: response.sda_level, + nonce, + db: json, + btree: encrypt(&enc_key, subtree.to_string())?, + }; + // info!("Loading url"); + let mut database_address = + first_available_sda_server(response.sda_level, address.clone(), rng).await?; + database_address.set_path("sda"); + // info!("Url: {}", database_address); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + let resp = client + .put(database_address) + .json(&insert) + .send() + .await? + .json::<InsertResponse>() + .await?; + // info!("Writing to sda level {}", response.sda_level); + if resp.sda { + Ok(Box::pin(sda(resp, metadata, address, rng)).await?) + } else { + Ok(()) + } +} + +pub async fn delete(key: String, address: Vec<Url>, rng: &mut ChaCha20Rng) -> Result<()> { + let enc_key = load_from_file("enc.key")?; + let prf_key = load_from_file("prf.key")?; + let metadata = get_metadata(address.clone(), &enc_key, rng).await?; + let key_int: isize = key.parse()?; + let prfd_key = prf(&prf_key, format!("{:?}", (key_int, key_int)))?; + let mut plaintext_iter = iter::repeat("a") + .take(metadata.row_length as usize) + .collect::<Vec<&str>>(); + plaintext_iter[metadata.index as usize] = &key; + let plaintext = plaintext_iter.join(","); + let stree = brc_tree::from_sorted_list(indexmap! { + key.parse::<isize>()? => vec![(plaintext.clone(), true)] + }); + // info!("stree {}", stree); + let btree = encrypt(&enc_key, stree.to_string())?; + let mut db: HashMap<String, String> = HashMap::new(); + let nonce = generate_nonce_string(); + db.insert( + hash_with_counter_and_nonce(prfd_key, 0, &nonce)?, + encrypt_with_action_and_validity(&enc_key, plaintext, false, true)?, + ); + let delete_request = InsertRequest { nonce, db, btree }; + // info!( + // "Deleting key {}, with request data {:?}", + // key, delete_request + // ); + let mut database_url = first_available_sda_server(0, address.clone(), rng).await?; + database_url.set_path("key"); + let resp = reqwest::Client::new() + .put(database_url) + .json(&delete_request) + .send() + .await? 
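+        // deletes are writes too: the server may answer that level 0
+        // overflowed and hand back its ciphertexts for an SDA rebuild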
+ .json::<InsertResponse>() + .await?; + if resp.sda { + sda(resp, metadata, address, rng).await + } else { + Ok(()) + } +} diff --git a/client-distributed/src/main.rs b/client-distributed/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..e99e282fd41865c2a89ca34a562456ea41c0c223 --- /dev/null +++ b/client-distributed/src/main.rs @@ -0,0 +1,68 @@ +mod brc; +mod commandline; +mod initialize; +mod insert; +mod pools; +mod search; +use anyhow::Result; +use commandline::{Cli, Commands}; +use crypto_utils::chacha::{rand::SeedableRng, ChaCha20Rng}; +use env_logger::Builder; +use initialize::init; +use insert::{delete, insert}; +use log::{debug, LevelFilter}; + +use search::search; + +#[tokio::main] +async fn main() -> Result<()> { + let cli = <Cli as clap::Parser>::parse(); + + // initial logging + let max_log_level = match &cli.verbose { + true => LevelFilter::Trace, + false => LevelFilter::Error, + }; + Builder::new().filter(None, max_log_level).init(); + debug!("Parsing subcommand"); + let mut rng = ChaCha20Rng::from_os_rng(); + match &cli.command { + Commands::Init { + filename_option, + key_option, + index_option, + } => { + init( + filename_option.clone(), + key_option.clone(), + *index_option, + cli.address, + &mut rng, + ) + .await?; + } + Commands::Search { + min, + max, + urf, + output_path, + } => { + search( + *min, + *max, + cli.address, + *urf, + output_path.to_owned(), + &mut rng, + ) + .await?; + } + Commands::Insert { value } => { + insert(value.to_owned(), cli.address.clone(), &mut rng).await?; + } + Commands::Delete { key } => { + delete(key.to_owned(), cli.address.clone(), &mut rng).await?; + } + } + Ok(()) +} diff --git a/client-distributed/src/pools.rs b/client-distributed/src/pools.rs new file mode 100644 index 0000000000000000000000000000000000000000..449ce081d3a63639687893003a564fc3f4d52173 --- /dev/null +++ b/client-distributed/src/pools.rs @@ -0,0 +1,84 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; + +use crypto_utils::chacha::{rand::seq::IndexedRandom, ChaCha20Rng}; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Serialize, Deserialize, Debug)] +pub struct MetaData { + pub index: i32, + pub header: Vec<String>, + pub row_length: i32, +} + +pub async fn search_availability(server_list: Vec<Url>) -> Result<HashMap<Url, Vec<String>>> { + let mut futures = Vec::new(); + for server_url in server_list { + futures.push(async move { + let client = reqwest::Client::new(); + let mut path = server_url.clone(); + path.set_path("/btrees"); + let res = client + .get(path.to_owned()) + .send() + .await? + .json::<Vec<String>>() + .await?; + Ok::<(Url, Vec<String>), reqwest::Error>((server_url, res)) + }) + } + Ok(futures::future::join_all(futures) + .await + .into_iter() + .collect::<Result<Vec<(Url, Vec<String>)>, reqwest::Error>>()? 
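+        // any single failing server aborts the scan via `?`; otherwise the
+        // pairs are gathered into a Url -> encrypted-btree-list map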
+        .into_iter()
+        .collect())
+}
+pub async fn post_metadata(server_list: Vec<Url>, content: String) -> Result<()> {
+    let mut futures = Vec::new();
+    for server_url in server_list {
+        let content = content.clone();
+        futures.push(async move {
+            let client = reqwest::Client::new();
+            let mut path = server_url.clone();
+            path.set_path("/metadata");
+            println!("sending {:?}", content);
+            client.post(path).body(content.clone()).send().await?;
+            Ok::<(), reqwest::Error>(())
+        })
+    }
+    futures::future::join_all(futures).await;
+    Ok(())
+}
+pub async fn pool_get_metadata(server_list: Vec<Url>, rng: &mut ChaCha20Rng) -> Result<String> {
+    let mut metadata = server_list
+        .choose(rng)
+        .ok_or(anyhow!("no available servers"))?
+        .to_owned();
+    metadata.set_path("/metadata");
+    Ok(reqwest::Client::new()
+        .get(metadata)
+        .send()
+        .await?
+        .text()
+        .await?)
+}
+
+pub async fn first_available_sda_server(
+    sda_level: usize,
+    urls: Vec<Url>,
+    rng: &mut ChaCha20Rng,
+) -> Result<Url> {
+    // Ok(sda_availability(sda_level, urls.clone())
+    //     .await?
+    //     .choose(rng)
+    //     .or(urls.choose(&mut ChaCha20Rng::from_os_rng()))
+    //     .ok_or(anyhow!("No sda servers available"))?
+    //     .to_owned())
+    let _ = sda_level;
+    urls.choose(rng)
+        .ok_or(anyhow!("No sda servers available"))
+        .map(|x| x.to_owned())
+}
diff --git a/client-distributed/src/search.rs b/client-distributed/src/search.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0f03806dcbc6b7eebe104619d5ae2c3addc54638
--- /dev/null
+++ b/client-distributed/src/search.rs
@@ -0,0 +1,164 @@
+use std::collections::HashMap;
+
+use crate::pools::pool_get_metadata;
+use crate::pools::search_availability;
+use crate::pools::MetaData;
+use anyhow::{anyhow, Result};
+
+use crypto_utils::chacha::decrypt_with_action_and_validity;
+use crypto_utils::chacha::ChaCha20Rng;
+// This is just a current implementation of the outputs
+use log::info;
+use url::Url;
+
+use crypto_utils::chacha::decrypt;
+use crypto_utils::chacha::rand;
+
+use crypto_utils::hash::prf;
+use crypto_utils::keys::load_from_file;
+use rand::seq::SliceRandom;
+
+use crate::brc::brc_tree;
+use crate::brc::brc_tree::Subtree;
+
+/// Search function
+pub async fn search(
+    min: isize,
+    max: isize,
+    address: Vec<Url>,
+    urf: bool,
+    output_path: String,
+    rng: &mut ChaCha20Rng,
+) -> Result<()> {
+    // info!("Searching for range between {} and {}", min, max);
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    // let burl = Url::parse(&address)?;
+    let metadata: MetaData = get_metadata(address.clone(), &enc_key, rng).await?;
+    let btrees = get_btrees(address, &enc_key).await?;
+    let mut existing_covers: HashMap<Url, Vec<(isize, isize)>> = HashMap::new();
+    for (url, v_subtrees) in btrees {
+        for subtree in v_subtrees {
+            let mut covers = subtree.covers(min, max);
+            if !covers.is_empty() {
+                if urf {
+                    covers.sort_by_key(|a| a.level());
+                    covers = brc_tree::brc_to_urc(covers);
+                }
+                existing_covers
+                    .entry(url.clone())
+                    .or_default()
+                    .extend(covers.iter().map(|x| x.to_owned().range()));
+            }
+        }
+    }
+    for covers in existing_covers.values_mut() {
+        covers.sort();
+        covers.dedup();
+    }
+    // println!("{:?}", existing_covers.values());
+    let mut queries: Vec<(Url, String)> = Vec::new();
+    for (urls, covers) in existing_covers {
+        for cover in covers {
+            println!("{}: {:?}", urls, cover);
+            queries.push((urls.clone(), prf(&prf_key, format!("{:?}", cover))?));
+        }
+    }
+    queries.shuffle(rng);
+    let mut futures = Vec::new();
+    // different servers
get different keys + for (urls, key) in queries { + let mut search_url = urls.clone(); + search_url.set_path(&format!("key/{}", key.clone())); + futures.push(tokio::spawn(async move { + let client = reqwest::Client::new(); + // info!("Searching for {}", key); + let resp = client + .get(search_url) + .send() + .await? + .json::<Vec<String>>() + .await?; + Ok::<Vec<String>, reqwest::Error>(resp) + })); + } + let joined_futures = futures::future::join_all(futures).await; + + let mut fnl: Vec<(isize, String, bool, u64)> = Vec::new(); + for ciphertext_v in joined_futures { + for ciphertext in ciphertext_v?? { + let decrypted = decrypt_with_action_and_validity(&enc_key, ciphertext)?; + if decrypted.2 != 0 { + let split: Vec<String> = decrypted + .0 + .split(",") + .map(|x| x.to_string()) + .collect::<Vec<String>>(); + let parsed = split + .get(metadata.index as usize) + .ok_or(anyhow!("Index out of range"))? + .parse::<isize>()?; + fnl.push((parsed, decrypted.0, decrypted.1, decrypted.2)); + } else { + info!("Ignoring {:?}", decrypted); + } + } + } + fnl.sort_by(|a, b| a.3.cmp(&b.3)); + fnl.dedup(); + // remove elements that are at some point removed + let mut hm: HashMap<isize, Vec<String>> = HashMap::new(); + for (key, value, add, _) in fnl { + if add { + hm.entry(key).or_default().push(value); + } else { + info!("Removing {:?}", key); + hm.remove(&key); + } + } + for value in hm.values_mut() { + value.sort(); + value.dedup(); + } + // write to csv file + let write_path = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_path)?; + let mut wtr = csv::Writer::from_writer(write_path); + wtr.write_record(&metadata.header)?; + let mut hm_keys: Vec<isize> = hm.keys().clone().map(|x| x.to_owned()).collect(); + hm_keys.sort(); + for key in hm_keys { + for value in hm.get(&key).unwrap() { + wtr.write_record(value.split(","))?; + } + } + Ok(()) +} + +async fn get_btrees(urls: Vec<Url>, key: &[u8]) -> Result<HashMap<Url, Vec<Subtree>>> { + let btree_map = search_availability(urls).await?; + let mut return_map = HashMap::new(); + for (url, vec) in btree_map { + let mut btrees = Vec::new(); + for value in vec.iter() { + let decrypted = decrypt(key, value.to_string())?; + let subtree = brc_tree::parse_from_string(decrypted)?; + btrees.push(subtree); + } + return_map.insert(url, btrees); + } + Ok(return_map) +} + +pub async fn get_metadata( + base_url: Vec<Url>, + key: &[u8], + rng: &mut ChaCha20Rng, +) -> Result<MetaData> { + let encrypted = pool_get_metadata(base_url, rng).await?; + let decrypted = decrypt(key, encrypted)?; + Ok(serde_json::from_str(&decrypted)?) 
+} diff --git a/client-with-insert/Cargo.toml b/client-with-insert/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..d59f72505f38bd96b18153f1e944f2b00a277634 --- /dev/null +++ b/client-with-insert/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "client-with-insert" +version = "0.1.0" +edition = "2021" + +[dependencies] +clap = { version = "4.5.30", features = ["derive"] } +env_logger = "0.11.6" +log = "0.4.25" +crypto-utils = { path = "../crypto-utils" } +anyhow = "1.0.95" +csv = "1.3.1" +indexmap = "2.7.1" +reqwest = { version = "0.12.12", features = ["json", "blocking"] } +comfy-table = "7.1.4" +url = "2.5.4" +json-writer = "0.4.0" +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.140" +regex = "1.11.1" +itertools = "0.14.0" diff --git a/client-with-insert/src/brc.rs b/client-with-insert/src/brc.rs new file mode 100644 index 0000000000000000000000000000000000000000..bf77891338f6c237f83e4a867cda61d29994fe4f --- /dev/null +++ b/client-with-insert/src/brc.rs @@ -0,0 +1,338 @@ +pub mod brc_tree { + #![allow(dead_code)] + use std::{collections::VecDeque, io::Write, path::Path, vec}; + + use crypto_utils::chacha::{ + rand::{seq::SliceRandom, SeedableRng}, + ChaCha20Rng, + }; + use indexmap::IndexMap; + + #[derive(Debug, Clone)] + pub struct Node { + min: isize, + max: isize, + contents: Vec<(String, bool)>, + length: isize, + left: Subtree, + right: Subtree, + } + + #[derive(Debug, Clone)] + pub struct Subtree(Option<Box<Node>>); + + pub fn from_sorted_list(hm: IndexMap<isize, Vec<(String, bool)>>) -> Subtree { + let len = hm.len(); + if len == 0 { + return Subtree(None); + } else if len == 1 { + let contents = hm.get_index(0).unwrap().1.to_vec(); + let length = contents.len() as isize; + return Subtree(Some(Box::new(Node { + min: *hm.get_index(0).unwrap().0, + max: *hm.get_index(0).unwrap().0, + contents, + length, + left: Subtree(None), + right: Subtree(None), + }))); + } + let mid = len / 2; + let left = from_sorted_list(hm.clone().drain(..mid).collect()); + let right = from_sorted_list(hm.clone().drain(mid..).collect()); + let min = match &left.0 { + Some(node) => node.min, + None => match &right.0 { + Some(node) => node.min, + None => *hm.get_index(0).unwrap().0, + }, + }; + let max = match &right.0 { + Some(node) => node.max, + None => match &left.0 { + Some(node) => node.max, + None => *hm.get_index(len - 1).unwrap().0, + }, + }; + let length = match &left { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + } + match &right { + Subtree(Some(node)) => node.length, + Subtree(None) => 0, + }; + Subtree(Some(Box::new(Node { + min, + max, + length, + contents: vec![], + left, + right, + }))) + } + #[test] + fn test_fsl() { + let test_data = indexmap::indexmap! 
{ + 1 => vec![("a".to_string(), true)], + 2 => vec![("b".to_string(), true)], + 3 => vec![("c".to_string(), true)], + 4 => vec![("d".to_string(), true)], + }; + let tree = from_sorted_list(test_data); + println!("{:?}", tree); + println!("{:?}", tree.to_hm()); + } + + impl Subtree { + pub fn get_all_child_values(&self) -> Vec<(String, bool)> { + match &self.0 { + Some(node) => { + let mut left = node.left.get_all_child_values(); + let right = node.right.get_all_child_values(); + left.extend(right); + left.extend(node.contents.clone()); + left + } + None => vec![], + } + } + pub fn to_hm(&self) -> IndexMap<(isize, isize), Vec<(String, bool)>> { + let mut hm: IndexMap<(isize, isize), Vec<(String, bool)>> = IndexMap::new(); + if let Some(node) = &self.0 { + let left = node.left.to_hm(); + let right = node.right.to_hm(); + for (key, value) in left.iter() { + hm.insert(*key, value.clone()); + } + for (key, value) in right.iter() { + hm.insert(*key, value.clone()); + } + let value = self.get_all_child_values(); + hm.insert(self.range(), value); + } + hm + } + + pub fn write_string(&self) -> String { + match &self.0 { + Some(node) => format!( + "{},{}[{}]({})({});", + node.min, + node.max, + node.length, + node.left.write_string(), + node.right.write_string() + ), + None => "".to_string(), + } + } + pub fn write_structure(&self, mut file: &std::fs::File) -> anyhow::Result<()> { + if self.0.is_some() { + file.write_all(self.write_string().as_bytes())? + } + Ok(()) + } + + pub fn covers(&self, min: isize, max: isize) -> Vec<Subtree> { + // Returns the set of nodes that covers the entire range + match &self.0 { + Some(node) => { + if node.min > max || node.max < min { + return vec![]; + } + if min <= node.min && max >= node.max { + return vec![self.clone()]; + } + let mut left = node.left.covers(min, max); + let right = node.right.covers(min, max); + left.extend(right); + left.shuffle(&mut ChaCha20Rng::from_os_rng()); + left + } + None => vec![], + } + } + + pub fn range(&self) -> (isize, isize) { + match &self.0 { + Some(node) => (node.min, node.max), + None => (0, 0), + } + } + + pub fn range_level(&self) -> (isize, isize, isize) { + match &self.0 { + Some(node) => (node.min, node.max, self.level()), + None => (0, 0, 0), + } + } + + pub fn level(&self) -> isize { + match &self.0 { + Some(node) => { + if node.max == node.min { + return 0; + } + let left = node.left.level(); + let right = node.right.level(); + 1 + left.max(right) + } + None => -1, + } + } + } + #[test] + fn test_possible_covers() { + let tree = parse_from_file(std::path::Path::new("../brc_table.partitions")).unwrap(); + let mut covers = tree.covers(0, 111000); + println!("{:?}", tree); + println!("{:?}", covers.iter().map(|x| x.range()).collect::<Vec<_>>()); + covers.sort_by_key(|a| a.level()); + println!( + "{:?}", + brc_to_urc(covers) + .iter() + .map(|x| x.range()) + .collect::<Vec<_>>() + ); + } + pub fn parse_from_file(file_name: &Path) -> anyhow::Result<Subtree> { + let contents = std::fs::read_to_string(file_name)?; + parse_from_string(contents) + } + + impl PartialEq for Subtree { + fn eq(&self, other: &Self) -> bool { + self.range() == other.range() + } + } + + pub fn brc_to_urc(vec: Vec<Subtree>) -> Vec<Subtree> { + // assume vec is sorted by level + let mut deq = VecDeque::from(vec); + let max_level = deq[deq.len() - 1].level(); + let mut urc: Vec<Subtree> = vec![]; + let mut seen = VecDeque::from_iter(0..=max_level + 1); + while !deq.is_empty() { + let node = deq.pop_front().unwrap().to_owned(); + if node.0.is_none() { + 
continue; + } + match node.level() { + i if i < seen[0] => { + if !urc.contains(&node) { + urc.push(node); + } + } + i if i == seen[0] => { + seen.pop_front(); + if !urc.contains(&node) { + urc.push(node); + } + } + _ => { + deq.push_front(node.clone().0.unwrap().left); + deq.push_front(node.0.unwrap().right); + } + } + } + urc + } + + pub fn parse_from_string(s: String) -> anyhow::Result<Subtree> { + let s = s.trim(); + if s.is_empty() { + return Ok(Subtree(None)); + } + let (min_max, rest_before_length) = s.split_at(s.find('[').unwrap()); + let (min, max) = { + let mut parts = min_max.split(','); + ( + parts.next().unwrap().parse::<isize>()?, + parts.next().unwrap().parse::<isize>()?, + ) + }; + let (length_str, rest) = rest_before_length.split_at(rest_before_length.find(']').unwrap()); + let length = length_str[1..].parse::<isize>()?; + + // Find the closing parenthesis for the left subtree + let mut balance = 1; // Start at 1 because we're already inside the first parenthesis + let mut left_end = 0; + for (i, c) in rest[2..].chars().enumerate() { + match c { + '(' => balance += 1, + ')' => balance -= 1, + _ => {} + } + if balance == 0 { + left_end = i + 2; // +2 to account for the substring offset + break; + } + } + + // The right subtree starts after the )( separator + let right_start = left_end + 2; // +2 to skip the ")(" + + let left = parse_from_string(rest[2..left_end].to_string())?; + let right = parse_from_string(rest[right_start..rest.len() - 2].to_string())?; + + Ok(Subtree(Some(Box::new(Node { + min, + max, + length, + contents: vec![], + left, + right, + })))) + } + + #[test] + fn test_from_hm() { + let hm = indexmap::indexmap! { + 1 => vec![("a".to_string(), true)], + 2 => vec![("a".to_string(), true)], + 3 => vec![("a".to_string(), true)], + 4 => vec![("a".to_string(), true)], + 5 => vec![("a".to_string(), true)] + }; + let tree = from_sorted_list(hm); + println!("{:?}", tree.to_hm()); + println!("{:?}", tree.covers(2, 5)); + } + + #[test] + fn test_brc_to_urc() { + let tree = from_sorted_list(indexmap::indexmap! 
{ + 0 => vec![("a".to_string(), true)], + 1 => vec![("a".to_string(), true)], + 2 => vec![("a".to_string(), true)], + 3 => vec![("a".to_string(), true)], + 4 => vec![("a".to_string(), true)], + 5 => vec![("a".to_string(), true)], + 6 => vec![("a".to_string(), true)], + 7 => vec![("a".to_string(), true)], + }); + let urc = brc_to_urc(tree.covers(2, 7)); + println!("{:?}", urc.iter().map(|x| x.range()).collect::<Vec<_>>()); + } + + impl TryInto<Subtree> for String { + type Error = anyhow::Error; + fn try_into(self) -> Result<Subtree, Self::Error> { + parse_from_string(self) + } + } + + #[test] + fn test_parse() { + let tree = parse_from_file("../brc_table.partitions".as_ref()).unwrap(); + println!("{:?}", tree); + } + + impl std::fmt::Display for Subtree { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.write_str(&self.write_string())?; + Ok(()) + } + } +} diff --git a/client-with-insert/src/commandline.rs b/client-with-insert/src/commandline.rs new file mode 100644 index 0000000000000000000000000000000000000000..5aea0d8e0b8a4fd1bca116b45064d8b667f3a7a9 --- /dev/null +++ b/client-with-insert/src/commandline.rs @@ -0,0 +1,55 @@ +use clap::{Parser, Subcommand}; + +#[derive(Parser)] +#[command(version, about, long_about=None)] +pub struct Cli { + /// Verbose level (Trace / Info) + #[clap(global = true)] + #[arg(short, long, default_value_t = false)] + pub verbose: bool, + + /// Subcommands + #[command(subcommand)] + pub command: Commands, + + #[clap(global = true)] + #[arg(short, long, default_value = "http://localhost:8080/")] + pub address: String, +} + +#[derive(Subcommand)] +pub enum Commands { + /// Initializes the remote server with file + Init { + /// File path to be inputted + filename_option: String, + /// set searchable column to key + #[clap(long = "key", short = 'k')] + key_option: Option<String>, + /// set searchable index + #[clap(long = "index", short = 'i')] + index_option: Option<usize>, + }, + /// Searches remote server with optional string + Search { + /// Minimum value in range (inclusive) + min: isize, + /// Maximum in range (inclusive) + max: isize, + /// Optional urf option + #[clap(long = "urf", short = 'u', default_value_t = false)] + urf: bool, + + #[arg(short, long, default_value = "out.csv")] + output_path: String, + }, + /// Adds to remote key-value store + Insert { + /// Value to add, must be in a csv row format that fits the given scheme + value: String, + }, + Delete { + /// Key to delete + key: String, + }, +} diff --git a/client-with-insert/src/initialize.rs b/client-with-insert/src/initialize.rs new file mode 100644 index 0000000000000000000000000000000000000000..3ae56b45417b4508b91e67e2c54ad6f0dfcb842a --- /dev/null +++ b/client-with-insert/src/initialize.rs @@ -0,0 +1,179 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; +use crypto_utils::chacha::rand::{Rng, SeedableRng}; +use indexmap::IndexMap; +use log::{debug, info, warn}; + +use crypto_utils::chacha::{ + encrypt, encrypt_with_action_and_validity, generate_nonce_string, ChaCha20Rng, +}; +use crypto_utils::hash::{hash_with_counter_and_nonce, prf}; +use crypto_utils::keys::load_from_file; +use serde::{Deserialize, Serialize}; + +use crate::brc::brc_tree; + +pub fn generate_one( + hm: &mut IndexMap<isize, Vec<(String, bool)>>, + rng: &mut ChaCha20Rng, +) -> Result<()> { + let mut min: i64 = ((hm.keys().min().unwrap_or(&0).to_owned()) * 95 / 100).try_into()?; + let mut max: i64 = (hm.keys().max().unwrap_or(&100).to_owned() * 105 / 100).try_into()?; + let 
row_length = hm
+        .values()
+        .next()
+        .unwrap_or(&vec![("a".repeat(100), false)])[0]
+        .0
+        .len();
+    let min_row_length = (row_length * 3) / 10;
+    let max_row_length = (row_length * 20) / 10;
+    let mut key;
+    loop {
+        info!("{},{}", min, max);
+        key = rng.random_range(min..max) as isize;
+        if !hm.contains_key(&key) {
+            break;
+        } else {
+            min -= 1;
+            max += 1;
+        }
+    }
+    let mut vec: Vec<(String, bool)> = Vec::new();
+    let repeat = "a".repeat(rng.random_range(min_row_length..max_row_length));
+    vec.push((repeat, false));
+    hm.insert(key, vec);
+    Ok(())
+}
+
+#[derive(Serialize, Deserialize)]
+struct Init {
+    level: usize,
+    db: HashMap<String, String>,
+    metadata: String,
+    nonce: String,
+    btree: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct MetaData {
+    pub index: i32,
+    pub header: Vec<String>,
+    pub row_length: i32,
+}
+
+pub fn init(
+    filename: String,
+    key_option: Option<String>,
+    index_option: Option<usize>,
+    database_address: String,
+) -> Result<()> {
+    if key_option.is_none() && index_option.is_none() {
+        info!("Key not provided, will use the first available column")
+    }
+    if key_option.is_some() && index_option.is_some() {
+        warn!("Both key and index provided!");
+        return Err(anyhow!("Both key and index provided"));
+    }
+    debug!("Given file {}, checking whether it exists", filename);
+    let filepath = std::path::Path::new(&filename);
+    // verify that the file exists
+    if !filepath.exists() {
+        warn!("{:?} cannot be found", filepath);
+        return Err(anyhow!("Provided file cannot be found {:?}", filepath));
+    }
+    debug!("File exists");
+    let mut hm: IndexMap<isize, Vec<(String, bool)>> = IndexMap::new();
+    let mut reader = csv::Reader::from_path(filepath)?;
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let index = match key_option {
+        Some(s) => {
+            let headers = reader.headers()?;
+            let mut ret: usize = usize::MAX;
+            for (index, word) in headers.iter().enumerate() {
+                if word.eq_ignore_ascii_case(&s) {
+                    ret = index;
+                    break;
+                }
+            }
+            if ret == usize::MAX {
+                warn!("{} cannot be found in headers", s);
+                return Err(anyhow!("{} not found in headers", s));
+            }
+            ret
+        }
+        None => match index_option {
+            Some(i) => {
+                let headers = reader.headers()?;
+                if i >= headers.len() {
+                    warn!(
+                        "Header has length {}, so index {} is out of bounds",
+                        headers.len(),
+                        i
+                    );
+                    return Err(anyhow!("Index out of bounds"));
+                }
+                i
+            }
+            None => 0,
+        },
+    };
+    let meta: MetaData = MetaData {
+        index: index as i32,
+        header: reader.headers()?.iter().map(|x| x.to_string()).collect(),
+        row_length: reader.headers()?.len() as i32,
+    };
+    let meta_string = encrypt(&enc_key, serde_json::to_string(&meta)?)?;
+    for record in reader.records() {
+        let record = record?;
+        let key = record.get(index).unwrap().parse();
+        let record_vec = record.iter().collect::<Vec<&str>>().join(",");
+
+        if key.is_err() {
+            warn!("Could not parse key {}", record.get(index).unwrap());
+            return Err(anyhow!("Could not parse key"));
+        }
+        hm.entry(key.unwrap()).or_default().push((record_vec, true));
+    }
+    // pad the map with dummy keys until its size is a power of two,
+    // e.g. 16385 distinct keys -> level 15, which needs 16383 dummies
+    let level = (hm.len() as f64).log2().ceil() as usize;
+    let diff = (2_isize.pow(level as u32) as usize - hm.len()) as isize;
+    debug!(
+        "{} distinct keys seen, we will insert {} keys",
+        hm.len(),
+        diff
+    );
+    let mut rng = ChaCha20Rng::from_os_rng();
+    for _ in 0..diff {
+        generate_one(&mut hm, &mut rng)?;
+    }
+    hm.sort_keys();
+    debug!("{:?}", hm);
+    info!("{:?}", hm.len());
+    let subtree = brc_tree::from_sorted_list(hm);
+    let
nonce = generate_nonce_string();
+    let mut json: HashMap<String, String> = HashMap::new();
+    for (key, value) in subtree.to_hm().iter() {
+        let prfd_key = prf(&prf_key, format!("{:?}", key))?;
+        for (counter, record) in value.iter().enumerate() {
+            let (pt, va) = record;
+            let encrypted =
+                encrypt_with_action_and_validity(&enc_key, pt.to_owned(), true, va.to_owned())?;
+            let hashed = hash_with_counter_and_nonce(prfd_key.clone(), counter, &nonce)?;
+            json.insert(hashed, encrypted);
+        }
+    }
+    let init_json = Init {
+        level,
+        db: json,
+        nonce,
+        metadata: meta_string,
+        btree: encrypt(&enc_key, subtree.write_string())?,
+    };
+    let client = reqwest::blocking::Client::builder()
+        .timeout(std::time::Duration::from_secs(60 * 30))
+        .build()?;
+    client.post(database_address).json(&init_json).send()?;
+    Ok(())
+}
diff --git a/client-with-insert/src/insert.rs b/client-with-insert/src/insert.rs
new file mode 100644
index 0000000000000000000000000000000000000000..d1eaf69c43dc96c2b6daac87884d6ed51128c56c
--- /dev/null
+++ b/client-with-insert/src/insert.rs
@@ -0,0 +1,227 @@
+use std::collections::HashMap;
+use std::iter;
+
+use crate::brc::brc_tree;
+use crate::initialize::{generate_one, MetaData};
+use crate::search::get_metadata;
+use anyhow::{anyhow, Result};
+use crypto_utils::chacha::rand::SeedableRng;
+use crypto_utils::chacha::{
+    decrypt_with_action_and_validity, encrypt, encrypt_with_action_and_time,
+    encrypt_with_action_and_validity, generate_nonce_string, ChaCha20Rng,
+};
+use crypto_utils::hash::{hash_with_counter_and_nonce, prf};
+use crypto_utils::keys::load_from_file;
+use indexmap::{indexmap, IndexMap};
+use itertools::Itertools;
+use log::{debug, info};
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct InsertResponse {
+    sda: bool,
+    values: Vec<String>,
+    sda_level: usize,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct InsertRequest {
+    nonce: String,
+    db: HashMap<String, String>,
+    btree: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SdaRequest {
+    level: usize,
+    nonce: String,
+    db: HashMap<String, String>,
+    btree: String,
+}
+pub fn insert(value: String, address: String) -> Result<()> {
+    let base_url: Url = Url::parse(&address)?;
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let metadata = get_metadata(base_url.clone(), &enc_key)?;
+    let index_vec = value.split(",").collect::<Vec<&str>>();
+    let key = index_vec
+        .get(metadata.index as usize)
+        .ok_or(anyhow!("Index out of bounds"))?
+        .to_string()
+        .parse()?;
+    let btree = brc_tree::from_sorted_list(indexmap! {key => vec![(value.clone(), true)]});
+    info!("Inserting key {} with value {}", key, value);
+    let encrypted_btree = encrypt(&enc_key, btree.to_string())?;
+    let mut item_map: HashMap<String, String> = HashMap::new();
+    let nonce = generate_nonce_string();
+    item_map.insert(
+        hash_with_counter_and_nonce(prf(&prf_key, format!("{:?}", (key, key)))?, 0, &nonce)?,
+        encrypt_with_action_and_validity(&enc_key, value, true, true)?,
+    );
+    let insert_request_json = InsertRequest {
+        nonce,
+        db: item_map,
+        btree: encrypted_btree,
+    };
+    let resp = reqwest::blocking::Client::new()
+        .put(base_url.join("key")?)
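+        // The server answers with an InsertResponse: when `sda` is set, level 0
+        // was already occupied and `values` carries the evicted ciphertexts,
+        // which must be rebuilt into the next level (see `sda` below).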
+        .json(&insert_request_json)
+        .send()?
+        .json::<InsertResponse>()?;
+    debug!("Got response {:?}", resp);
+    sda(resp, metadata, address)
+}
+
+pub fn sda(response: InsertResponse, metadata: MetaData, address: String) -> Result<()> {
+    if !response.sda {
+        return Ok(());
+    }
+    info!(
+        "Starting SDA Level {} with {} elements",
+        response.sda_level,
+        response.values.len()
+    );
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let value_parsed: Result<Vec<(String, bool, u64)>, _> = response
+        .values
+        .iter()
+        .map(|x| decrypt_with_action_and_validity(&enc_key, x.to_owned()))
+        .collect();
+    info!("Values parsed");
+    let mut values = value_parsed?
+        .iter()
+        .filter(|x| x.2 != 0)
+        .map(|x| x.to_owned())
+        .unique()
+        .collect::<Vec<_>>();
+    info!("Values Length: {}", values.len());
+    values.sort_by_key(|x| x.2);
+    info!("Sorted");
+    let mut hm: IndexMap<isize, Vec<(String, bool)>> = IndexMap::new();
+    let mut actions: HashMap<String, (bool, u64)> = HashMap::new();
+    for (value, add, timer) in values {
+        // skip invalid records (a zero timestamp marks a dummy)
+        if timer == 0 {
+            continue;
+        }
+        let split: Vec<String> = value.split(",").map(|x| x.to_string()).collect();
+        let parsed = split
+            .get(metadata.index as usize)
+            .ok_or(anyhow!("Index out of range"))?
+            .parse::<isize>()?;
+        hm.entry(parsed).or_default().push((value.clone(), true));
+        if !add {
+            info!("add is false with value {}", value);
+        }
+        actions.insert(value, (add, timer));
+    }
+    info!(
+        "Total elements: {}",
+        hm.values().map(|x| x.len()).sum::<usize>()
+    );
+    let target_amount = 2_isize.pow(response.sda_level as u32) as usize;
+    let diff = target_amount - hm.len();
+    info!(
+        "Generating {} dummy records (target: {}, curr: {})",
+        diff,
+        target_amount,
+        hm.len()
+    );
+    let mut rng = ChaCha20Rng::from_os_rng();
+    for i in 0..diff {
+        generate_one(&mut hm, &mut rng)?;
+        info!("{}", i);
+    }
+    hm.sort_keys();
+    let subtree = brc_tree::from_sorted_list(hm);
+    let mut json: HashMap<String, String> = HashMap::new();
+    let mut real = 0;
+    let mut fake = 0;
+    let nonce = generate_nonce_string();
+    for (key, value) in subtree.to_hm().iter_mut() {
+        for (counter, record) in value.iter().enumerate() {
+            let (pt, va) = record;
+
+            let encrypted = match *va {
+                true => {
+                    real += 1;
+                    let (action, timer) = actions
+                        .get(pt)
+                        .ok_or(anyhow!("Action should have been registered earlier"))?;
+                    encrypt_with_action_and_time(&enc_key, pt.to_owned(), *action, *timer)?
+                }
+                false => {
+                    fake += 1;
+                    encrypt_with_action_and_validity(&enc_key, pt.to_owned(), true, false)?
+                }
+            };
+            let prfd_key = prf(&prf_key, format!("{:?}", key))?;
+            let hashed = hash_with_counter_and_nonce(prfd_key, counter, &nonce)?;
+            json.insert(hashed, encrypted);
+        }
+    }
+    info!("Real {} vs Fake {}", real, fake);
+    let insert = SdaRequest {
+        level: response.sda_level,
+        nonce,
+        db: json,
+        btree: encrypt(&enc_key, subtree.to_string())?,
+    };
+    let client = reqwest::blocking::Client::builder()
+        .timeout(std::time::Duration::from_secs(30))
+        .build()?;
+    let resp = client
+        .put(Url::parse(&address)?.join("sda")?)
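+        // An occupied level spills into the next one, so repeated inserts
+        // cascade like carries in a binary counter until a free level is found.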
+        .json(&insert)
+        .send()?
+        .json::<InsertResponse>()?;
+    info!("Writing to sda level {}", response.sda_level);
+    if resp.sda {
+        sda(resp, metadata, address)
+    } else {
+        Ok(())
+    }
+}
+
+pub fn delete(key: String, address: String) -> Result<()> {
+    let base_url: Url = Url::parse(&address)?;
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let metadata = get_metadata(base_url.clone(), &enc_key)?;
+    let key_int: isize = key.parse()?;
+    let prfd_key = prf(&prf_key, format!("{:?}", (key_int, key_int)))?;
+    let mut plaintext_iter = iter::repeat("a")
+        .take(metadata.row_length as usize)
+        .collect::<Vec<&str>>();
+    plaintext_iter[metadata.index as usize] = &key;
+    let plaintext = plaintext_iter.join(",");
+    let stree = brc_tree::from_sorted_list(indexmap! {
+        key.parse::<isize>()? => vec![(plaintext.clone(), true)]
+    });
+    info!("stree {}", stree);
+    let btree = encrypt(&enc_key, stree.to_string())?;
+    let mut db: HashMap<String, String> = HashMap::new();
+    let nonce = generate_nonce_string();
+    db.insert(
+        hash_with_counter_and_nonce(prfd_key, 0, &nonce)?,
+        encrypt_with_action_and_validity(&enc_key, plaintext, false, true)?,
+    );
+    let delete_request = InsertRequest { nonce, db, btree };
+    info!(
+        "Deleting key {}, with request data {:?}",
+        key, delete_request
+    );
+    let resp = reqwest::blocking::Client::new()
+        .put(base_url.join("key")?)
+        .json(&delete_request)
+        .send()?
+        .json::<InsertResponse>()?;
+    debug!("Got response {:?}", resp);
+    sda(resp, metadata, address)
+}
diff --git a/client-with-insert/src/main.rs b/client-with-insert/src/main.rs
new file mode 100644
index 0000000000000000000000000000000000000000..7b6a0156e05fb03c5070898704cf8924bb3ee12a
--- /dev/null
+++ b/client-with-insert/src/main.rs
@@ -0,0 +1,61 @@
+mod brc;
+mod commandline;
+mod initialize;
+mod insert;
+mod search;
+
+use anyhow::Result;
+use commandline::{Cli, Commands};
+use env_logger::Builder;
+use initialize::init;
+use insert::{delete, insert};
+use log::{debug, LevelFilter};
+
+use search::search;
+
+fn main() -> Result<()> {
+    let cli = <Cli as clap::Parser>::parse();
+
+    // initialize logging
+    let max_log_level = match &cli.verbose {
+        true => LevelFilter::Trace,
+        false => LevelFilter::Info,
+    };
+    Builder::new().filter(None, max_log_level).init();
+    debug!("Parsing subcommand");
+    match &cli.command {
+        Commands::Init {
+            filename_option,
+            key_option,
+            index_option,
+        } => {
+            init(
+                filename_option.clone(),
+                key_option.clone(),
+                *index_option,
+                cli.address.clone(),
+            )?;
+        }
+        Commands::Search {
+            min,
+            max,
+            urf,
+            output_path,
+        } => {
+            search(
+                *min,
+                *max,
+                cli.address.clone(),
+                *urf,
+                output_path.to_owned(),
+            )?;
+        }
+        Commands::Insert { value } => {
+            insert(value.to_owned(), cli.address.clone())?;
+        }
+        Commands::Delete { key } => {
+            delete(key.to_owned(), cli.address.clone())?;
+        }
+    }
+    Ok(())
+}
diff --git a/client-with-insert/src/search.rs b/client-with-insert/src/search.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0933cb1d793e70783d1cf2cb5f285096e8ebaa4b
--- /dev/null
+++ b/client-with-insert/src/search.rs
@@ -0,0 +1,156 @@
+use std::collections::HashMap;
+use std::io::prelude::*;
+
+use crate::initialize::MetaData;
+use anyhow::{anyhow, Result};
+
+use crypto_utils::chacha::decrypt_with_action_and_validity;
+use crypto_utils::chacha::rand::SeedableRng;
+use log::info;
+use url::Url;
+
+use crypto_utils::chacha::decrypt;
+use
crypto_utils::chacha::rand;
+use crypto_utils::chacha::ChaCha20Rng;
+
+use crypto_utils::hash::prf;
+use crypto_utils::keys::load_from_file;
+use rand::seq::SliceRandom;
+
+use crate::brc::brc_tree;
+use crate::brc::brc_tree::Subtree;
+
+/// Runs a range query for `[min, max]` and writes the matching rows to `output_path`.
+pub fn search(
+    min: isize,
+    max: isize,
+    address: String,
+    urf: bool,
+    output_path: String,
+) -> Result<()> {
+    info!("Searching for range between {} and {}", min, max);
+    let enc_key = load_from_file("enc.key")?;
+    let prf_key = load_from_file("prf.key")?;
+    let mut burl = Url::parse(&address)?;
+    let metadata = get_metadata(burl.clone(), &enc_key)?;
+    let btrees = get_btrees(burl.clone(), &enc_key)?;
+    let mut covers = Vec::new();
+    for subtree in btrees {
+        // load covers from the brc tree
+        let mut c = subtree.covers(min, max);
+        if c.is_empty() {
+            // ignore trees where the range is out of bounds
+            continue;
+        }
+        if urf {
+            c.sort_by_key(|a| a.level());
+            c = brc_tree::brc_to_urc(c);
+        }
+        covers.extend(c);
+    }
+    info!(
+        "Best range covers with height: {:?}",
+        covers.iter().map(|x| x.range_level()).collect::<Vec<_>>()
+    );
+    let search_keys: Result<Vec<String>> = covers
+        .iter()
+        .map(|x| prf(&prf_key, format!("{:?}", x.range())))
+        .collect();
+    let mut keys = search_keys?;
+    // shuffle key order
+    keys.shuffle(&mut ChaCha20Rng::from_os_rng());
+    let mut fnl: Vec<(isize, String, bool, u64)> = Vec::new();
+    for key in keys {
+        info!("Searching for key {}", key);
+        burl.set_path(&format!("key/{}", key));
+        let resp = reqwest::blocking::get(burl.clone())?.json::<Vec<String>>()?;
+        for ciphertext in resp.iter() {
+            let decrypted = decrypt_with_action_and_validity(&enc_key, ciphertext.to_string())?;
+            if decrypted.2 != 0 {
+                let split: Vec<String> = decrypted
+                    .0
+                    .split(",")
+                    .map(|x| x.to_string())
+                    .collect::<Vec<String>>();
+                let parsed = split
+                    .get(metadata.index as usize)
+                    .ok_or(anyhow!("Index out of range"))?
+                    .parse::<isize>()?;
+                fnl.push((parsed, decrypted.0, decrypted.1, decrypted.2));
+            } else {
+                info!("Ignoring {:?}", decrypted);
+            }
+        }
+    }
+    // sort edits by timestamp so later operations win
+    fnl.sort_by(|a, b| a.3.cmp(&b.3));
+    fnl.dedup();
+    // replay adds and deletes in order; a delete removes every row under that key
+    let mut hm: HashMap<isize, Vec<String>> = HashMap::new();
+    for (key, value, add, _) in fnl {
+        if add {
+            hm.entry(key).or_default().push(value);
+        } else {
+            hm.remove(&key);
+        }
+    }
+    for value in hm.values_mut() {
+        value.sort();
+        value.dedup();
+    }
+    // write to csv file
+    let write_path = std::fs::OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(output_path)?;
+    let mut wtr = csv::Writer::from_writer(write_path);
+    wtr.write_record(&metadata.header)?;
+    let mut hm_keys: Vec<isize> = hm.keys().copied().collect();
+    hm_keys.sort();
+    for key in hm_keys {
+        for value in hm.get(&key).unwrap() {
+            wtr.write_record(value.split(","))?;
+        }
+    }
+    Ok(())
+}
+
+fn get_btrees(mut base_url: Url, key: &[u8]) -> Result<Vec<Subtree>> {
+    base_url.set_path("/btrees");
+    let resp = reqwest::blocking::get(base_url)?.json::<Vec<String>>()?;
+    let mut btrees = Vec::new();
+    for value in resp.iter() {
+        let decrypted = decrypt(key, value.to_string())?;
+        let subtree = brc_tree::parse_from_string(decrypted)?;
+        btrees.push(subtree);
+    }
+    Ok(btrees)
+}
+
+#[test]
+fn test_get_btrees() {
+    let base_url = Url::parse("http://localhost:8080").unwrap();
+    let enc_key = load_from_file("enc.key").unwrap();
+    let btrees = get_btrees(base_url, &enc_key).unwrap();
+    println!("{:?}", btrees);
+}
+
+pub fn get_metadata(mut base_url: Url, key: &[u8]) -> Result<MetaData> {
+    base_url.set_path("/metadata");
+    let mut buf = String::new();
+    let _ = reqwest::blocking::get(base_url)?.read_to_string(&mut buf)?;
+    let decrypted = decrypt(key, buf)?;
+    Ok(serde_json::from_str(&decrypted)?)
+} + +#[test] +fn test_get_metadata() { + let base_url = Url::parse("http://localhost:8080").unwrap(); + let enc_key = load_from_file("enc.key").unwrap(); + let meta = get_metadata(base_url, &enc_key).unwrap(); + println!("{:?}", meta); +} diff --git a/crypto-utils/Cargo.toml b/crypto-utils/Cargo.toml index cfb7ecbfd44e2f3654367237534fdac289a27ac2..4a3519b242973c1f287a47896ac27018966ad9d2 100644 --- a/crypto-utils/Cargo.toml +++ b/crypto-utils/Cargo.toml @@ -10,7 +10,9 @@ chacha20poly1305 = "0.10.1" hex = "0.4.3" hmac = "0.12.1" log = "0.4.25" +once_cell = "1.21.0" rand = "0.9.0" rand_chacha = "0.9.0" +regex = "1.11.1" sha2 = "0.10.8" tiger = "0.2.1" diff --git a/crypto-utils/src/lib.rs b/crypto-utils/src/lib.rs index 8aa4e987eda7ebe70bc7681c545725bce84ca9fa..a8f93d6570742bb16fb052632e47a12e2536ddf8 100644 --- a/crypto-utils/src/lib.rs +++ b/crypto-utils/src/lib.rs @@ -1,5 +1,6 @@ pub mod chacha { use core::str; + use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Result}; use chacha20poly1305::{ @@ -7,15 +8,20 @@ pub mod chacha { AeadCore, KeyInit, XChaCha20Poly1305, XNonce, }; + use once_cell::sync::Lazy; pub use rand; pub use rand_chacha::ChaCha20Rng; - use base64::prelude::*; - use log::debug; + pub use base64::prelude::*; + use regex::Regex; + + pub fn generate_nonce_string() -> String { + BASE64_STANDARD.encode(XChaCha20Poly1305::generate_nonce(&mut OsRng)) + } pub fn encrypt(key: &[u8], plaintext: String) -> Result<String> { - let cipher = XChaCha20Poly1305::new_from_slice(key)?; + let cipher = Lazy::new(|| XChaCha20Poly1305::new_from_slice(key).unwrap()); let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); let ciphertext = match cipher.encrypt(&nonce, plaintext.as_bytes()) { Ok(v) => Ok(v), @@ -24,11 +30,66 @@ pub mod chacha { Ok(BASE64_STANDARD.encode(nonce) + &BASE64_STANDARD.encode(ciphertext)) } + pub fn encrypt_with_action_and_validity( + key: &[u8], + plaintext: String, + action: bool, + validity: bool, + ) -> Result<String> { + let time_as_seconds = match validity { + true => SystemTime::now().duration_since(SystemTime::UNIX_EPOCH), + false => Ok(Duration::from_secs(0)), + }? + .as_secs(); + let encrypt_string = format!("({}:{}:{})", plaintext, action as i64, time_as_seconds); + encrypt(key, encrypt_string) + } + + pub fn encrypt_with_action_and_time( + key: &[u8], + plaintext: String, + action: bool, + time: u64, + ) -> Result<String> { + let encrypt_string = format!("({}:{}:{})", plaintext, action as i64, time); + encrypt(key, encrypt_string) + } + + pub fn decrypt_with_action_and_validity( + key: &[u8], + ciphertext: String, + ) -> Result<(String, bool, u64)> { + let plaintext_with_action = decrypt(key, ciphertext)?; + static RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\((.+):(\d):(\d+)\)").unwrap()); + let (plaintext, action, validity) = match RE.captures(&plaintext_with_action) { + Some(caps) => Ok(( + caps.get(1).map_or("", |m| m.as_str()).to_string(), + caps.get(2).map_or("", |m| m.as_str()).parse::<i64>()? 
!= 0, + caps.get(3).map_or("", |m| m.as_str()).parse::<u64>()?, + )), + None => Err(anyhow!("No match found")), + }?; + Ok((plaintext, action, validity)) + } + + #[test] + fn test_encrypt_decrypt_with_action_and_validity() { + let plaintext = "hello".to_string(); + let action = true; + let validity = false; + let key = XChaCha20Poly1305::generate_key(&mut OsRng).to_vec(); + let ciphertext = + encrypt_with_action_and_validity(&key, plaintext.clone(), action, validity).unwrap(); + let (decrypted, decrypted_action, decrypted_validity) = + decrypt_with_action_and_validity(&key, ciphertext).unwrap(); + assert_eq!(plaintext, decrypted, "Plaintexts do not match"); + assert_eq!(action, decrypted_action, "Actions do not match"); + assert_eq!(0, decrypted_validity, "Validities do not match"); + } + pub fn decrypt(key: &[u8], ciphertext: String) -> Result<String> { - let cipher = XChaCha20Poly1305::new_from_slice(key)?; + let cipher = Lazy::new(|| XChaCha20Poly1305::new_from_slice(key).unwrap()); let cipher_bytes = BASE64_STANDARD.decode(ciphertext.clone())?; - let nonce_string = &cipher_bytes[0..24]; - debug!("{}, {:?}", nonce_string.len(), nonce_string); let nonce = XNonce::from_iter(cipher_bytes[0..24].iter().cloned()); let remaining = &cipher_bytes[24..]; let plaintext = match cipher.decrypt(&nonce, remaining) { @@ -62,6 +123,15 @@ pub mod hash { pub fn hash_with_counter(plaintext: String, counter: usize) -> Result<String> { hash(format!("{}{}", plaintext, counter)) } + + pub fn hash_with_counter_and_nonce( + plaintext: String, + counter: usize, + nonce: &String, + ) -> Result<String> { + // let nonce = + hash(format!("{}{}{}", nonce, plaintext, counter)) + } } pub mod keys { use anyhow::Result; diff --git a/server-distributed/Cargo.toml b/server-distributed/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..731bdf600436b8c7e60c4b36f9ef9a181aec4c46 --- /dev/null +++ b/server-distributed/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "server-distributed" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.8.1" +clap = { version = "4.5.32", features = ["derive"] } +crypto-utils = { path = "../crypto-utils" } +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.140" +tokio = { version = "1.44.0", features = ["full", "tracing"] } +tower-http = { version = "0.6.2", features = ["trace"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/server-distributed/src/cli.rs b/server-distributed/src/cli.rs new file mode 100644 index 0000000000000000000000000000000000000000..30b569a66bad185efb802936cefaab680efb5593 --- /dev/null +++ b/server-distributed/src/cli.rs @@ -0,0 +1,11 @@ +use clap::Parser; + +#[derive(Parser)] +#[command(version, about, long_about=None)] +pub struct Cli { + #[clap(global = true, default_value_t = 8080)] + pub port: u16, + + #[clap(global = true, default_value_t = 0)] + pub index: usize, +} diff --git a/server-distributed/src/main.rs b/server-distributed/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..d11abce1bac09e1954ff02cf8e131d154a5e2c95 --- /dev/null +++ b/server-distributed/src/main.rs @@ -0,0 +1,403 @@ +use axum::{ + extract::{self, DefaultBodyLimit, Json, Path}, + http::StatusCode, + routing::{get, post, put}, + Router, +}; +use crypto_utils::{ + chacha::{ + rand::{seq::SliceRandom, SeedableRng}, + ChaCha20Rng, + }, + hash::hash_with_counter_and_nonce, +}; +mod cli; +use cli::Cli; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::{ + 
collections::HashMap,
+    fs::{self, OpenOptions},
+    io::Write,
+};
+use tower_http::trace::{self, TraceLayer};
+use tracing::{info, Level};
+
+#[derive(Serialize, Deserialize)]
+struct ED {
+    db: HashMap<String, String>,
+}
+
+#[derive(Serialize, Deserialize)]
+struct Init {
+    level: usize,
+    db: HashMap<String, String>,
+    nonce: String,
+    metadata: String,
+    btree: String,
+}
+
+#[derive(Clone)]
+pub struct ServerConfig {
+    pub port: u16,
+    pub index: usize,
+}
+
+#[tokio::main]
+async fn main() {
+    let cli = <Cli as clap::Parser>::parse();
+
+    let config = ServerConfig {
+        port: cli.port,
+        index: cli.index,
+    };
+
+    tracing_subscriber::fmt()
+        .with_target(false)
+        .compact()
+        .init();
+
+    let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", config.port))
+        .await
+        .unwrap();
+    info!("listening on {}", listener.local_addr().unwrap());
+
+    let app = Router::new()
+        .route("/", post(init))
+        .route("/key/{query}", get(search))
+        .route("/key", put(insert))
+        .route("/sda", put(sda))
+        .route("/sda/{level}", get(get_sda))
+        .route("/btrees", get(get_btrees))
+        .route("/metadata", post(init_metadata))
+        .route("/metadata", get(get_metadata))
+        .layer(DefaultBodyLimit::disable())
+        .layer(
+            TraceLayer::new_for_http()
+                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
+                .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
+        )
+        .with_state(config);
+    axum::serve(listener, app).await.unwrap();
+}
+
+async fn init_metadata(
+    extract::State(config): extract::State<ServerConfig>,
+    payload: String,
+) -> Result<(), StatusCode> {
+    let file_name = format!("metadata_{}", config.index);
+    match fs::write(file_name, payload) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }
+}
+
+/// Initializes the database from a json payload.
+///
+/// Dumps the contents into `sda_{index}_{level}.json`, along with matching
+/// `metadata_{index}`, `btree_{index}_{level}`, and `nonce_{index}_{level}` files.
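+///
+/// A sketch of the expected payload; field names follow the `Init` struct,
+/// while the values here are illustrative placeholders, not real ciphertexts:
+///
+/// ```json
+/// {
+///     "level": 3,
+///     "db": { "<hashed key>": "<ciphertext>" },
+///     "nonce": "<base64 nonce>",
+///     "metadata": "<encrypted metadata>",
+///     "btree": "<encrypted btree>"
+/// }
+/// ```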
+async fn init(
+    extract::State(config): extract::State<ServerConfig>,
+    extract::Json(payload): extract::Json<Init>,
+) -> Result<(), StatusCode> {
+    let file_name = format!("sda_{}_{}.json", config.index, payload.level);
+    match fs::write(file_name, json!(payload.db).to_string()) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let file_name = format!("metadata_{}", config.index);
+    match fs::write(file_name, payload.metadata) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let file_name = format!("btree_{}_{}", config.index, payload.level);
+    match fs::write(file_name, payload.btree) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let nonce_name = format!("nonce_{}_{}", config.index, payload.level);
+    match fs::write(nonce_name, payload.nonce) {
+        Ok(_) => Ok(()),
+        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
+    }
+}
+async fn get_btrees(
+    extract::State(config): extract::State<ServerConfig>,
+) -> Result<Json<Value>, StatusCode> {
+    let mut ret_vec: Vec<String> = Vec::new();
+    for i in 0..64 {
+        let file_name = format!("btree_{}_{}", config.index, i);
+        // check if file exists
+        if !std::path::Path::new(&file_name).exists() {
+            continue;
+        }
+        let data = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        ret_vec.push(data);
+    }
+    // shuffle vector
+    ret_vec.shuffle(&mut crypto_utils::chacha::ChaCha20Rng::from_os_rng());
+    Ok(Json(json!(ret_vec)))
+}
+
+#[derive(Serialize, Deserialize)]
+struct Sda {
+    level: usize,
+    db: HashMap<String, String>,
+    nonce: String,
+    btree: String,
+}
+
+async fn get_sda(
+    extract::Path(level): extract::Path<usize>,
+    extract::State(config): extract::State<ServerConfig>,
+) -> Result<Json<bool>, StatusCode> {
+    // a level is free exactly when its bin file does not exist yet
+    let file_name = format!("sda_{}_{}.json", config.index, level);
+    let r = !std::path::Path::new(&file_name).exists();
+    Ok(Json(r))
+}
+async fn sda(
+    extract::State(config): extract::State<ServerConfig>,
+    extract::Json(payload): extract::Json<Sda>,
+) -> Result<Json<InsertResults>, StatusCode> {
+    let sda_path = format!("sda_{}_{}.json", config.index, payload.level);
+    let btree_path = format!("btree_{}_{}", config.index, payload.level);
+    let nonce_path = format!("nonce_{}_{}", config.index, payload.level);
+    let sda_exists = std::path::Path::new(&sda_path).exists();
+    // if that bin doesn't already exist, we create it
+    if !sda_exists {
+        // write payload
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&sda_path)
+            .unwrap();
+        if let Err(x) = writer.write_all(json!(payload.db).to_string().as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        // write btree
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(btree_path)
+            .unwrap();
+        if let Err(x) = writer.write_all(payload.btree.as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(nonce_path)
+            .unwrap();
+        if let Err(x) = writer.write_all(payload.nonce.as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        return Ok(Json(InsertResults {
+            sda: false,
+            values: vec![],
sda_level: payload.level,
+        }));
+    }
+    // the bin is occupied: hand everything back so the client can rebuild
+    // it into the next level
+    let data_string = match fs::read_to_string(&sda_path) {
+        Ok(x) => x,
+        Err(x) => {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+    let mut sda_existing_value = serde_json::from_str::<HashMap<String, String>>(&data_string)
+        .unwrap()
+        .values()
+        .map(|x| x.to_owned())
+        .collect::<Vec<String>>();
+    // extend with payload
+    sda_existing_value.extend(payload.db.values().map(|x| x.to_owned()));
+    sda_existing_value.shuffle(&mut ChaCha20Rng::from_os_rng());
+    // remove existing files
+    fs::remove_file(&sda_path).unwrap();
+    fs::remove_file(&btree_path).unwrap();
+    fs::remove_file(&nonce_path).unwrap();
+    let response = Json(InsertResults {
+        sda: true,
+        values: sda_existing_value,
+        sda_level: payload.level + 1,
+    });
+    Ok(response)
+}
+async fn get_metadata(
+    extract::State(config): extract::State<ServerConfig>,
+) -> Result<String, StatusCode> {
+    let data = match fs::read_to_string(format!("metadata_{}", config.index)) {
+        Ok(x) => x,
+        Err(x) => {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+    Ok(data)
+}
+#[derive(Serialize, Deserialize, Debug)]
+struct InsertResults {
+    sda: bool,
+    values: Vec<String>,
+    sda_level: usize,
+}
+
+#[derive(Serialize, Deserialize)]
+struct InsertRequest {
+    db: HashMap<String, String>,
+    btree: String,
+    nonce: String,
+}
+async fn insert(
+    extract::State(config): extract::State<ServerConfig>,
+    extract::Json(payload): extract::Json<InsertRequest>,
+) -> Result<Json<InsertResults>, StatusCode> {
+    let sda0_path = format!("sda_{}_0.json", config.index);
+    let btree0_path = format!("btree_{}_0", config.index);
+    let nonce0_path = format!("nonce_{}_0", config.index);
+    let sda_0_exists = std::path::Path::new(&sda0_path).exists();
+    // need to update sda
+    if sda_0_exists {
+        let data_string = fs::read_to_string(&sda0_path).unwrap_or_default();
+        let mut sda_0_existing_value =
+            serde_json::from_str::<HashMap<String, String>>(&data_string)
+                .unwrap()
+                .values()
+                .map(|x| x.to_owned())
+                .collect::<Vec<String>>();
+        // payload has only one value
+        sda_0_existing_value.push(payload.db.values().next().unwrap().to_owned());
+        sda_0_existing_value.shuffle(&mut ChaCha20Rng::from_os_rng());
+        let respond_json = Json(InsertResults {
+            sda: true,
+            values: sda_0_existing_value,
+            sda_level: 1,
+        });
+        // delete files
+        fs::remove_file(sda0_path).unwrap();
+        fs::remove_file(btree0_path).unwrap();
+        fs::remove_file(nonce0_path).unwrap();
+        return Ok(respond_json);
+    }
+    // write database
+    let mut writer = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(&sda0_path)
+        .unwrap();
+    writer
+        .write_all(json!(payload.db).to_string().as_bytes())
+        .unwrap();
+    // write btree
+    let mut writer = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(btree0_path)
+        .unwrap();
+    writer.write_all(payload.btree.as_bytes()).unwrap();
+    let mut writer = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(nonce0_path)
+        .unwrap();
+    writer.write_all(payload.nonce.as_bytes()).unwrap();
+    Ok(Json(InsertResults {
+        sda: false,
+        values: vec![],
+        sda_level: 0,
+    }))
+}
+
+/// Searches for database entries matching the `key` path parameter.
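+/// The same key may be live in several SDA bins at once, so every
+/// `sda_{index}_{level}.json` file on this server is scanned.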
+///
+/// Hashes the key with a counter until no more results are found.
+///
+/// Returns a json array of the matching ciphertexts:
+///
+/// ```json
+/// ["ciphertext0", "ciphertext1"]
+/// ```
+async fn search(
+    Path(key): Path<String>,
+    extract::State(config): extract::State<ServerConfig>,
+) -> Result<Json<Value>, StatusCode> {
+    // search looks through all sda bins,
+    // since a given key may be live in several bins at once
+    let mut ret_vec: Vec<String> = Vec::new();
+    for i in 0..64 {
+        let file_name = format!("sda_{}_{}.json", config.index, i);
+        // check if file exists
+        if !std::path::Path::new(&file_name).exists() {
+            continue;
+        }
+        let data = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        let file_name = format!("nonce_{}_{}", config.index, i);
+        let nonce = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        let db: HashMap<String, String> = serde_json::from_str(&data).unwrap();
+        let mut n = 0;
+        while let Some(ciphertext) =
+            db.get(&hash_with_counter_and_nonce(key.clone(), n, &nonce).unwrap())
+        {
+            info!("Found {}({}) in sda level {} ", key, n, i);
+            ret_vec.push(ciphertext.to_owned());
+            n += 1;
+        }
+    }
+    if !ret_vec.is_empty() {
+        return Ok(Json(json!(ret_vec)));
+    }
+    Err(StatusCode::NOT_FOUND)
+}
diff --git a/server-with-insert/Cargo.toml b/server-with-insert/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..849762ed32ad3dec164a6064dbaded74f650073a
--- /dev/null
+++ b/server-with-insert/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "server-with-insert"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+axum = "0.8.1"
+crypto-utils = { path = "../crypto-utils" }
+serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
+tokio = { version = "1.44.0", features = ["full", "tracing"] }
+tower-http = { version = "0.6.2", features = ["trace"] }
+tracing = "0.1.41"
+tracing-subscriber = "0.3.19"
diff --git a/server-with-insert/src/main.rs b/server-with-insert/src/main.rs
new file mode 100644
index 0000000000000000000000000000000000000000..93e41d50ef3afbaea18116c48ac64626a8d42659
--- /dev/null
+++ b/server-with-insert/src/main.rs
@@ -0,0 +1,344 @@
+use axum::{
+    extract::{self, DefaultBodyLimit, Json, Path},
+    http::StatusCode,
+    routing::{get, post, put},
+    Router,
+};
+use crypto_utils::{
+    chacha::{
+        rand::{seq::SliceRandom, SeedableRng},
+        ChaCha20Rng,
+    },
+    hash::hash_with_counter_and_nonce,
+};
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use std::{
+    collections::HashMap,
+    fs::{self, OpenOptions},
+    io::Write,
+};
+use tower_http::trace::{self, TraceLayer};
+use tracing::{info, Level};
+
+#[derive(Serialize, Deserialize)]
+struct ED {
+    db: HashMap<String, String>,
+}
+
+#[derive(Serialize, Deserialize)]
+struct Init {
+    level: usize,
+    db: HashMap<String, String>,
+    nonce: String,
+    metadata: String,
+    btree: String,
+}
+
+#[tokio::main]
+async fn main() {
+    tracing_subscriber::fmt()
+        .with_target(false)
+        .compact()
+        .init();
+
+    let app = Router::new()
+        .route("/", post(init))
+        .route("/key/{query}", get(search))
+        .route("/key", put(insert))
+        .route("/sda", put(sda))
+        .route("/btrees", get(get_btrees))
+        .route("/metadata", get(get_metadata))
+        .layer(DefaultBodyLimit::disable())
+        .layer(
+            TraceLayer::new_for_http()
+                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
+        );
+
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
+        .await
+        .unwrap();
+    info!("listening on {}", listener.local_addr().unwrap());
+    axum::serve(listener, app).await.unwrap();
+}
+
+/// Initializes the database from a json payload.
+///
+/// Dumps the contents into `sda_{level}.json`.
+async fn init(extract::Json(payload): extract::Json<Init>) -> Result<(), StatusCode> {
+    let file_name = format!("sda_{}.json", payload.level);
+    match fs::write(file_name, json!(payload.db).to_string()) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let file_name = "metadata";
+    match fs::write(file_name, payload.metadata) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let file_name = format!("btree_{}", payload.level);
+    match fs::write(file_name, payload.btree) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }?;
+    let nonce_name = format!("nonce_{}", payload.level);
+    match fs::write(nonce_name, payload.nonce) {
+        Ok(_) => Ok(()),
+        Err(x) => {
+            println!("{:#?}", x);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }
+}
+async fn get_btrees() -> Result<Json<Value>, StatusCode> {
+    let mut ret_vec: Vec<String> = Vec::new();
+    for i in 0..64 {
+        let file_name = format!("btree_{}", i);
+        // check if file exists
+        if !std::path::Path::new(&file_name).exists() {
+            continue;
+        }
+        let data = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        ret_vec.push(data);
+    }
+    // shuffle vector
+    ret_vec.shuffle(&mut crypto_utils::chacha::ChaCha20Rng::from_os_rng());
+    info!(
+        "Returning btrees {:?} with length {}",
+        ret_vec,
+        ret_vec.len()
+    );
+    Ok(Json(json!(ret_vec)))
+}
+
+#[derive(Serialize, Deserialize)]
+struct Sda {
+    level: usize,
+    db: HashMap<String, String>,
+    nonce: String,
+    btree: String,
+}
+async fn sda(
+    extract::Json(payload): extract::Json<Sda>,
+) -> Result<Json<InsertResults>, StatusCode> {
+    let sda_exists = std::path::Path::new(&format!("sda_{}.json", payload.level)).exists();
+    // if that bin doesn't already exist, we create it
+    if !sda_exists {
+        info!("sda_{} does not exist", payload.level);
+        // write payload
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(format!("sda_{}.json", payload.level))
+            .unwrap();
+        if let Err(x) = writer.write_all(json!(payload.db).to_string().as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        // write btree
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(format!("btree_{}", payload.level))
+            .unwrap();
+        if let Err(x) = writer.write_all(payload.btree.as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        let mut writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(format!("nonce_{}", payload.level))
+            .unwrap();
+        if let Err(x) = writer.write_all(payload.nonce.as_bytes()) {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        };
+        return Ok(Json(InsertResults {
+            sda: false,
+            values: vec![],
+            sda_level: payload.level,
+        }));
+    }
+    info!("sda_{} exists", payload.level);
+    // the bin is occupied, so merge its contents with the incoming payload
+    let data_string = match
fs::read_to_string(format!("sda_{}.json", payload.level)) {
+        Ok(x) => x,
+        Err(x) => {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+    let mut sda_existing_value = serde_json::from_str::<HashMap<String, String>>(&data_string)
+        .unwrap()
+        .values()
+        .map(|x| x.to_owned())
+        .collect::<Vec<String>>();
+    // extend with payload
+    sda_existing_value.extend(payload.db.values().map(|x| x.to_owned()));
+    sda_existing_value.shuffle(&mut ChaCha20Rng::from_os_rng());
+    // remove existing files
+    fs::remove_file(format!("sda_{}.json", payload.level)).unwrap();
+    fs::remove_file(format!("btree_{}", payload.level)).unwrap();
+    fs::remove_file(format!("nonce_{}", payload.level)).unwrap();
+    let response = Json(InsertResults {
+        sda: true,
+        values: sda_existing_value,
+        sda_level: payload.level + 1,
+    });
+    info!(
+        "Requesting new sda with request {:?} and level {}",
+        response,
+        payload.level + 1
+    );
+    Ok(response)
+}
+async fn get_metadata() -> Result<String, StatusCode> {
+    let data = match fs::read_to_string("metadata") {
+        Ok(x) => x,
+        Err(x) => {
+            info!("{:#?}", x);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+    Ok(data)
+}
+#[derive(Serialize, Deserialize, Debug)]
+struct InsertResults {
+    sda: bool,
+    values: Vec<String>,
+    sda_level: usize,
+}
+
+#[derive(Serialize, Deserialize)]
+struct InsertRequest {
+    db: HashMap<String, String>,
+    btree: String,
+}
+async fn insert(
+    extract::Json(payload): extract::Json<InsertRequest>,
+) -> Result<Json<InsertResults>, StatusCode> {
+    let sda_0_exists = std::path::Path::new("sda_0.json").exists();
+    // need to update sda
+    if sda_0_exists {
+        let data_string = fs::read_to_string("sda_0.json").unwrap_or_default();
+        let mut sda_0_existing_value =
+            serde_json::from_str::<HashMap<String, String>>(&data_string)
+                .unwrap()
+                .values()
+                .map(|x| x.to_owned())
+                .collect::<Vec<String>>();
+        // payload has only one value
+        sda_0_existing_value.push(payload.db.values().next().unwrap().to_owned());
+        sda_0_existing_value.shuffle(&mut ChaCha20Rng::from_os_rng());
+        let respond_json = Json(InsertResults {
+            sda: true,
+            values: sda_0_existing_value,
+            sda_level: 1,
+        });
+        info!("Response json = {:?}", respond_json);
+        // delete files
+        fs::remove_file("sda_0.json").unwrap();
+        fs::remove_file("btree_0").unwrap();
+        return Ok(respond_json);
+    }
+    // write database
+    let mut writer = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open("sda_0.json")
+        .unwrap();
+    writer
+        .write_all(json!(payload.db).to_string().as_bytes())
+        .unwrap();
+    // write btree
+    let mut writer = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open("btree_0")
+        .unwrap();
+    writer.write_all(payload.btree.as_bytes()).unwrap();
+    Ok(Json(InsertResults {
+        sda: false,
+        values: vec![],
+        sda_level: 0,
+    }))
+}
+
+/// Searches for database entries matching the `key` path parameter.
+///
+/// Hashes the key with a counter until no more results are found.
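+/// The per-level nonce is mixed into every hash, so the same key maps to
+/// different buckets in each bin.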
+///
+/// Returns a json array of the matching ciphertexts:
+///
+/// ```json
+/// ["ciphertext0", "ciphertext1"]
+/// ```
+async fn search(Path(key): Path<String>) -> Result<Json<Value>, StatusCode> {
+    // search looks through all sda bins,
+    // since a given key may be live in several bins at once
+    let mut ret_vec: Vec<String> = Vec::new();
+    for i in 0..64 {
+        let file_name = format!("sda_{}.json", i);
+        // check if file exists
+        if !std::path::Path::new(&file_name).exists() {
+            continue;
+        }
+        let data = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        let file_name = format!("nonce_{}", i);
+        let nonce = match fs::read_to_string(file_name) {
+            Ok(x) => x,
+            Err(x) => {
+                info!("{:#?} on loop {}", x, i);
+                continue;
+            }
+        };
+        let db: HashMap<String, String> = serde_json::from_str(&data).unwrap();
+        let mut n = 0;
+        while let Some(ciphertext) =
+            db.get(&hash_with_counter_and_nonce(key.clone(), n, &nonce).unwrap())
+        {
+            info!("Found {}({}) in sda level {} ", key, n, i);
+            ret_vec.push(ciphertext.to_owned());
+            n += 1;
+        }
+    }
+    if !ret_vec.is_empty() {
+        return Ok(Json(json!(ret_vec)));
+    }
+    Err(StatusCode::NOT_FOUND)
+}
diff --git a/server/src/main.rs b/server/src/main.rs index 85f0c5c061200cd9d27fe5dadaca7d7182a42221..de73b74f4427d75c5b55f37d77065e692bbbed0c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -65,10 +65,7 @@ async fn init(Json(payload): Json<Value>) -> Result<(), StatusCode> { /// ``` async fn search(Path(key): Path<String>) -> Result<Json<Value>, StatusCode> { // Read the database from db.json - let data_string = match fs::read_to_string("./db.json") { - Ok(x) => x, - Err(_) => String::from(""), - }; + let data_string = fs::read_to_string("./db.json").unwrap_or_default(); // Parse the database let ed: ED = ED { db: { diff --git a/src/pools.rs b/src/pools.rs new file mode 100644 index 0000000000000000000000000000000000000000..f245e15a403230710cdd77bb63c61300d225474e --- /dev/null +++ b/src/pools.rs @@ -0,0 +1,3 @@ + Ok(first_metadata.clone()) +pub async fn metadata(server_list: Vec<Url>) -> Result<MetaData> { +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] \ No newline at end of file diff --git a/static/generate_files.sh b/static/generate_files.sh index 514ef3842af58d4b943ba04f95c0c110c44b3cd4..ac56e5ce0fbd3bde0a4f73bc19aeb82408eb0a56 100755 --- a/static/generate_files.sh +++ b/static/generate_files.sh @@ -6,5 +6,10 @@ source venv/bin/activate pip3 install faker -# generate files with 2**20 elements -python3 ../utils/generator.py \ No newline at end of file +python3 generator.py 0 1024 +python3 generator.py 1 8192 +python3 generator.py 2 16384 +python3 generator.py 3 65536 +python3 generator.py 4 131072 +python3 generator.py 5 524288 +python3 generator.py 6 1048576 diff --git a/utils/generator.py b/utils/generator.py index 954da20fa54d64b8ef38d81d43db0a33356e2027..ef0522cba44ece7c578a102a9e9b1dc6fa054a2f 100644 --- a/utils/generator.py +++ b/utils/generator.py @@ -1,52 +1,41 @@ import csv -import sqlite3 from faker import Faker ## TO USE: ## python3 generator.py <seed> <num_rows> + def generate(seed=0, num_rows=2**20): fake = Faker() Faker.seed(seed) - col_headers = ["Name", "Email", "Country", "Age", "Salary", "Years of Experience"] - + col_headers = ["Name", "Email", "Country", "Age", "Salary", "Years of Experience"] + data = { - 'Name': [fake.name() for _ in range(num_rows)], - 'Email': [fake.email() for _ in range(num_rows)], - 'Country': [fake.country() for _ in
range(num_rows)], - 'Age': [fake.random_int(min=18, max=65) for _ in range(num_rows)], - 'Salary':[fake.random_int(min=37, max=150)*1000 for _ in range(num_rows)], - 'Years of Experience': [fake.random_int(min=0, max=15) for _ in range(num_rows)] + "Name": [fake.name() for _ in range(num_rows)], + "Email": [fake.email() for _ in range(num_rows)], + "Country": [fake.country() for _ in range(num_rows)], + "Age": [fake.random_int(min=18, max=65) for _ in range(num_rows)], + "Salary": [fake.random_int(min=37, max=150) * 1000 for _ in range(num_rows)], + "Years of Experience": [ + fake.random_int(min=0, max=15) for _ in range(num_rows) + ], } - with open("syn_data.csv", "w") as file: + with open(f"syn_data_{num_rows}.csv", "w") as file: writer = csv.DictWriter(file, fieldnames=col_headers) writer.writeheader() for row in zip(*data.values()): - writer.writerow(dict(zip(col_headers, row))) - - con = sqlite3.connect("syn_data.db") - cursor = con.cursor() - cursor.execute("CREATE TABLE IF NOT EXISTS employees (name, email, country, age, salary, yoe)") - - for row in zip(*data.values()): - # print(row) - cursor.execute("INSERT INTO employees (name, email, country, age, salary, yoe) VALUES (?, ?, ?, ?, ? ,?)", row) - - con.commit() - con.close() + writer.writerow(dict(zip(col_headers, row))) - -if __name__ == '__main__': +if __name__ == "__main__": from sys import argv if len(argv) == 3: generate(int(argv[1]), int(argv[2])) else: generate() - diff --git a/utils/test.py b/utils/test.py new file mode 100644 index 0000000000000000000000000000000000000000..d6ed1d95bd52d06a07d4686bc4a5cc827c4d445d --- /dev/null +++ b/utils/test.py @@ -0,0 +1,249 @@ +import csv +import glob +import json +import os +import subprocess +import time + +import matplotlib.pyplot as plt + + +def test_correctness(): + subprocess.run(["./client", "init", "syn_data_16384.csv", "--index", "4"]) + subprocess.run(["./client", "search", "31000", "55000"]) + # check whether the output is the same + expected = [] + with open("syn_data_16384.csv") as f: + reader = csv.DictReader(f) + for row in reader: + if 31000 <= int(row["Salary"]) <= 55000: + expected.append(",".join(row.values())) + expected = list(set(expected)) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expected + assert len(exists) == len(expected) + subprocess.run(["./client", "search", "31000", "55000", "-u"]) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expected + assert len(exists) == len(expected) + subprocess.run( + [ + "./client", + "insert", + "Russell Reynolds,crobinson@example.com,Armenia,64,45000,10", + ] + ) + expected.append("Russell Reynolds,crobinson@example.com,Armenia,64,45000,10") + expected = list(set(expected)) + subprocess.run(["./client", "search", "31000", "55000", "-u"]) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expected + with open("expected", "w") as f: + f.write("\n".join(expected)) + with open("exists", "w") as f: + f.write("\n".join(exists)) + assert len(exists) == len(expected), ( + f"{len(exists)} != {len(expected)}, {[x for x in exists if x not in expected]} + {[x for x in expected if x not in exists]}" + ) + time.sleep(1) + subprocess.run(["./client", "delete", "45000"]) + expected = [x 
for x in expected if x.split(",")[4] != "45000"] + subprocess.run(["./client", "search", "31000", "55000", "-u"]) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expected, f"{cur} not in expected" + assert len(exists) == len(expected), ( + f"{len(exists)} != {len(expected)}, {[x for x in exists if x not in expected]}" + ) + with open("syn_data_1024.csv") as f: + reader = csv.DictReader(f) + for row in reader: + ins_string = ",".join(row.values()) + if 31000 <= int(row["Salary"]) <= 55000: + expected.append(ins_string) + subprocess.run(["./client", "insert", ins_string]) + time.sleep(1) + subprocess.run(["./client", "search", "31000", "55000", "-u"]) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expected, f"{cur} not in expected" + assert len(exists) == len(expected) + + +# test_correctness() + + +def size_difference(): + files = [ + "syn_data_1024.csv", + "syn_data_8192.csv", + "syn_data_16384.csv", + "syn_data_65536.csv", + "syn_data_131072.csv", + "syn_data_524288.csv", + "syn_data_1048576.csv", + ] + sizes = {} + size_ratio = {} + times = {} + for file in files: + print(file) + start = time.perf_counter() + subprocess.run(["./client", "init", file, "--index", "5"]) + end = time.perf_counter() + newfile = glob.glob("../server/*.json")[0] + sizes[file] = os.path.getsize(newfile) + times[file] = end - start + size_ratio[file] = sizes[file] / os.path.getsize(file) + os.remove(newfile) + print(sizes) + print(size_ratio) + print(times) + + +# size_difference() +def update_times(): + results = {} + with open("syn_data_16384.csv", "r") as f: + count = 0 + reader = csv.DictReader(f) + for row in reader: + ins_string = ",".join(row.values()) + start = time.perf_counter() + subprocess.run(["./client", "insert", ins_string]) + end = time.perf_counter() + print(f"{count}: {end - start}") + results[count] = end - start + count += 1 + print(results) + with open("results.json", "w") as f: + json.dump(results, f) + + +def check(): + expect = [] + with open("syn_data_16384.csv", "r") as f: + reader = csv.DictReader(f) + for row in reader: + if 3 <= int(row["Years of Experience"]) <= 6: + expect.append(",".join(row.values())) + expect = list(set(expect)) + subprocess.run(["./client", "search", "3", "6", "-u"]) + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + assert cur in expect + assert len(exists) == len(expect), ( + f"{len(exists)} != {len(expect)}, {len([x for x in exists if x not in expect]) + len([x for x in expect if x not in exists])}" + ) + + +# check() +# +def visualize(): + with open("results.json", "r") as f: + results = json.load(f) + + plt.ylabel("Time (s)") + plt.plot(results.keys(), results.values()) + plt.show() + + +def visualize_brc(): + plt.title("BRC") + with open("results_brc.json", "r") as f: + results = json.load(f) + plt.ylabel("Time (s)") + plt.plot(results.keys(), results.values()) + plt.show() + + +# visualize_brc() + + +def average(): + with open("results.json", "r") as f: + results = json.load(f) + print(sum(results.values()) / len(results)) + + +def search_time(): + results_brc = {} + results_urc = {} + with open("syn_data_1048576.csv", "r") as f: + reader = csv.DictReader(f) + values = {i: [] for i in range(0, 16)} + for row in reader: + 
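# bucket rows by the last column (Years of Experience, 0-15) so the expected result set for each range query is known exactly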
ins_string = ",".join(row.values()) + values[int(list(row.values())[-1])].append(ins_string) + subprocess.run(["./client", "init", "syn_data_1048576.csv", "--index", "5"]) + for i in [0, 1, 3, 7, 15]: + expected = [] + for j in range(0, i + 1): + expected += values[j] + expected.sort() + start = time.perf_counter() + subprocess.run(["./client", "search", "0", f"{i}"]) + end = time.perf_counter() + results_brc[i] = end - start + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + exists.sort() + assert exists == expected, ( + f"{len(exists)} != {len(expected)}, {len([x for x in exists if x not in expected]) + len([x for x in expected if x not in exists])}" + ) + start = time.perf_counter() + subprocess.run(["./client", "search", "0", f"{i}", "-u"]) + end = time.perf_counter() + results_urc[i] = end - start + with open("out.csv", "r") as f: + reader = csv.DictReader(f) + exists = [] + for row in reader: + cur = ",".join(row.values()) + exists.append(cur) + exists.sort() + assert exists == expected, ( + f"{len(exists)} != {len(expected)}, {len([x for x in exists if x not in expected]) + len([x for x in expected if x not in exists])}" + ) + with open("results_brc.json", "w") as f: + json.dump(results_brc, f) + with open("results_urc.json", "w") as f: + json.dump(results_urc, f) + + +# search_time() +# average() +# visualize() +# update_times() +# size_difference()