Skip to content

Commit e94a671

Browse files
authored
V2.18.5 (#390)
fix: skip download if file has already been downloaded
1 parent 0d98ca5 commit e94a671

File tree

5 files changed

+49
-38
lines changed

5 files changed

+49
-38
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[toolchain]
2-
channel = "1.80.0"
2+
channel = "1.85.0"
33
targets = [
44
"x86_64-linux-android",
55
"i686-linux-android",

uplink/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
[package]
22
name = "uplink"
3-
version = "2.18.4"
3+
version = "2.18.5"
44
authors = ["tekjar <[email protected]>"]
5-
edition = "2021"
5+
edition = "2024"
66

77
[dependencies]
88
bytes = { workspace = true }

uplink/src/base/actions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct ActionResponse {
3434
}
3535

3636
impl ActionResponse {
37-
fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
37+
pub fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
3838
let timestamp = clock() as u64;
3939

4040
ActionResponse {

uplink/src/collector/downloader.rs

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ enum DownloadResult {
6565
pub struct FileDownloader {
6666
config: DownloaderConfig,
6767
actions_rx: Receiver<Action>,
68-
action_id: String,
6968
bridge_tx: BridgeTx,
7069
client: Client,
7170
shutdown_rx: Receiver<DownloaderShutdown>,
@@ -102,7 +101,6 @@ impl FileDownloader {
102101
actions_rx,
103102
client,
104103
bridge_tx,
105-
action_id: String::default(),
106104
shutdown_rx,
107105
disabled,
108106
})
@@ -116,28 +114,29 @@ impl FileDownloader {
116114

117115
info!("Downloader thread is ready to receive download actions");
118116
while let Ok(action) = self.actions_rx.recv_async().await {
119-
action.action_id.clone_into(&mut self.action_id);
117+
let action_id = action.action_id.clone();
120118
let mut state = match DownloadState::new(action, &self.config) {
121119
Ok(s) => s,
122120
Err(e) => {
123-
self.forward_error(e).await;
121+
self.bridge_tx.send_action_response(ActionResponse::failure(&action_id, e.to_string())).await;
124122
continue;
125123
}
126124
};
127125

128126
// Update action status for process initiated
129-
let status = ActionResponse::progress(&self.action_id, "Downloading", 0);
127+
let status = ActionResponse::progress(&state.current.action.action_id, "Downloading", 0);
130128
self.bridge_tx.send_action_response(status).await;
131129

132130
match self.download(&mut state).await {
133131
DownloadResult::Ok => {
134132
// Forward updated action as part of response
135133
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
136-
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
134+
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
135+
status.done_response = Some(action);
137136
self.bridge_tx.send_action_response(status).await;
138137
}
139138
DownloadResult::Err(e) => {
140-
self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await;
139+
self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await;
141140
}
142141
DownloadResult::Suspended => {
143142
break
@@ -158,24 +157,28 @@ impl FileDownloader {
158157
return;
159158
}
160159
};
161-
state.current.action.action_id.clone_into(&mut self.action_id);
162160

163161
match self.download(&mut state).await {
164162
DownloadResult::Ok => {
165163
// Forward updated action as part of response
166164
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
167-
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
165+
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
166+
status.done_response = Some(action);
168167
self.bridge_tx.send_action_response(status).await;
169168
}
170169
DownloadResult::Err(e) => {
171-
self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await;
170+
self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await;
172171
}
173172
DownloadResult::Suspended => {}
174173
}
175174
}
176175

177176
// Accepts `DownloadState`, sets a timeout for the action
178177
async fn download(&mut self, state: &mut DownloadState) -> DownloadResult {
178+
if state.already_downloaded {
179+
return DownloadResult::Ok;
180+
}
181+
179182
let shutdown_rx = self.shutdown_rx.clone();
180183
loop {
181184
select! {
@@ -187,7 +190,7 @@ impl FileDownloader {
187190
}
188191
},
189192
Ok(action) = self.actions_rx.recv_async() => {
190-
if action.action_id == self.action_id {
193+
if &action.action_id == &state.current.action.action_id {
191194
// This handles the edge case when the device is able to receive actions
192195
// from the broker but for something goes wrong when pushing action statuses back to the backend
193196
// In this case the backend will try sending the same action again
@@ -202,10 +205,10 @@ impl FileDownloader {
202205
match serde_json::from_str::<Cancellation>(&action.payload)
203206
.context("Invalid cancel action payload")
204207
.and_then(|cancellation| {
205-
if cancellation.action_id == self.action_id {
208+
if cancellation.action_id == state.current.action.action_id {
206209
Ok(())
207210
} else {
208-
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, self.action_id)))
211+
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
209212
}
210213
})
211214
.and_then(|_| {
@@ -233,9 +236,11 @@ impl FileDownloader {
233236
}
234237
}
235238

236-
self.bridge_tx.send_action_response(ActionResponse::progress(self.action_id.as_str(), "VerifyingChecksum", 99)).await;
237-
if let Err(e) = state.current.meta.verify_checksum() {
238-
return DownloadResult::Err(e.to_string());
239+
self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await;
240+
if state.current.meta.checksum.is_some() {
241+
if let Err(e) = state.current.meta.verify_checksum() {
242+
return DownloadResult::Err(e.to_string());
243+
}
239244
}
240245
// Update Action payload with `download_path`, i.e. downloaded file's location in fs
241246
state.current.action.payload = match serde_json::to_string(&state.current.meta) {
@@ -284,7 +289,7 @@ impl FileDownloader {
284289
// Retry non-status errors
285290
Err(e) if !e.is_status() => {
286291
let status =
287-
ActionResponse::progress(&self.action_id, "Download Failed", 0)
292+
ActionResponse::progress(&state.current.action.action_id, "Download Failed", 0)
288293
.add_error(e.to_string());
289294
self.bridge_tx.send_action_response(status).await;
290295
error!("Download failed: {e:?}");
@@ -296,7 +301,7 @@ impl FileDownloader {
296301
};
297302
if let Some(percentage) = state.write_bytes(&chunk)? {
298303
let status =
299-
ActionResponse::progress(&self.action_id, "Downloading", percentage);
304+
ActionResponse::progress(&state.current.action.action_id, "Downloading", percentage);
300305
self.bridge_tx.send_action_response(status).await;
301306
}
302307
}
@@ -307,12 +312,6 @@ impl FileDownloader {
307312

308313
Ok(())
309314
}
310-
311-
// Forward errors as action response to bridge
312-
async fn forward_error(&mut self, err: Error) {
313-
let status = ActionResponse::failure(&self.action_id, err.to_string());
314-
self.bridge_tx.send_action_response(status).await;
315-
}
316315
}
317316

318317
#[cfg(unix)]
@@ -333,9 +332,7 @@ fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<()
333332
}
334333

335334
/// Creates file to download into
336-
fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> {
337-
let mut file_path = download_path.to_owned();
338-
file_path.push(file_name);
335+
fn create_file(file_path: &Path) -> Result<File, Error> {
339336
// NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it
340337
if let Ok(f) = metadata(&file_path) {
341338
if f.is_dir() {
@@ -346,7 +343,7 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu
346343
#[cfg(unix)]
347344
file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?;
348345

349-
Ok((file, file_path))
346+
Ok(file)
350347
}
351348

352349
fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> {
@@ -381,14 +378,13 @@ pub struct DownloadFile {
381378

382379
impl DownloadFile {
383380
fn verify_checksum(&self) -> Result<(), Error> {
384-
let Some(checksum) = &self.checksum else { return Ok(()) };
385381
let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\"");
386382
let mut file = File::open(path)?;
387383
let mut hasher = Sha256::new();
388384
io::copy(&mut file, &mut hasher)?;
389385
let hash = hasher.finalize();
390386

391-
if checksum != &hex::encode(hash) {
387+
if self.checksum.as_ref().unwrap() != &hex::encode(hash) {
392388
return Err(Error::BadChecksum);
393389
}
394390

@@ -410,6 +406,7 @@ struct DownloadState {
410406
file: File,
411407
bytes_written: usize,
412408
percentage_downloaded: u8,
409+
already_downloaded: bool,
413410
start: Instant,
414411
}
415412

@@ -438,6 +435,19 @@ impl DownloadState {
438435
};
439436

440437
let file_path = path.join(&meta.file_name);
438+
meta.download_path = Some(file_path.clone());
439+
if meta.checksum.is_some() && meta.verify_checksum().is_ok() {
440+
info!("file has already been downloaded and its checksum matches, skipping download...");
441+
return Ok(Self {
442+
bytes_written: meta.content_length,
443+
current: CurrentDownload { action, meta },
444+
file: File::open("/dev/null")?,
445+
percentage_downloaded: 100,
446+
already_downloaded: true,
447+
start: Instant::now(),
448+
});
449+
}
450+
441451
let _ = remove_file(&file_path);
442452
let _ = remove_dir_all(&file_path);
443453

@@ -446,22 +456,22 @@ impl DownloadState {
446456
let url = meta.url.clone();
447457

448458
// Create file to actually download into
449-
let (file, file_path) = create_file(&path, &meta.file_name)?;
459+
let file = create_file(&file_path)?;
450460
// Retry downloading upto 3 times in case of connectivity issues
451461
// TODO: Error out for 1XX/3XX responses
452462
info!(
453463
"Downloading from {url} into {}; size = {}",
454464
file_path.display(),
455465
human_bytes(meta.content_length as f64)
456466
);
457-
meta.download_path = Some(file_path);
458467
let current = CurrentDownload { action, meta };
459468

460469
Ok(Self {
461470
current,
462471
file,
463472
bytes_written: 0,
464473
percentage_downloaded: 0,
474+
already_downloaded: false,
465475
start: Instant::now(),
466476
})
467477
}
@@ -489,6 +499,7 @@ impl DownloadState {
489499
file,
490500
bytes_written,
491501
percentage_downloaded: 0,
502+
already_downloaded: false,
492503
start: Instant::now(),
493504
})
494505
}

0 commit comments

Comments
 (0)