Skip to content

Commit ae0356f

Browse files
committed
chore: skip download if file has already been downloaded
1 parent e5f8f70 commit ae0356f

File tree

4 files changed

+81
-66
lines changed

4 files changed

+81
-66
lines changed

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[toolchain]
2-
channel = "1.80.0"
2+
channel = "1.85.0"
33
targets = [
44
"x86_64-linux-android",
55
"i686-linux-android",

uplink/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "uplink"
33
version = "2.18.4"
44
authors = ["tekjar <[email protected]>"]
5-
edition = "2021"
5+
edition = "2024"
66

77
[dependencies]
88
bytes = { workspace = true }

uplink/src/base/actions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct ActionResponse {
3434
}
3535

3636
impl ActionResponse {
37-
fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
37+
pub fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
3838
let timestamp = clock() as u64;
3939

4040
ActionResponse {

uplink/src/collector/downloader.rs

Lines changed: 78 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ impl FileDownloader {
131131
DownloadResult::Ok => {
132132
// Forward updated action as part of response
133133
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
134-
let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action));
134+
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
135+
status.done_response = Some(action);
135136
self.bridge_tx.send_action_response(status).await;
136137
}
137138
DownloadResult::Err(e) => {
@@ -161,7 +162,8 @@ impl FileDownloader {
161162
DownloadResult::Ok => {
162163
// Forward updated action as part of response
163164
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
164-
let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action));
165+
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
166+
status.done_response = Some(action);
165167
self.bridge_tx.send_action_response(status).await;
166168
}
167169
DownloadResult::Err(e) => {
@@ -173,66 +175,70 @@ impl FileDownloader {
173175

174176
// Accepts `DownloadState`, sets a timeout for the action
175177
async fn download(&mut self, state: &mut DownloadState) -> DownloadResult {
176-
let shutdown_rx = self.shutdown_rx.clone();
177-
loop {
178-
select! {
179-
o = self.continuous_retry(state) => {
180-
if let Err(e) = o {
181-
return DownloadResult::Err(e.to_string());
182-
} else {
183-
break;
184-
}
185-
},
186-
Ok(action) = self.actions_rx.recv_async() => {
187-
if action.action_id == &state.current.action.action_id {
188-
// This handles the edge case when the device is able to receive actions
189-
// from the broker but for something goes wrong when pushing action statuses back to the backend
190-
// In this case the backend will try sending the same action again
191-
//
192-
// TODO: Right now we use the action status pushed by device as confirmation that it
193-
// has received the action. It is not very reliable because as of now the action status pipeline can drop messages.
194-
// Would it be better if the backend used MQTT Ack of the action message instead?
195-
log::error!("Backend tried sending the same action again!");
196-
} else if action.name != "cancel_action" {
197-
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await;
198-
} else {
199-
match serde_json::from_str::<Cancellation>(&action.payload)
200-
.context("Invalid cancel action payload")
201-
.and_then(|cancellation| {
202-
if cancellation.action_id == state.current.action.action_id {
203-
Ok(())
204-
} else {
205-
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
206-
}
207-
})
208-
.and_then(|_| {
209-
state.clean()
210-
.context("Couldn't couldn't perform cleanup")
211-
}) {
212-
Ok(_) => {
213-
self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await;
214-
return DownloadResult::Err("action has been cancelled!".to_string());
215-
},
216-
Err(e) => {
217-
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await;
218-
},
178+
if state.bytes_written < state.current.meta.content_length {
179+
let shutdown_rx = self.shutdown_rx.clone();
180+
loop {
181+
select! {
182+
o = self.continuous_retry(state) => {
183+
if let Err(e) = o {
184+
return DownloadResult::Err(e.to_string());
185+
} else {
186+
break;
219187
}
220-
}
221-
},
188+
},
189+
Ok(action) = self.actions_rx.recv_async() => {
190+
if &action.action_id == &state.current.action.action_id {
191+
// This handles the edge case when the device is able to receive actions
192+
// from the broker but for something goes wrong when pushing action statuses back to the backend
193+
// In this case the backend will try sending the same action again
194+
//
195+
// TODO: Right now we use the action status pushed by device as confirmation that it
196+
// has received the action. It is not very reliable because as of now the action status pipeline can drop messages.
197+
// Would it be better if the backend used MQTT Ack of the action message instead?
198+
log::error!("Backend tried sending the same action again!");
199+
} else if action.name != "cancel_action" {
200+
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await;
201+
} else {
202+
match serde_json::from_str::<Cancellation>(&action.payload)
203+
.context("Invalid cancel action payload")
204+
.and_then(|cancellation| {
205+
if cancellation.action_id == state.current.action.action_id {
206+
Ok(())
207+
} else {
208+
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
209+
}
210+
})
211+
.and_then(|_| {
212+
state.clean()
213+
.context("Couldn't couldn't perform cleanup")
214+
}) {
215+
Ok(_) => {
216+
self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await;
217+
return DownloadResult::Err("action has been cancelled!".to_string());
218+
},
219+
Err(e) => {
220+
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await;
221+
},
222+
}
223+
}
224+
},
222225

223-
Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
224-
if let Err(e) = state.save(&self.config) {
225-
error!("Error saving current_download: {e:?}");
226-
}
226+
Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
227+
if let Err(e) = state.save(&self.config) {
228+
error!("Error saving current_download: {e:?}");
229+
}
227230

228-
return DownloadResult::Suspended;
229-
},
231+
return DownloadResult::Suspended;
232+
},
233+
}
230234
}
231235
}
232236

233237
self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await;
234-
if let Err(e) = state.current.meta.verify_checksum() {
235-
return DownloadResult::Err(e.to_string());
238+
if state.current.meta.checksum.is_some() {
239+
if let Err(e) = state.current.meta.verify_checksum() {
240+
return DownloadResult::Err(e.to_string());
241+
}
236242
}
237243
// Update Action payload with `download_path`, i.e. downloaded file's location in fs
238244
state.current.action.payload = match serde_json::to_string(&state.current.meta) {
@@ -324,9 +330,7 @@ fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<()
324330
}
325331

326332
/// Creates file to download into
327-
fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> {
328-
let mut file_path = download_path.to_owned();
329-
file_path.push(file_name);
333+
fn create_file(file_path: &Path) -> Result<File, Error> {
330334
// NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it
331335
if let Ok(f) = metadata(&file_path) {
332336
if f.is_dir() {
@@ -337,7 +341,7 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu
337341
#[cfg(unix)]
338342
file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?;
339343

340-
Ok((file, file_path))
344+
Ok(file)
341345
}
342346

343347
fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> {
@@ -372,14 +376,13 @@ pub struct DownloadFile {
372376

373377
impl DownloadFile {
374378
fn verify_checksum(&self) -> Result<(), Error> {
375-
let Some(checksum) = &self.checksum else { return Ok(()) };
376379
let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\"");
377380
let mut file = File::open(path)?;
378381
let mut hasher = Sha256::new();
379382
io::copy(&mut file, &mut hasher)?;
380383
let hash = hasher.finalize();
381384

382-
if checksum != &hex::encode(hash) {
385+
if self.checksum.as_ref().unwrap() != &hex::encode(hash) {
383386
return Err(Error::BadChecksum);
384387
}
385388

@@ -429,6 +432,19 @@ impl DownloadState {
429432
};
430433

431434
let file_path = path.join(&meta.file_name);
435+
meta.download_path = Some(file_path.clone());
436+
if meta.checksum.is_some() && meta.verify_checksum().is_ok() {
437+
// TODO: verify that range of size zero works as expected with consoled, platform,
438+
info!("file has already been downloaded and its checksum matches, skipping download...");
439+
return Ok(Self {
440+
bytes_written: meta.content_length,
441+
current: CurrentDownload { action, meta },
442+
file: File::open("/dev/null")?,
443+
percentage_downloaded: 100,
444+
start: Instant::now(),
445+
});
446+
}
447+
432448
let _ = remove_file(&file_path);
433449
let _ = remove_dir_all(&file_path);
434450

@@ -437,15 +453,14 @@ impl DownloadState {
437453
let url = meta.url.clone();
438454

439455
// Create file to actually download into
440-
let (file, file_path) = create_file(&path, &meta.file_name)?;
456+
let file = create_file(&file_path)?;
441457
// Retry downloading upto 3 times in case of connectivity issues
442458
// TODO: Error out for 1XX/3XX responses
443459
info!(
444460
"Downloading from {url} into {}; size = {}",
445461
file_path.display(),
446462
human_bytes(meta.content_length as f64)
447463
);
448-
meta.download_path = Some(file_path);
449464
let current = CurrentDownload { action, meta };
450465

451466
Ok(Self {

0 commit comments

Comments
 (0)