Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

246 changes: 185 additions & 61 deletions crates/rattler_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
use anyhow::Result;
use bytes::buf::Buf;
use fs_err::{self as fs};
use futures::future::try_join_all;
use futures::{stream::FuturesUnordered, StreamExt};
use fxhash::FxHashMap;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use rattler_conda_types::{
package::{ArchiveType, IndexJson, PackageFile},
ChannelInfo, PackageRecord, Platform, RepoData,
ChannelInfo, PackageRecord, PatchInstructions, Platform, RepoData,
};
use rattler_networking::{Authentication, AuthenticationStorage};
use rattler_package_streaming::{read, seek};
use rattler_package_streaming::{
read,
seek::{self, stream_conda_content},
};
use std::{
collections::{HashMap, HashSet},
io::{Cursor, Read},
io::{Cursor, Read, Seek},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
Expand Down Expand Up @@ -71,6 +74,38 @@ pub fn package_record_from_index_json<T: Read>(
Ok(package_record)
}

fn repodata_patch_from_conda_package_stream<'a>(
package: impl Read + Seek + 'a,
) -> std::io::Result<rattler_conda_types::RepoDataPatch> {
let mut subdirs = FxHashMap::default();

let mut content_reader = stream_conda_content(package).unwrap();
let entries = content_reader.entries().unwrap();
for entry in entries {
let mut entry = entry.unwrap();
if !entry.header().entry_type().is_file() {
todo!();
}
let mut buf = Vec::new();
entry.read_to_end(&mut buf).unwrap();
let path = entry.path().unwrap();
let components = path.components().collect::<Vec<_>>();
let subdir = if components.len() == 2 {
if components[1].as_os_str() != "patch_instructions.json" {
todo!();
}
components[0].as_os_str().to_string_lossy().to_string()
} else {
todo!();
};

let instructions: PatchInstructions = serde_json::from_slice(&buf).unwrap();
subdirs.insert(subdir, instructions);
}

Ok(rattler_conda_types::RepoDataPatch { subdirs })
}

