-
Notifications
You must be signed in to change notification settings - Fork 109
feat: Add support for repodata patching in rattler-index, fix silent failures #1129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
e78df5a
afa31c1
29430c4
01a2da5
a1d4ac0
b71a227
7bbb1a3
5964509
d896ab4
069fdf3
4b716f9
ef7aba6
5894ce7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,18 +5,21 @@ | |
| use anyhow::Result; | ||
| use bytes::buf::Buf; | ||
| use fs_err::{self as fs}; | ||
| use futures::future::try_join_all; | ||
| use futures::{stream::FuturesUnordered, StreamExt}; | ||
| use fxhash::FxHashMap; | ||
| use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; | ||
| use rattler_conda_types::{ | ||
| package::{ArchiveType, IndexJson, PackageFile}, | ||
| ChannelInfo, PackageRecord, Platform, RepoData, | ||
| ChannelInfo, PackageRecord, PatchInstructions, Platform, RepoData, | ||
| }; | ||
| use rattler_networking::{Authentication, AuthenticationStorage}; | ||
| use rattler_package_streaming::{read, seek}; | ||
| use rattler_package_streaming::{ | ||
| read, | ||
| seek::{self, stream_conda_content}, | ||
| }; | ||
| use std::{ | ||
| collections::{HashMap, HashSet}, | ||
| io::{Cursor, Read}, | ||
| io::{Cursor, Read, Seek}, | ||
| path::{Path, PathBuf}, | ||
| str::FromStr, | ||
| sync::Arc, | ||
|
|
@@ -71,6 +74,38 @@ pub fn package_record_from_index_json<T: Read>( | |
| Ok(package_record) | ||
| } | ||
|
|
||
| fn repodata_patch_from_conda_package_stream<'a>( | ||
| package: impl Read + Seek + 'a, | ||
| ) -> std::io::Result<rattler_conda_types::RepoDataPatch> { | ||
| let mut subdirs = FxHashMap::default(); | ||
|
|
||
| let mut content_reader = stream_conda_content(package).unwrap(); | ||
| let entries = content_reader.entries().unwrap(); | ||
| for entry in entries { | ||
| let mut entry = entry.unwrap(); | ||
| if !entry.header().entry_type().is_file() { | ||
| todo!(); | ||
| } | ||
| let mut buf = Vec::new(); | ||
| entry.read_to_end(&mut buf).unwrap(); | ||
| let path = entry.path().unwrap(); | ||
| let components = path.components().collect::<Vec<_>>(); | ||
| let subdir = if components.len() == 2 { | ||
| if components[1].as_os_str() != "patch_instructions.json" { | ||
| todo!(); | ||
| } | ||
| components[0].as_os_str().to_string_lossy().to_string() | ||
| } else { | ||
| todo!(); | ||
| }; | ||
|
|
||
| let instructions: PatchInstructions = serde_json::from_slice(&buf).unwrap(); | ||
| subdirs.insert(subdir, instructions); | ||
| } | ||
|
|
||
| Ok(rattler_conda_types::RepoDataPatch { subdirs }) | ||
| } | ||
|
|
||
| /// Extract the package record from a `.tar.bz2` package file. | ||
| /// This function will look for the `info/index.json` file in the conda package | ||
| /// and extract the package record from it. | ||
|
|
@@ -132,12 +167,17 @@ async fn index_subdir( | |
| subdir: Platform, | ||
| op: Operator, | ||
| force: bool, | ||
| repodata_patch: Option<PatchInstructions>, | ||
| progress: Option<MultiProgress>, | ||
| semaphore: Arc<Semaphore>, | ||
| ) -> Result<()> { | ||
| let repodata_path = if repodata_patch.is_some() { | ||
| format!("{subdir}/repodata_from_packages.json") | ||
| } else { | ||
| format!("{subdir}/repodata.json") | ||
| }; | ||
| let mut registered_packages: FxHashMap<String, PackageRecord> = HashMap::default(); | ||
| if !force { | ||
| let repodata_path = format!("{subdir}/repodata.json"); | ||
| let repodata_bytes = op.read(&repodata_path).await; | ||
| let repodata: RepoData = match repodata_bytes { | ||
| Ok(bytes) => serde_json::from_slice(&bytes.to_vec())?, | ||
|
|
@@ -210,7 +250,7 @@ async fn index_subdir( | |
| .cloned() | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| tracing::debug!( | ||
| tracing::info!( | ||
| "Adding {} packages to subdir {}.", | ||
| packages_to_add.len(), | ||
| subdir | ||
|
|
@@ -229,53 +269,80 @@ async fn index_subdir( | |
| .progress_chars("##-"); | ||
| pb.set_style(sty); | ||
|
|
||
| let tasks = packages_to_add | ||
| .iter() | ||
| .map(|filename| { | ||
| tokio::spawn({ | ||
| let op = op.clone(); | ||
| let filename = filename.clone(); | ||
| let pb = pb.clone(); | ||
| let semaphore = semaphore.clone(); | ||
| { | ||
| async move { | ||
| let _permit = semaphore | ||
| .acquire() | ||
| .await | ||
| .expect("Semaphore was unexpectedly closed"); | ||
| pb.set_message(format!( | ||
| "Indexing {} {}", | ||
| subdir.as_str(), | ||
| console::style(filename.clone()).dim() | ||
| )); | ||
| let file_path = format!("{subdir}/{filename}"); | ||
| let buffer = op.read(&file_path).await?; | ||
| let reader = buffer.reader(); | ||
| // We already know it's not None | ||
| let archive_type = ArchiveType::try_from(&filename).unwrap(); | ||
| let record = match archive_type { | ||
| ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader), | ||
| ArchiveType::Conda => package_record_from_conda_reader(reader), | ||
| }?; | ||
| pb.inc(1); | ||
| Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record)) | ||
| } | ||
| let mut tasks = FuturesUnordered::new(); | ||
| for filename in packages_to_add.iter() { | ||
| let task = { | ||
| let op = op.clone(); | ||
| let filename = filename.clone(); | ||
| let pb = pb.clone(); | ||
| let semaphore = semaphore.clone(); | ||
| { | ||
| async move { | ||
| let _permit = semaphore | ||
| .acquire() | ||
| .await | ||
| .expect("Semaphore was unexpectedly closed"); | ||
| pb.set_message(format!( | ||
| "Indexing {} {}", | ||
| subdir.as_str(), | ||
| console::style(filename.clone()).dim() | ||
| )); | ||
| let file_path = format!("{subdir}/{filename}"); | ||
| let buffer = op.read(&file_path).await?; | ||
| let reader = buffer.reader(); | ||
| // We already know it's not None | ||
| let archive_type = ArchiveType::try_from(&filename).unwrap(); | ||
| let record = match archive_type { | ||
| ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader), | ||
| ArchiveType::Conda => package_record_from_conda_reader(reader), | ||
| }?; | ||
| pb.inc(1); | ||
| // todo: make this future ok/err instead of results | ||
| Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record)) | ||
| } | ||
| }) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| let results = try_join_all(tasks).await?; | ||
|
|
||
| pb.finish_with_message(format!("Finished {}", subdir.as_str())); | ||
| } | ||
| }; | ||
| tasks.push(tokio::spawn(task)); | ||
| } | ||
| let mut results = Vec::new(); | ||
| while let Some(join_result) = tasks.next().await { | ||
| match join_result { | ||
| Ok(Ok(result)) => results.push(result), | ||
| Ok(Err(e)) => { | ||
| tasks.clear(); | ||
| tracing::error!("Failed to process package: {}", e); | ||
| pb.abandon_with_message(format!( | ||
| "{} {}", | ||
| console::style("Failed to index").red(), | ||
| console::style(subdir.as_str()).dim() | ||
| )); | ||
| return Err(e.into()); | ||
| } | ||
| Err(join_err) => { | ||
| tasks.clear(); | ||
| tracing::error!("Task panicked: {}", join_err); | ||
| pb.abandon_with_message(format!( | ||
| "{} {}", | ||
| console::style("Failed to index").red(), | ||
| console::style(subdir.as_str()).dim() | ||
| )); | ||
| return Err(anyhow::anyhow!("Task panicked: {}", join_err)); | ||
| } | ||
| } | ||
| } | ||
| pb.finish_with_message(format!( | ||
| "{} {}", | ||
| console::style("Finished").green(), | ||
| subdir.as_str() | ||
| )); | ||
|
|
||
| tracing::debug!( | ||
| tracing::info!( | ||
| "Successfully added {} packages to subdir {}.", | ||
| results.len(), | ||
| subdir | ||
| ); | ||
|
|
||
| for result in results { | ||
| let (filename, record) = result?; | ||
| for (filename, record) in results { | ||
| registered_packages.insert(filename, record); | ||
| } | ||
|
|
||
|
|
@@ -304,9 +371,19 @@ async fn index_subdir( | |
| version: Some(2), | ||
| }; | ||
|
|
||
| let repodata_path = format!("{subdir}/repodata.json"); | ||
| tracing::info!("Writing repodata to {}", repodata_path); | ||
| let repodata_bytes = serde_json::to_vec(&repodata)?; | ||
| op.write(&repodata_path, repodata_bytes).await?; | ||
|
|
||
| if let Some(instructions) = repodata_patch { | ||
| let patched_repodata_path = format!("{subdir}/repodata.json"); | ||
| tracing::info!("Writing patched repodata to {}", patched_repodata_path); | ||
| let mut patched_repodata = repodata.clone(); | ||
| patched_repodata.apply_patches(&instructions); | ||
| let patched_repodata_bytes = serde_json::to_vec(&patched_repodata)?; | ||
| op.write(&patched_repodata_path, patched_repodata_bytes) | ||
| .await?; | ||
| } | ||
| // todo: also write repodata.json.bz2, repodata.json.zst, repodata.json.jlap and sharded repodata once available in rattler | ||
| // https://github.com/conda/rattler/issues/1096 | ||
|
|
||
|
|
@@ -317,13 +394,22 @@ async fn index_subdir( | |
| pub async fn index_fs( | ||
| channel: impl Into<PathBuf>, | ||
| target_platform: Option<Platform>, | ||
| repodata_patch: Option<String>, | ||
| force: bool, | ||
| max_parallel: usize, | ||
| multi_progress: Option<MultiProgress>, | ||
| ) -> anyhow::Result<()> { | ||
| let mut config = FsConfig::default(); | ||
| config.root = Some(channel.into().canonicalize()?.to_string_lossy().to_string()); | ||
| index(target_platform, config, force, max_parallel, multi_progress).await | ||
| index( | ||
| target_platform, | ||
| config, | ||
| force, | ||
| max_parallel, | ||
| multi_progress, | ||
| repodata_patch, | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// Create a new `repodata.json` for all packages in the channel at the given S3 URL. | ||
|
|
@@ -337,6 +423,7 @@ pub async fn index_s3( | |
| secret_access_key: Option<String>, | ||
| session_token: Option<String>, | ||
| target_platform: Option<Platform>, | ||
| repodata_patch: Option<String>, | ||
| force: bool, | ||
| max_parallel: usize, | ||
| multi_progress: Option<MultiProgress>, | ||
|
|
@@ -379,6 +466,7 @@ pub async fn index_s3( | |
| force, | ||
| max_parallel, | ||
| multi_progress, | ||
| repodata_patch, | ||
| ) | ||
| .await | ||
| } | ||
|
|
@@ -401,6 +489,7 @@ pub async fn index<T: Configurator>( | |
| force: bool, | ||
| max_parallel: usize, | ||
| multi_progress: Option<MultiProgress>, | ||
| repodata_patch: Option<String>, | ||
| ) -> anyhow::Result<()> { | ||
| let builder = config.into_builder(); | ||
|
|
||
|
|
@@ -443,22 +532,57 @@ pub async fn index<T: Configurator>( | |
| subdirs.insert(Platform::NoArch); | ||
| } | ||
|
|
||
| let repodata_patch = if let Some(path) = repodata_patch { | ||
| match ArchiveType::try_from(path.clone()) { | ||
| Some(ArchiveType::Conda) => {} | ||
| Some(ArchiveType::TarBz2) | None => { | ||
| return Err(anyhow::anyhow!( | ||
| "Only .conda packages are supported for repodata patches. Got: {}", | ||
| path | ||
| )) | ||
| } | ||
| } | ||
| let repodata_patch_path = format!("noarch/{path}"); | ||
| let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes(); | ||
| let reader = Cursor::new(repodata_patch_bytes); | ||
| let repodata_patch = repodata_patch_from_conda_package_stream(reader)?; | ||
| Some(repodata_patch) | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| let semaphore = Semaphore::new(max_parallel); | ||
| let semaphore = Arc::new(semaphore); | ||
|
|
||
| let tasks = subdirs | ||
| .iter() | ||
| .map(|subdir| { | ||
| tokio::spawn(index_subdir( | ||
| *subdir, | ||
| op.clone(), | ||
| force, | ||
| multi_progress.clone(), | ||
| semaphore.clone(), | ||
| )) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| try_join_all(tasks).await?; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was silently failing before because it only checked whether the join handles were okay but not whether the actual return values of the tasks were |
||
| let mut tasks = FuturesUnordered::new(); | ||
| for subdir in subdirs.iter() { | ||
| let task = index_subdir( | ||
| *subdir, | ||
| op.clone(), | ||
| force, | ||
| repodata_patch | ||
| .as_ref() | ||
| .and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()), | ||
| multi_progress.clone(), | ||
| semaphore.clone(), | ||
| ); | ||
| tasks.push(tokio::spawn(task)); | ||
| } | ||
|
|
||
| while let Some(join_result) = tasks.next().await { | ||
| match join_result { | ||
| Ok(Ok(_)) => {} | ||
| Ok(Err(e)) => { | ||
| tracing::error!("Failed to process subdir: {}", e); | ||
| tasks.clear(); | ||
| return Err(e); | ||
| } | ||
| Err(join_err) => { | ||
| tracing::error!("Task panicked: {}", join_err); | ||
| tasks.clear(); | ||
| return Err(anyhow::anyhow!("Task panicked: {}", join_err)); | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not perfect as the progress bar can get overwritten by other tasks that are still executed after this failure. haven't seen a nice way to avoid this behavior
i think we can keep it for now like this since the error messages are printed anyway and i expect this script to be executed in non-interactive environments anyway most of the time