Skip to content
This repository was archived by the owner on May 18, 2023. It is now read-only.

Commit 2aca63d

Browse files
committed
Implement downloading Arweave transaction data
1 parent c794503 commit 2aca63d

File tree

2 files changed

+103
-92
lines changed

2 files changed

+103
-92
lines changed

src/arweave.rs

Lines changed: 70 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
1+
use bytes::{Buf, BufMut};
2+
use chrono::Utc;
3+
use data_encoding::BASE64URL_NOPAD;
14
use derive_more::{Display, Error};
2-
use diesel::expression::UncheckedBind;
35
use futures::future::BoxFuture;
4-
use futures::{pin_mut, stream, FutureExt, StreamExt, TryFutureExt};
5-
use http::header::CONTENT_LENGTH;
6-
use http::{StatusCode, Uri};
7-
use log::{debug, error, info, warn};
6+
use futures::{pin_mut, FutureExt, StreamExt};
7+
use http::header::{ToStrError, CONTENT_LENGTH};
8+
use http::Uri;
9+
use log::{debug, error, info};
810
use serde::{Deserialize, Serialize};
9-
use std::collections::hash_map::Entry;
1011
use std::collections::HashMap;
1112
use std::convert::Infallible;
1213
use std::fmt::{self, Debug};
13-
use std::pin::Pin;
14-
use std::sync::atomic::{AtomicU16, AtomicU8, Ordering};
14+
use std::sync::atomic::{AtomicU16, Ordering};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
17-
use tokio::io::{AsyncWrite, AsyncWriteExt};
17+
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter};
1818

1919
use std::num::ParseIntError;
2020
use std::str::FromStr;
2121
use url::Url;
2222

2323
use crate::dynamic_source::DynamicSource;
2424
use crate::http::reqwest::execute_with_retry;
25-
use crate::http::{Client, ClientAccess};
25+
use crate::http::{Client, ClientAccess, RetryAfter};
2626
use crate::key_manager::public_key_to_address;
27-
use crate::pool::Pool;
27+
use crate::retry::{retry, RetryControl};
2828

2929
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
3030
#[serde(transparent)]
@@ -307,6 +307,12 @@ impl FromStr for TransactionSize {
307307
}
308308
}
309309

310+
impl Into<u64> for TransactionSize {
311+
fn into(self) -> u64 {
312+
self.0 as u64
313+
}
314+
}
315+
310316
impl TryFrom<String> for TransactionSize {
311317
type Error = ParseIntError;
312318

@@ -386,6 +392,11 @@ pub struct Offset {
386392
size: u64,
387393
}
388394

395+
#[derive(Clone, Debug, Deserialize)]
396+
struct Chunk<'a> {
397+
chunk: &'a [u8],
398+
}
399+
389400
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
390401
pub struct Node(String);
391402

@@ -539,66 +550,6 @@ where
539550
.boxed()
540551
}
541552