/// Extract the package record from a `.tar.bz2` package file.
/// This function will look for the `info/index.json` file in the conda package
/// and extract the package record from it.
Expand Down Expand Up @@ -132,12 +167,17 @@ async fn index_subdir(
subdir: Platform,
op: Operator,
force: bool,
repodata_patch: Option<PatchInstructions>,
progress: Option<MultiProgress>,
semaphore: Arc<Semaphore>,
) -> Result<()> {
let repodata_path = if repodata_patch.is_some() {
format!("{subdir}/repodata_from_packages.json")
} else {
format!("{subdir}/repodata.json")
};
let mut registered_packages: FxHashMap<String, PackageRecord> = HashMap::default();
if !force {
let repodata_path = format!("{subdir}/repodata.json");
let repodata_bytes = op.read(&repodata_path).await;
let repodata: RepoData = match repodata_bytes {
Ok(bytes) => serde_json::from_slice(&bytes.to_vec())?,
Expand Down Expand Up @@ -210,7 +250,7 @@ async fn index_subdir(
.cloned()
.collect::<Vec<_>>();

tracing::debug!(
tracing::info!(
"Adding {} packages to subdir {}.",
packages_to_add.len(),
subdir
Expand All @@ -229,53 +269,80 @@ async fn index_subdir(
.progress_chars("##-");
pb.set_style(sty);

let tasks = packages_to_add
.iter()
.map(|filename| {
tokio::spawn({
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
let mut tasks = FuturesUnordered::new();
for filename in packages_to_add.iter() {
let task = {
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
// todo: make this future ok/err instead of results
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
})
})
.collect::<Vec<_>>();
let results = try_join_all(tasks).await?;

pb.finish_with_message(format!("Finished {}", subdir.as_str()));
}
};
tasks.push(tokio::spawn(task));
}
let mut results = Vec::new();
while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(result)) => results.push(result),
Ok(Err(e)) => {
tasks.clear();
tracing::error!("Failed to process package: {}", e);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
Comment on lines +321 to +325
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not perfect as the progress bar can get overwritten by other tasks that are still executed after this failure. haven't seen a nice way to avoid this behavior

image

i think we can keep it for now like this since the error messages are printed anyway and i expect this script to be executed in non-interactive environments anyway most of the time

return Err(e.into());
}
Err(join_err) => {
tasks.clear();
tracing::error!("Task panicked: {}", join_err);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
pb.finish_with_message(format!(
"{} {}",
console::style("Finished").green(),
subdir.as_str()
));

tracing::debug!(
tracing::info!(
"Successfully added {} packages to subdir {}.",
results.len(),
subdir
);

for result in results {
let (filename, record) = result?;
for (filename, record) in results {
registered_packages.insert(filename, record);
}

Expand Down Expand Up @@ -304,9 +371,19 @@ async fn index_subdir(
version: Some(2),
};

let repodata_path = format!("{subdir}/repodata.json");
tracing::info!("Writing repodata to {}", repodata_path);
let repodata_bytes = serde_json::to_vec(&repodata)?;
op.write(&repodata_path, repodata_bytes).await?;

if let Some(instructions) = repodata_patch {
let patched_repodata_path = format!("{subdir}/repodata.json");
tracing::info!("Writing patched repodata to {}", patched_repodata_path);
let mut patched_repodata = repodata.clone();
patched_repodata.apply_patches(&instructions);
let patched_repodata_bytes = serde_json::to_vec(&patched_repodata)?;
op.write(&patched_repodata_path, patched_repodata_bytes)
.await?;
}
// todo: also write repodata.json.bz2, repodata.json.zst, repodata.json.jlap and sharded repodata once available in rattler
// https://github.com/conda/rattler/issues/1096

Expand All @@ -317,13 +394,22 @@ async fn index_subdir(
pub async fn index_fs(
channel: impl Into<PathBuf>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
) -> anyhow::Result<()> {
let mut config = FsConfig::default();
config.root = Some(channel.into().canonicalize()?.to_string_lossy().to_string());
index(target_platform, config, force, max_parallel, multi_progress).await
index(
target_platform,
config,
force,
max_parallel,
multi_progress,
repodata_patch,
)
.await
}

/// Create a new `repodata.json` for all packages in the channel at the given S3 URL.
Expand All @@ -337,6 +423,7 @@ pub async fn index_s3(
secret_access_key: Option<String>,
session_token: Option<String>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
Expand Down Expand Up @@ -379,6 +466,7 @@ pub async fn index_s3(
force,
max_parallel,
multi_progress,
repodata_patch,
)
.await
}
Expand All @@ -401,6 +489,7 @@ pub async fn index<T: Configurator>(
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
repodata_patch: Option<String>,
) -> anyhow::Result<()> {
let builder = config.into_builder();

Expand Down Expand Up @@ -443,22 +532,57 @@ pub async fn index<T: Configurator>(
subdirs.insert(Platform::NoArch);
}

let repodata_patch = if let Some(path) = repodata_patch {
match ArchiveType::try_from(path.clone()) {
Some(ArchiveType::Conda) => {}
Some(ArchiveType::TarBz2) | None => {
return Err(anyhow::anyhow!(
"Only .conda packages are supported for repodata patches. Got: {}",
path
))
}
}
let repodata_patch_path = format!("noarch/{path}");
let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes();
let reader = Cursor::new(repodata_patch_bytes);
let repodata_patch = repodata_patch_from_conda_package_stream(reader)?;
Some(repodata_patch)
} else {
None
};

let semaphore = Semaphore::new(max_parallel);
let semaphore = Arc::new(semaphore);

let tasks = subdirs
.iter()
.map(|subdir| {
tokio::spawn(index_subdir(
*subdir,
op.clone(),
force,
multi_progress.clone(),
semaphore.clone(),
))
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was silently failing before because it only checked whether the join handles were okay but not whether the actual return values of the tasks were

let mut tasks = FuturesUnordered::new();
for subdir in subdirs.iter() {
let task = index_subdir(
*subdir,
op.clone(),
force,
repodata_patch
.as_ref()
.and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
multi_progress.clone(),
semaphore.clone(),
);
tasks.push(tokio::spawn(task));
}

while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::error!("Failed to process subdir: {}", e);
tasks.clear();
return Err(e);
}
Err(join_err) => {
tracing::error!("Task panicked: {}", join_err);
tasks.clear();
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
Ok(())
}
Loading
Loading