Skip to content
This repository was archived by the owner on May 18, 2023. It is now read-only.

Commit fc403be

Browse files
committed
Implement better Bundlr abstractions
Add abstractions for extracting Bundlr transactions from bundles.
1 parent 779f56f commit fc403be

File tree

15 files changed

+640
-26
lines changed

15 files changed

+640
-26
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ wallet*.json
1010
/db/validator.db
1111
.history
1212
.devcontainer
13+
14+
# default cache folder for downloaded transactions
15+
/tx-cache

Cargo.lock

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ http = { version = "0.2.6", optional = true }
3131
httpdate = "1.0.2"
3232
jsonwebkey = "0.3.4"
3333
log = "0.4.17"
34+
num-bigint = { version = "0.4.3", features = ["serde"] }
35+
num-traits = "0.2.15"
3436
openssl = "0.10.40"
3537
reqwest = { version = "0.11.11", features = ["blocking", "json", "stream"], optional = true }
3638
serde = "1.0.132"
3739
serde_json = "1.0.73"
3840
sysinfo = "0.24.5"
3941
thiserror = "1.0"
4042
tokio = { version = "1.19", features = ["full"] }
43+
tokio-stream = { version = "0.1.10", features = ["fs"] }
4144
url = { version = "2.2.2" }
4245
urlencoding = "2.1.0"
4346

@@ -81,3 +84,7 @@ path = "src/bin/download_transaction.rs"
8184
[[bin]]
8285
name = "crawl-peers"
8386
path = "src/bin/crawl_peers.rs"
87+
88+
[[bin]]
89+
name = "bundle-tool"
90+
path = "src/bin/bundle_tool.rs"

