Skip to content

Commit cc93a82

Browse files
committed
chore: some cleanup
1 parent ae0356f commit cc93a82

File tree

3 files changed

+62
-56
lines changed

3 files changed

+62
-56
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

uplink/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "uplink"
3-
version = "2.18.4"
3+
version = "2.18.5"
44
authors = ["tekjar <[email protected]>"]
55
edition = "2024"
66

uplink/src/collector/downloader.rs

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -175,62 +175,64 @@ impl FileDownloader {
175175

176176
// Accepts `DownloadState`, sets a timeout for the action
177177
async fn download(&mut self, state: &mut DownloadState) -> DownloadResult {
178-
if state.bytes_written < state.current.meta.content_length {
179-
let shutdown_rx = self.shutdown_rx.clone();
180-
loop {
181-
select! {
182-
o = self.continuous_retry(state) => {
183-
if let Err(e) = o {
184-
return DownloadResult::Err(e.to_string());
185-
} else {
186-
break;
187-
}
188-
},
189-
Ok(action) = self.actions_rx.recv_async() => {
190-
if &action.action_id == &state.current.action.action_id {
191-
// This handles the edge case when the device is able to receive actions
192-
// from the broker but for something goes wrong when pushing action statuses back to the backend
193-
// In this case the backend will try sending the same action again
194-
//
195-
// TODO: Right now we use the action status pushed by device as confirmation that it
196-
// has received the action. It is not very reliable because as of now the action status pipeline can drop messages.
197-
// Would it be better if the backend used MQTT Ack of the action message instead?
198-
log::error!("Backend tried sending the same action again!");
199-
} else if action.name != "cancel_action" {
200-
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await;
201-
} else {
202-
match serde_json::from_str::<Cancellation>(&action.payload)
203-
.context("Invalid cancel action payload")
204-
.and_then(|cancellation| {
205-
if cancellation.action_id == state.current.action.action_id {
206-
Ok(())
207-
} else {
208-
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
209-
}
210-
})
211-
.and_then(|_| {
212-
state.clean()
213-
.context("Couldn't couldn't perform cleanup")
214-
}) {
215-
Ok(_) => {
216-
self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await;
217-
return DownloadResult::Err("action has been cancelled!".to_string());
218-
},
219-
Err(e) => {
220-
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await;
221-
},
222-
}
223-
}
224-
},
178+
if state.already_downloaded {
179+
return DownloadResult::Ok;
180+
}
225181

226-
Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
227-
if let Err(e) = state.save(&self.config) {
228-
error!("Error saving current_download: {e:?}");
182+
let shutdown_rx = self.shutdown_rx.clone();
183+
loop {
184+
select! {
185+
o = self.continuous_retry(state) => {
186+
if let Err(e) = o {
187+
return DownloadResult::Err(e.to_string());
188+
} else {
189+
break;
190+
}
191+
},
192+
Ok(action) = self.actions_rx.recv_async() => {
193+
if &action.action_id == &state.current.action.action_id {
194+
// This handles the edge case when the device is able to receive actions
195+
// from the broker but for something goes wrong when pushing action statuses back to the backend
196+
// In this case the backend will try sending the same action again
197+
//
198+
// TODO: Right now we use the action status pushed by device as confirmation that it
199+
// has received the action. It is not very reliable because as of now the action status pipeline can drop messages.
200+
// Would it be better if the backend used MQTT Ack of the action message instead?
201+
log::error!("Backend tried sending the same action again!");
202+
} else if action.name != "cancel_action" {
203+
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await;
204+
} else {
205+
match serde_json::from_str::<Cancellation>(&action.payload)
206+
.context("Invalid cancel action payload")
207+
.and_then(|cancellation| {
208+
if cancellation.action_id == state.current.action.action_id {
209+
Ok(())
210+
} else {
211+
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
212+
}
213+
})
214+
.and_then(|_| {
215+
state.clean()
216+
.context("Couldn't couldn't perform cleanup")
217+
}) {
218+
Ok(_) => {
219+
self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await;
220+
return DownloadResult::Err("action has been cancelled!".to_string());
221+
},
222+
Err(e) => {
223+
self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await;
224+
},
229225
}
226+
}
227+
},
230228

231-
return DownloadResult::Suspended;
232-
},
233-
}
229+
Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
230+
if let Err(e) = state.save(&self.config) {
231+
error!("Error saving current_download: {e:?}");
232+
}
233+
234+
return DownloadResult::Suspended;
235+
},
234236
}
235237
}
236238

@@ -404,6 +406,7 @@ struct DownloadState {
404406
file: File,
405407
bytes_written: usize,
406408
percentage_downloaded: u8,
409+
already_downloaded: bool,
407410
start: Instant,
408411
}
409412

@@ -441,6 +444,7 @@ impl DownloadState {
441444
current: CurrentDownload { action, meta },
442445
file: File::open("/dev/null")?,
443446
percentage_downloaded: 100,
447+
already_downloaded: true,
444448
start: Instant::now(),
445449
});
446450
}
@@ -468,6 +472,7 @@ impl DownloadState {
468472
file,
469473
bytes_written: 0,
470474
percentage_downloaded: 0,
475+
already_downloaded: false,
471476
start: Instant::now(),
472477
})
473478
}
@@ -495,6 +500,7 @@ impl DownloadState {
495500
file,
496501
bytes_written,
497502
percentage_downloaded: 0,
503+
already_downloaded: false,
498504
start: Instant::now(),
499505
})
500506
}

0 commit comments

Comments
 (0)