Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bytes = { version = "1" }
chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
tracing = { version = "0.1", features = ["log"] }
regex = { version = "1" }
rstest = { version = "0.26.1" }
thiserror = { version = "2" }
url = { version = "2" }
percent-encoding-rfc3986 = { version = "0.1.3" }
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ rust-version.workspace = true

[dependencies]
# path dependencies
deltalake-core = { version = "0.29.0", path = "../core" , features = ["cloud"]}
deltalake-logstore = { version = "0.29.0", path = "../logstore" , features = ["cloud"]}

# workspace dependencies
async-trait = { workspace = true }
Expand Down
12 changes: 6 additions & 6 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use aws_credential_types::provider::error::CredentialsError;
use aws_credential_types::provider::{future, ProvideCredentials};
use aws_credential_types::Credentials;

use deltalake_core::logstore::object_store::aws::{AmazonS3ConfigKey, AwsCredential};
use deltalake_core::logstore::object_store::{
use deltalake_logstore::object_store::aws::{AmazonS3ConfigKey, AwsCredential};
use deltalake_logstore::object_store::{
CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult,
};
use deltalake_core::DeltaResult;
use deltalake_logstore::LogStoreResult;
use tokio::sync::Mutex;
use tracing::log::*;

Expand Down Expand Up @@ -255,7 +255,7 @@ fn assume_session_name(options: &HashMap<String, String>) -> String {

/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig]
/// for use with various AWS SDK APIs, such as in our [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore)
pub async fn resolve_credentials(options: &HashMap<String, String>) -> DeltaResult<SdkConfig> {
pub async fn resolve_credentials(options: &HashMap<String, String>) -> LogStoreResult<SdkConfig> {
let default_provider = DefaultCredentialsChain::builder().build().await;

let credentials_provider = match assume_role_arn(options) {
Expand Down Expand Up @@ -373,7 +373,7 @@ mod tests {

#[tokio::test]
#[serial]
async fn test_object_store_credential_provider() -> DeltaResult<()> {
async fn test_object_store_credential_provider() -> LogStoreResult<()> {
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {
/// operations.
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> {
async fn test_object_store_credential_provider_consistency() -> LogStoreResult<()> {
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
Expand Down
8 changes: 4 additions & 4 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use aws_sdk_dynamodb::{
},
Client,
};
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey;
use deltalake_core::logstore::{
use deltalake_logstore::object_store::aws::AmazonS3ConfigKey;
use deltalake_logstore::{
default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory,
ObjectStoreRef, StorageConfig,
};
use deltalake_core::{DeltaResult, Path};
use deltalake_logstore::{LogStoreResult, Path};
use errors::{DynamoDbConfigError, LockClientError};
use regex::Regex;
use std::{
Expand Down Expand Up @@ -59,7 +59,7 @@ impl LogStoreFactory for S3LogStoreFactory {
root_store: ObjectStoreRef,
location: &Url,
options: &StorageConfig,
) -> DeltaResult<Arc<dyn LogStore>> {
) -> LogStoreResult<Arc<dyn LogStore>> {
let s3_options = self.with_env_s3(&options.raw.clone());
if s3_options.keys().any(|key| {
let key = key.to_ascii_lowercase();
Expand Down
19 changes: 8 additions & 11 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
use std::sync::Arc;

use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult,
};
use deltalake_logstore::*;
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -65,7 +62,7 @@ impl LogStore for S3LogStore {
"S3LogStore".into()
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
async fn read_commit_entry(&self, version: i64) -> LogStoreResult<Option<Bytes>> {
read_commit_entry(self.object_store(None).as_ref(), version).await
}

Expand All @@ -79,7 +76,7 @@ impl LogStore for S3LogStore {
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
) -> LogStoreResult<()> {
match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
Ok(
Expand All @@ -89,12 +86,12 @@ impl LogStore for S3LogStore {
}
_ => unreachable!(), // S3 Log Store should never receive bytes
}
.map_err(|err| -> TransactionError {
.map_err(|err| -> LogStoreError {
match err {
ObjectStoreError::AlreadyExists { .. } => {
TransactionError::VersionAlreadyExists(version)
LogStoreError::VersionAlreadyExists(version)
}
_ => TransactionError::from(err),
_ => LogStoreError::from(err),
}
})?;
Ok(())
Expand All @@ -105,7 +102,7 @@ impl LogStore for S3LogStore {
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
) -> LogStoreResult<()> {
match &commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
abort_commit_entry(self.object_store(None).as_ref(), version, tmp_commit).await
Expand All @@ -114,7 +111,7 @@ impl LogStore for S3LogStore {
}
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
async fn get_latest_version(&self, current_version: i64) -> LogStoreResult<i64> {
get_latest_version(self, current_version).await
}

Expand Down
59 changes: 24 additions & 35 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ use crate::storage::S3StorageOptions;
use crate::{constants, CommitEntry, DynamoDbLockClient, UpdateLogEntryResult};

use bytes::Bytes;
use deltalake_core::{ObjectStoreError, Path};
use deltalake_logstore::{ObjectStoreError, Path};
use tracing::{debug, error, warn};
use typed_builder::TypedBuilder;
use url::Url;

use deltalake_core::logstore::*;
use deltalake_core::{
kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult, DeltaTableError,
};
use deltalake_logstore::*;
use uuid::Uuid;

const STORE_NAME: &str = "DeltaS3ObjectStore";
Expand Down Expand Up @@ -53,7 +50,7 @@ impl S3DynamoDbLogStore {
s3_options: &S3StorageOptions,
prefixed_store: ObjectStoreRef,
root_store: ObjectStoreRef,
) -> DeltaResult<Self> {
) -> LogStoreResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
&s3_options.sdk_config.clone().unwrap(),
s3_options
Expand All @@ -74,7 +71,7 @@ impl S3DynamoDbLogStore {
s3_options.dynamodb_secret_access_key.clone(),
s3_options.dynamodb_session_token.clone(),
)
.map_err(|err| DeltaTableError::ObjectStore {
.map_err(|err| LogStoreError::ObjectStore {
source: ObjectStoreError::Generic {
store: STORE_NAME,
source: Box::new(err),
Expand All @@ -95,10 +92,7 @@ impl S3DynamoDbLogStore {

/// Attempt to repair an incomplete log entry by moving the temporary commit file
/// to `N.json` and update the associated log entry to mark it as completed.
pub async fn repair_entry(
&self,
entry: &CommitEntry,
) -> Result<RepairLogEntryResult, TransactionError> {
pub async fn repair_entry(&self, entry: &CommitEntry) -> LogStoreResult<RepairLogEntryResult> {
// java does this, do we need it?
if entry.complete {
return Ok(RepairLogEntryResult::AlreadyCompleted);
Expand All @@ -116,7 +110,7 @@ impl S3DynamoDbLogStore {
return self.try_complete_entry(entry, true).await;
}
// `N.json` has already been moved, complete the entry in DynamoDb just in case
Err(TransactionError::ObjectStore {
Err(LogStoreError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
warn!("It looks like the {}.json has already been moved, we got 404 from ObjectStorage.", entry.version);
Expand All @@ -136,14 +130,14 @@ impl S3DynamoDbLogStore {
&self,
entry: &CommitEntry,
copy_performed: bool,
) -> Result<RepairLogEntryResult, TransactionError> {
) -> LogStoreResult<RepairLogEntryResult> {
debug!("try_complete_entry for {entry:?}, {copy_performed}");
for retry in 0..=MAX_REPAIR_RETRIES {
match self
.lock_client
.update_commit_entry(entry.version, &self.table_path)
.await
.map_err(|err| TransactionError::LogStoreError {
.map_err(|err| LogStoreError::TransactionError {
msg: format!(
"unable to complete entry for '{}': failure to write to DynamoDb",
entry.version
Expand Down Expand Up @@ -187,12 +181,12 @@ impl LogStore for S3DynamoDbLogStore {
self.table_path.clone()
}

async fn refresh(&self) -> DeltaResult<()> {
async fn refresh(&self) -> LogStoreResult<()> {
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
.await
.map_err(|err| DeltaTableError::GenericError {
.map_err(|err| LogStoreError::Generic {
source: Box::new(err),
})?;
if let Some(entry) = entry {
Expand All @@ -201,7 +195,7 @@ impl LogStore for S3DynamoDbLogStore {
Ok(())
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
async fn read_commit_entry(&self, version: i64) -> LogStoreResult<Option<Bytes>> {
let entry = self
.lock_client
.get_commit_entry(&self.table_path, version)
Expand All @@ -212,7 +206,7 @@ impl LogStore for S3DynamoDbLogStore {
read_commit_entry(self.object_store(None).as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// Tries to commit a prepared commit file. Returns [LogStoreError::VersionAlreadyExists]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
Expand All @@ -222,7 +216,7 @@ impl LogStore for S3DynamoDbLogStore {
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
) -> LogStoreResult<()> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
Expand All @@ -238,27 +232,22 @@ impl LogStore for S3DynamoDbLogStore {
.await
.map_err(|err| match err {
LockClientError::VersionAlreadyExists { version, .. } => {
warn!("LockClientError::VersionAlreadyExists({version})");
TransactionError::VersionAlreadyExists(version)
LogStoreError::VersionAlreadyExists(version)
}
LockClientError::ProvisionedThroughputExceeded => todo!(
"deltalake-aws does not yet handle DynamoDB provisioned throughput errors"
),
LockClientError::LockTableNotFound => {
let table_name = self.lock_client.get_lock_table_name();
error!("Lock table '{table_name}' not found");
TransactionError::LogStoreError {
LogStoreError::TransactionError {
msg: format!("lock table '{table_name}' not found"),
source: Box::new(err),
}
}
err => {
error!("dynamodb client failed to write log entry: {err:?}");
TransactionError::LogStoreError {
msg: "dynamodb client failed to write log entry".to_owned(),
source: Box::new(err),
}
}
err => LogStoreError::TransactionError {
msg: "dynamodb client failed to write log entry".to_owned(),
source: Box::new(err),
},
})?;
// `repair_entry` performs the exact steps required to finalize the commit, but contains
// retry logic and more robust error handling under the assumption that any other client
Expand All @@ -275,7 +264,7 @@ impl LogStore for S3DynamoDbLogStore {
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
) -> LogStoreResult<()> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
_ => unreachable!(), // S3DynamoDBLogstore should never get Bytes
Expand All @@ -289,12 +278,12 @@ impl LogStore for S3DynamoDbLogStore {
),
LockClientError::VersionAlreadyCompleted { version, .. } => {
error!("Trying to abort a completed commit");
TransactionError::LogStoreError {
LogStoreError::TransactionError {
msg: format!("trying to abort a completed log entry: {version}"),
source: Box::new(err),
}
}
err => TransactionError::LogStoreError {
err => LogStoreError::TransactionError {
msg: "dynamodb client failed to delete log entry".to_owned(),
source: Box::new(err),
},
Expand All @@ -304,13 +293,13 @@ impl LogStore for S3DynamoDbLogStore {
Ok(())
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
async fn get_latest_version(&self, current_version: i64) -> LogStoreResult<i64> {
debug!("Retrieving latest version of {self:?} at v{current_version}");
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
.await
.map_err(|err| DeltaTableError::GenericError {
.map_err(|err| LogStoreError::Generic {
source: Box::new(err),
})?;
// when there is a latest entry in DynamoDb, we can avoid the file listing in S3.
Expand Down
Loading