src/bin/bundle_tool.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use std::{fmt, path::PathBuf, str::FromStr};
2+
3+
use clap::{Parser, Subcommand};
4+
5+
use env_logger::Env;
6+
use thiserror::Error;
7+
use tokio::{
8+
fs::{DirBuilder, File},
9+
io::AsyncWriteExt,
10+
};
11+
use validator::bundlr::bundle::{
12+
extract_transaction_details, get_bundled_transactions, read_transaction_data, TransactionId,
13+
};
14+
15+
#[derive(Debug, Error)]
16+
struct RangeParseError;
17+
18+
impl fmt::Display for RangeParseError {
19+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20+
f.write_str("RangeParseError")
21+
}
22+
}
23+
24+
struct Range {
25+
start: Option<usize>,
26+
end: Option<usize>,
27+
}
28+
29+
impl FromStr for Range {
30+
type Err = RangeParseError;
31+
32+
fn from_str(s: &str) -> Result<Self, Self::Err> {
33+
let s = s.trim();
34+
let s = if s.starts_with("(") { &s[1..] } else { &s[..] };
35+
let s = if s.ends_with(")") {
36+
&s[..s.len() - 1]
37+
} else {
38+
&s[..]
39+
};
40+
match s.split_once("..") {
41+
Some((start, end)) => {
42+
let start = if start.is_empty() {
43+
None
44+
} else {
45+
Some(start.parse::<usize>().map_err(|_| RangeParseError)?)
46+
};
47+
let end = if end.is_empty() {
48+
None
49+
} else {
50+
Some(end.parse::<usize>().map_err(|_| RangeParseError)?)
51+
};
52+
Ok(Self { start, end })
53+
}
54+
None => Err(RangeParseError),
55+
}
56+
}
57+
}
58+
59+
#[derive(Subcommand)]
60+
enum Command {
61+
/// List transactions in the bundle
62+
ListTransactions {
63+
/// Path to bundle
64+
#[clap(short = 'b', long)]
65+
bundle: PathBuf,
66+
67+
/// Range of indexes that should be listed
68+
/// Syntax is "(start..end)" or just "start..end". When start is omitted,
69+
/// range starts from zero. When end is omitted, range ends to the last item.
70+
#[clap(short = 'r', long)]
71+
range: Option<Range>,
72+
},
73+
/// Extract bundled transaction from the bundle
74+
ExtractTransaction {
75+
/// Path to bundle
76+
#[clap(short = 'b', long)]
77+
bundle: PathBuf,
78+
79+
/// Transaction ID
80+
#[clap(short = 't', long)]
81+
tx: TransactionId,
82+
83+
/// Where to store extracted transaction data
84+
#[clap(long, env = "TX_CACHE", default_value = "./tx-cache/")]
85+
tx_cache: PathBuf,
86+
},
87+
}
88+
89+
impl Command {
90+
async fn execute(self) {
91+
match self {
92+
Command::ListTransactions { bundle, range } => {
93+
let mut bundle_file = File::open(bundle)
94+
.await
95+
.expect("Failed to open bundle file");
96+
97+
let transactions = get_bundled_transactions(&mut bundle_file)
98+
.await
99+
.expect("Failed to extract list of bundled transactions");
100+
101+
let transactions = if let Some(range) = range {
102+
match (range.start, range.end) {
103+
(None, None) => &transactions[..],
104+
(None, Some(end)) => &transactions[..end],
105+
(Some(start), None) => &transactions[start..],
106+
(Some(start), Some(end)) => &transactions[start..end],
107+
}
108+
} else {
109+
&transactions[..]
110+
};
111+
112+
println!(
113+
"{}",
114+
serde_json::to_string(transactions)
115+
.expect("Failed to deserialize transaction data")
116+
);
117+
}
118+
Command::ExtractTransaction {
119+
bundle,
120+
tx,
121+
tx_cache,
122+
} => {
123+
let mut bundle_file = File::open(bundle)
124+
.await
125+
.expect("Failed to open bundle file");
126+
127+
let transactions = get_bundled_transactions(&mut bundle_file)
128+
.await
129+
.expect("Failed to extract list of bundled transactions");
130+
131+
let tx_offset = match transactions.iter().find(|item| &item.id == &tx) {
132+
Some(tx) => tx,
133+
None => panic!("Requested transaction is not contained in the bundle"),
134+
};
135+
136+
let tx = extract_transaction_details(&mut bundle_file, tx_offset)
137+
.await
138+
.expect("Failed to extract bundled transaction");
139+
140+
if !tx_cache
141+
.try_exists()
142+
.expect("Failed to check if tx cache folder exists")
143+
{
144+
DirBuilder::new()
145+
.recursive(true)
146+
.create(tx_cache.clone())
147+
.await
148+
.expect("Failed to create tx cache dir");
149+
}
150+
151+
let mut tx_metadata_file_path = tx_cache.clone();
152+
tx_metadata_file_path.push(format!("{}.json", tx.id));
153+
154+
let mut tx_metadata_file = File::create(tx_metadata_file_path.clone())
155+
.await
156+
.expect("Failed to open metadata file for writing");
157+
158+
let json = serde_json::to_string(&tx).expect("Failed to serialize tx metadata");
159+
tx_metadata_file
160+
.write(json.as_bytes())
161+
.await
162+
.expect("Failed to write metadata file");
163+
164+
if let Some(ref data_offset) = tx.data_offset {
165+
let mut tx_data_file_path = tx_cache;
166+
tx_data_file_path.push(format!("{}.data", tx.id));
167+
168+
let mut tx_data_file = File::create(tx_data_file_path.clone())
169+
.await
170+
.expect("Failed to open data file for writing");
171+
172+
tx_data_file
173+
.set_len(data_offset.size as u64)
174+
.await
175+
.expect("Failed to reserve space for data file");
176+
177+
read_transaction_data(&mut bundle_file, &mut tx_data_file, &tx)
178+
.await
179+
.expect("Failed to copy transaction data from bundle");
180+
}
181+
}
182+
}
183+
}
184+
}
185+
186+
#[derive(Parser)]
187+
struct Args {
188+
#[clap(subcommand)]
189+
command: Command,
190+
}
191+
192+
#[tokio::main]
193+
async fn main() {
194+
dotenv::dotenv().ok();
195+
env_logger::init_from_env(Env::default().default_filter_or("info"));
196+
197+
let args = Args::parse();
198+
199+
args.command.execute().await;
200+
}

src/bin/validator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use sysinfo::{System, SystemExt};
1212
use url::Url;
1313

1414
use validator::{
15-
bundler::BundlerConfig,
15+
bundlr::bundler::BundlerConfig,
1616
hardware::HardwareCheck,
1717
http::reqwest::ReqwestClient,
1818
key_manager::{InMemoryKeyManager, InMemoryKeyManagerConfig},

0 commit comments

Comments
 (0)