542-
// fn get_peers<'a, HttpClient>(
543-
// client: &'a HttpClient,
544-
// node: &'a Node,
545-
// timeout: Option<Duration>,
546-
// ) -> BoxFuture<'a, Result<Vec<Node>, FetchPeersError<HttpClient::Error>>>
547-
// where
548-
// HttpClient:
549-
// Client<Request = reqwest::Request, Response = reqwest::Response> + Send + Sync + 'static,
550-
// HttpClient::Error: From<reqwest::Error>,
551-
// {
552-
// async move {
553-
// debug!("Get peers for {}", node);
554-
// let url =
555-
// match Url::from_str(&format!("http://{}", node)).and_then(|base| base.join("/peers")) {
556-
// Ok(url) => url,
557-
// Err(err) => {
558-
// debug!(
559-
// "Failed to build URL request peers for node: {}, {:?}",
560-
// node, err
561-
// );
562-
// return Err(FetchPeersError::UnsupportedPeerAddress(node.clone()));
563-
// }
564-
// };
565-
566-
// let res = match get(client, url, timeout).await {
567-
// Ok(res) => match res.error_for_status() {
568-
// Ok(res) => res,
569-
// Err(err) => {
570-
// debug!(
571-
// "Request for fetching peers failed, peer: {}, err: {:?}",
572-
// node, err
573-
// );
574-
// return Err(FetchPeersError::HttpClientError(err.into()));
575-
// }
576-
// },
577-
// Err(err) => {
578-
// error!(
579-
// "Request for fetching peers failed, peer: {}, err: {:?}",
580-
// node, err
581-
// );
582-
// return Err(FetchPeersError::ArweaveError(err));
583-
// }
584-
// };
585-
586-
// let peers: Vec<Node> = match res.json().await {
587-
// Ok(peers) => peers,
588-
// Err(err) => {
589-
// error!(
590-
// "Failed to deserialize peers, peer: {}, error: {:?}",
591-
// node, err
592-
// );
593-
// return Err(FetchPeersError::ResponseDeserializationError);
594-
// }
595-
// };
596-
597-
// Ok(peers)
598-
// }
599-
// .boxed()
600-
// }
601-
602553
#[derive(Debug, Display, Error, Clone, PartialEq)]
603554
pub enum ArweaveError {
604555
MalformedRequest,
@@ -834,7 +785,7 @@ impl Arweave {
834785
Context: ClientAccess<HttpClient>,
835786
HttpClient: Client<Request = reqwest::Request, Response = reqwest::Response>,
836787
HttpClient::Error: From<reqwest::Error>,
837-
Output: AsyncWrite + Unpin,
788+
Output: AsyncWrite + AsyncSeek + Unpin,
838789
{
839790
let client = ctx.get_http_client();
840791
let base_url = if let Some(ref peer) = peer {
@@ -854,11 +805,12 @@ impl Arweave {
854805
ArweaveError::UnknownErr
855806
})?;
856807

857-
let mut download_size = 0;
858-
859-
while download_size < size {
860-
let chunk_offset = offset - download_size;
808+
info!("Transaction offset={}, size={}", offset, size);
861809

810+
let end_offset = offset;
811+
let start_offset = offset - size + 1;
812+
let mut chunk_offset = start_offset;
813+
while chunk_offset < end_offset + 1 {
862814
let url = self
863815
.base_url
864816
.join(&format!("/chunk/{}", &chunk_offset))
@@ -869,24 +821,58 @@ impl Arweave {
869821

870822
let mut res = get(client, url, None).await?;
871823

872-
let chunk_size = res.headers().get(CONTENT_LENGTH).ok_or_else(|| {
873-
error!("Could not read chunk size, missing Content-Length header");
874-
ArweaveError::RequestFailed
875-
})?;
824+
let content_length: u64 = res
825+
.headers()
826+
.get(CONTENT_LENGTH)
827+
.and_then(|h| h.to_str().ok())
828+
.and_then(|s| s.parse::<u64>().ok())
829+
.ok_or_else(|| {
830+
error!("Could not read chunk size, missing Content-Length header");
831+
ArweaveError::RequestFailed
832+
})?;
876833

877-
let chunk_size: u64 = chunk_size.to_str().unwrap().parse().unwrap();
878-
info!("Chunk: offset={}, size={}", chunk_offset, chunk_size);
834+
let file_offset = chunk_offset - start_offset;
879835

836+
// This is not strictly needed at the moment, but keeping as
837+
// an example, because this will be needed once we add retry
838+
// and concurrency
839+
output
840+
.seek(std::io::SeekFrom::Start(file_offset))
841+
.await
842+
.map_err(|err| {
843+
error!(
844+
"Failed to seek into the right position in the output: {:?}",
845+
err
846+
);
847+
ArweaveError::RequestFailed
848+
})?;
849+
850+
let mut buf = Vec::with_capacity(content_length as usize);
880851
while let Some(chunk) = res.chunk().await.map_err(|err| {
881852
error!("Failed to read chunk data: {:?}", err);
882853
ArweaveError::RequestFailed
883854
})? {
884-
download_size += chunk.len() as u64;
885-
output.write_all(&chunk).await.map_err(|err| {
855+
buf.write_all(&chunk).await.map_err(|err| {
886856
error!("Failed to write chunk data to output: {:?}", err);
887857
ArweaveError::RequestFailed
888858
})?;
889859
}
860+
let chunk: Chunk = serde_json::from_slice(buf.as_slice()).unwrap();
861+
let data = BASE64URL_NOPAD.decode(chunk.chunk).unwrap();
862+
863+
output.write_all(data.as_slice()).await.map_err(|err| {
864+
error!("Failed to write chunk data: {:?}", err);
865+
ArweaveError::UnknownErr
866+
})?;
867+
868+
let chunk_size = data.len() as u64;
869+
870+
info!(
871+
"Got chunk: offset={}, chunk_size={}",
872+
chunk_offset, chunk_size
873+
);
874+
875+
chunk_offset += chunk_size;
890876
}
891877

892878
Ok(())

src/bin/download_transaction.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::path::PathBuf;
1+
use std::{
2+
fs::{self, DirBuilder},
3+
path::PathBuf,
4+
};
25

36
use clap::Parser;
47

@@ -7,7 +10,7 @@ use log::info;
710
use tokio::fs::File;
811
use url::Url;
912
use validator::{
10-
arweave::{Arweave, ArweaveContext, ArweaveError, TransactionId},
13+
arweave::{Arweave, ArweaveContext, Transaction, TransactionId},
1114
http::{reqwest::ReqwestClient, ClientAccess},
1215
};
1316

@@ -46,7 +49,7 @@ impl ArweaveContext<ReqwestClient> for Context {
4649
}
4750

4851
#[tokio::main]
49-
async fn main() -> Result<(), ArweaveError> {
52+
async fn main() {
5053
dotenv::dotenv().ok();
5154
env_logger::init_from_env(Env::default().default_filter_or("info"));
5255

@@ -55,18 +58,40 @@ async fn main() -> Result<(), ArweaveError> {
5558
let arweave = Arweave::new(args.arweave_gateway.clone());
5659
let ctx = Context::new(reqwest::Client::default());
5760

58-
let mut tx_data_file = args.tx_cache.clone();
61+
if !args
62+
.tx_cache
63+
.try_exists()
64+
.expect("Failed to check if tx cache folder exists")
65+
{
66+
DirBuilder::new()
67+
.recursive(true)
68+
.create(args.tx_cache.clone())
69+
.expect("Failed to create tx cache dir");
70+
}
71+
72+
let mut tx_data_file = args.tx_cache;
5973
tx_data_file.push(format!("{}", args.tx));
60-
// let tx_data_file = fs::canonicalize(tx_data_file)
61-
// .expect("Failed to create absolute file path from provided information");
6274

63-
info!("Write tx data to {:?}", tx_data_file);
75+
let Transaction { data_size, .. } = arweave
76+
.get_transaction_info(&ctx, &args.tx)
77+
.await
78+
.expect("Failed to fetch transaction size info");
6479

65-
let mut output = File::create(tx_data_file)
80+
let mut output = File::create(tx_data_file.clone())
6681
.await
6782
.expect("Failed to open file for writing");
6883

84+
output
85+
.set_len(data_size.into())
86+
.await
87+
.expect("Failed to set file size to match transaction data size");
88+
6989
arweave
7090
.download_transaction_data(&ctx, &args.tx, &mut output, None)
7191
.await
92+
.expect("Failed to download transaction data");
93+
94+
let file_path = fs::canonicalize(tx_data_file)
95+
.expect("Failed to create absolute file path from provided information");
96+
info!("Wrote tx data to {}", file_path.to_str().unwrap());
7297
}

0 commit comments

Comments
 (0)