From 6483ac03026ed94de2b6837c76a12379811a9b7f Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:19:51 +0200 Subject: [PATCH 1/8] feat(files): add types, constants, and part size optimization for multipart upload Signed-off-by: Alex Ott --- service/files/ext_upload.go | 185 +++++++++++++++++++++++++++++++ service/files/ext_upload_test.go | 70 ++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 service/files/ext_upload.go create mode 100644 service/files/ext_upload_test.go diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go new file mode 100644 index 000000000..b72563a0c --- /dev/null +++ b/service/files/ext_upload.go @@ -0,0 +1,185 @@ +package files + +import ( + "math" + "time" +) + +const ( + // minMultipartUploadSize is the minimum file size (in bytes) to trigger multipart upload. + minMultipartUploadSize = 50 * 1024 * 1024 // 50 MiB + + // defaultPartSize is the default part size for multipart uploads. + defaultPartSize = 10 * 1024 * 1024 // 10 MiB + + // maxPartSize is the maximum part size for multipart uploads (Azure limit). + maxPartSize = 4 * 1024 * 1024 * 1024 // 4 GiB + + // defaultParallelism is the default number of concurrent upload workers. + defaultParallelism = 10 + + // maxPartsTarget is the target maximum number of parts for an upload. + maxPartsTarget = 100 + + // urlExpirationDuration is how long presigned URLs are valid. + urlExpirationDuration = 1 * time.Hour + + // maxURLExpirationRetries is the maximum number of retries for URL expiration. + maxURLExpirationRetries = 3 + + // uploadInactivityTimeout is the timeout for upload inactivity. + uploadInactivityTimeout = 60 * time.Second +) + +// partSizeOptions lists the candidate part sizes in ascending order. 
+var partSizeOptions = []int64{ + 10 * 1024 * 1024, // 10 MiB + 20 * 1024 * 1024, // 20 MiB + 50 * 1024 * 1024, // 50 MiB + 100 * 1024 * 1024, // 100 MiB + 200 * 1024 * 1024, // 200 MiB + 500 * 1024 * 1024, // 500 MiB + 1 * 1024 * 1024 * 1024, // 1 GiB + 2 * 1024 * 1024 * 1024, // 2 GiB + 4 * 1024 * 1024 * 1024, // 4 GiB +} + +// UploadConfig holds configuration for a multipart upload. +type UploadConfig struct { + PartSize int64 + Parallelism int + Overwrite bool +} + +// UploadOption is a functional option for configuring a multipart upload. +type UploadOption func(*UploadConfig) + +// WithPartSize sets the part size for a multipart upload. +func WithPartSize(partSize int64) UploadOption { + return func(c *UploadConfig) { + c.PartSize = partSize + } +} + +// WithParallelism sets the number of concurrent upload workers. +func WithParallelism(parallelism int) UploadOption { + return func(c *UploadConfig) { + c.Parallelism = parallelism + } +} + +// WithOverwrite sets whether to overwrite an existing file. +func WithOverwrite(overwrite bool) UploadOption { + return func(c *UploadConfig) { + c.Overwrite = overwrite + } +} + +// initiateUploadResponse is the response from initiating a multipart upload. +type initiateUploadResponse struct { + MultipartUpload *multipartUploadSession `json:"multipart_upload,omitempty"` + ResumableUpload *resumableUploadSession `json:"resumable_upload,omitempty"` +} + +// multipartUploadSession holds the state for a multipart upload session. +type multipartUploadSession struct { + SessionToken string `json:"session_token"` +} + +// resumableUploadSession holds the state for a resumable upload session. +type resumableUploadSession struct { + SessionToken string `json:"session_token"` +} + +// presignedURL represents a presigned URL for uploading a part. 
+type presignedURL struct { + URL string `json:"url"` + PartNumber int `json:"part_number"` + Headers []presignedHeader `json:"headers"` +} + +// presignedHeader is a header to include when uploading to a presigned URL. +type presignedHeader struct { + Name string `json:"name"` + Value string `json:"value"` +} + +// createUploadPartURLsRequest is the request to create presigned URLs for upload parts. +type createUploadPartURLsRequest struct { + Path string `json:"path"` + SessionToken string `json:"session_token"` + StartPartNumber int `json:"start_part_number"` + Count int `json:"count"` + ExpireTime string `json:"expire_time"` +} + +// createUploadPartURLsResponse is the response containing presigned URLs for upload parts. +type createUploadPartURLsResponse struct { + UploadPartURLs []presignedURL `json:"upload_part_urls"` +} + +// completeUploadPart represents a completed upload part. +type completeUploadPart struct { + PartNumber int `json:"part_number"` + ETag string `json:"etag"` +} + +// completeUploadRequest is the request to complete a multipart upload. +type completeUploadRequest struct { + Parts []completeUploadPart `json:"parts"` +} + +// optimizePartSize selects the best part size and batch size for a multipart upload. +// +// If explicitPartSize > 0, it is used directly and the batch size is computed +// as max(1, ceil(sqrt(numParts))). +// +// If contentLength <= 0 (unknown), defaultPartSize and batch size 1 are returned. +// +// Otherwise, the smallest part size from partSizeOptions where the number of +// parts is <= maxPartsTarget is selected. If no option satisfies the constraint, +// maxPartSize is used as a fallback. 
+func optimizePartSize(contentLength int64, explicitPartSize int64) (int64, int) { + if explicitPartSize > 0 { + numParts := int(math.Ceil(float64(contentLength) / float64(explicitPartSize))) + if numParts < 1 { + numParts = 1 + } + batchSize := int(math.Ceil(math.Sqrt(float64(numParts)))) + if batchSize < 1 { + batchSize = 1 + } + return explicitPartSize, batchSize + } + + if contentLength <= 0 { + return defaultPartSize, 1 + } + + for _, partSize := range partSizeOptions { + numParts := int(math.Ceil(float64(contentLength) / float64(partSize))) + if numParts <= maxPartsTarget { + batchSize := int(math.Ceil(math.Sqrt(float64(numParts)))) + if batchSize < 1 { + batchSize = 1 + } + return partSize, batchSize + } + } + + // Fallback to maxPartSize + numParts := int(math.Ceil(float64(contentLength) / float64(maxPartSize))) + if numParts < 1 { + numParts = 1 + } + batchSize := int(math.Ceil(math.Sqrt(float64(numParts)))) + if batchSize < 1 { + batchSize = 1 + } + return maxPartSize, batchSize +} + +// uploadURLExpireTime returns the expiration time for a presigned URL as an RFC3339 string. 
+func uploadURLExpireTime() string { + return time.Now().Add(urlExpirationDuration).UTC().Format(time.RFC3339) +} diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go new file mode 100644 index 000000000..740d07818 --- /dev/null +++ b/service/files/ext_upload_test.go @@ -0,0 +1,70 @@ +package files + +import ( + "testing" +) + +func TestOptimizePartSize(t *testing.T) { + const MiB = 1024 * 1024 + const GiB = 1024 * MiB + + tests := []struct { + name string + contentLength int64 + explicitPartSize int64 + wantPartSize int64 + wantBatchSize int + }{ + { + // 5 MiB / 10 MiB = 1 part; ceil(sqrt(1)) = 1 + name: "small file 5 MiB returns defaultPartSize batch 1", + contentLength: 5 * MiB, + wantPartSize: defaultPartSize, + wantBatchSize: 1, + }, + { + // 500 MiB / 10 MiB = 50 parts <= 100; ceil(sqrt(50)) = 8 + name: "500 MiB returns 10 MiB parts batch 8", + contentLength: 500 * MiB, + wantPartSize: 10 * MiB, + wantBatchSize: 8, + }, + { + // 5 GiB: 10 MiB = 512 parts (too many), 20 MiB = 256 (too many), + // 50 MiB = 103 (too many), 100 MiB = 52 <= 100; ceil(sqrt(52)) = 8 + name: "5 GiB returns 100 MiB parts batch 8", + contentLength: 5 * GiB, + wantPartSize: 100 * MiB, + wantBatchSize: 8, + }, + { + // Explicit 20 MiB for 500 MiB file: 500/20 = 25 parts; ceil(sqrt(25)) = 5 + name: "explicit part size 20 MiB for 500 MiB file batch 5", + contentLength: 500 * MiB, + explicitPartSize: 20 * MiB, + wantPartSize: 20 * MiB, + wantBatchSize: 5, + }, + { + // Unknown size (0): returns defaultPartSize, batch 1 + name: "unknown size returns defaultPartSize batch 1", + contentLength: 0, + wantPartSize: defaultPartSize, + wantBatchSize: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotPartSize, gotBatchSize := optimizePartSize(tc.contentLength, tc.explicitPartSize) + if gotPartSize != tc.wantPartSize { + t.Errorf("optimizePartSize(%d, %d) partSize = %d, want %d", + tc.contentLength, tc.explicitPartSize, gotPartSize, 
tc.wantPartSize) + } + if gotBatchSize != tc.wantBatchSize { + t.Errorf("optimizePartSize(%d, %d) batchSize = %d, want %d", + tc.contentLength, tc.explicitPartSize, gotBatchSize, tc.wantBatchSize) + } + }) + } +} From abe1842b91bac42be81585fc05ebbe715e897052 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:23:10 +0200 Subject: [PATCH 2/8] feat(files): implement multipart upload protocol (initiate, upload parts, complete) Co-authored-by: Isaac Signed-off-by: Alex Ott --- service/files/ext_upload.go | 238 +++++++++++++++++++++++++++++++ service/files/ext_upload_test.go | 173 ++++++++++++++++++++++ 2 files changed, 411 insertions(+) diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index b72563a0c..9f763977c 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -1,8 +1,18 @@ package files import ( + "bytes" + "context" + "fmt" + "io" "math" + "net/http" + "sort" + "sync" "time" + + "github.com/databricks/databricks-sdk-go/httpclient" + "github.com/databricks/databricks-sdk-go/logger" ) const ( @@ -183,3 +193,231 @@ func optimizePartSize(contentLength int64, explicitPartSize int64) (int64, int) func uploadURLExpireTime() string { return time.Now().Add(urlExpirationDuration).UTC().Format(time.RFC3339) } + +// initiateMultipartUpload starts a multipart upload session for the given file path. 
+func (a *FilesAPI) initiateMultipartUpload(ctx context.Context, filePath string, overwrite bool) (*initiateUploadResponse, error) { + var resp initiateUploadResponse + apiPath := fmt.Sprintf("/api/2.0/fs/files%s", httpclient.EncodeMultiSegmentPathParameter(filePath)) + headers := make(map[string]string) + headers["Accept"] = "application/json" + queryParams := make(map[string]any) + queryParams["action"] = "initiate-upload" + if overwrite { + queryParams["overwrite"] = "true" + } + err := a.filesImpl.client.Do(ctx, http.MethodPost, apiPath, headers, queryParams, nil, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +// getUploadPartURLs fetches presigned URLs for uploading parts. +func (a *FilesAPI) getUploadPartURLs(ctx context.Context, filePath, sessionToken string, startPartNumber, count int) ([]presignedURL, error) { + var resp createUploadPartURLsResponse + headers := make(map[string]string) + headers["Content-Type"] = "application/json" + headers["Accept"] = "application/json" + queryParams := make(map[string]any) + req := createUploadPartURLsRequest{ + Path: filePath, + SessionToken: sessionToken, + StartPartNumber: startPartNumber, + Count: count, + ExpireTime: uploadURLExpireTime(), + } + err := a.filesImpl.client.Do(ctx, http.MethodPost, "/api/2.0/fs/create-upload-part-urls", headers, queryParams, req, &resp) + if err != nil { + return nil, err + } + return resp.UploadPartURLs, nil +} + +// uploadOnePart uploads a single part to a presigned URL and returns the ETag. 
+func (a *FilesAPI) uploadOnePart(ctx context.Context, presigned presignedURL, partData io.ReadSeeker) (string, error) { + // Determine content length + currentPos, err := partData.Seek(0, io.SeekCurrent) + if err != nil { + return "", fmt.Errorf("failed to get current position: %w", err) + } + endPos, err := partData.Seek(0, io.SeekEnd) + if err != nil { + return "", fmt.Errorf("failed to seek to end: %w", err) + } + contentLength := endPos - currentPos + if _, err := partData.Seek(currentPos, io.SeekStart); err != nil { + return "", fmt.Errorf("failed to seek back: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, presigned.URL, partData) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + req.ContentLength = contentLength + req.Header.Set("Content-Type", "application/octet-stream") + for _, h := range presigned.Headers { + req.Header.Set(h.Name, h.Value) + } + + httpClient := &http.Client{Timeout: uploadInactivityTimeout} + resp, err := httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("upload part %d failed: %w", presigned.PartNumber, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("upload part %d failed with status %d: %s", presigned.PartNumber, resp.StatusCode, string(body)) + } + + etag := resp.Header.Get("ETag") + return etag, nil +} + +// completeMultipartUpload finalizes a multipart upload with the given ETags. 
+func (a *FilesAPI) completeMultipartUpload(ctx context.Context, filePath, sessionToken string, etags map[int]string) error { + apiPath := fmt.Sprintf("/api/2.0/fs/files%s", httpclient.EncodeMultiSegmentPathParameter(filePath)) + headers := make(map[string]string) + headers["Content-Type"] = "application/json" + headers["Accept"] = "application/json" + queryParams := make(map[string]any) + queryParams["action"] = "complete-upload" + queryParams["upload_type"] = "multipart" + queryParams["session_token"] = sessionToken + + parts := make([]completeUploadPart, 0, len(etags)) + for partNum, etag := range etags { + parts = append(parts, completeUploadPart{ + PartNumber: partNum, + ETag: etag, + }) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + + req := completeUploadRequest{Parts: parts} + return a.filesImpl.client.Do(ctx, http.MethodPost, apiPath, headers, queryParams, req, nil) +} + +// uploadPartResult holds the result of uploading a single part. +type uploadPartResult struct { + PartNumber int + ETag string + Err error +} + +// uploadMultipart orchestrates a full multipart upload. 
+func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, cfg *UploadConfig) error { + // Phase 1: Initiate + initResp, err := a.initiateMultipartUpload(ctx, filePath, cfg.Overwrite) + if err != nil { + return fmt.Errorf("failed to initiate multipart upload: %w", err) + } + if initResp.MultipartUpload == nil { + return fmt.Errorf("multipart upload not supported for this path (GCP is not supported)") + } + sessionToken := initResp.MultipartUpload.SessionToken + logger.Debugf(ctx, "initiated multipart upload with session token") + + partSize := cfg.PartSize + parallelism := cfg.Parallelism + if parallelism < 1 { + parallelism = defaultParallelism + } + + // Phase 2: Upload parts in parallel + sem := make(chan struct{}, parallelism) + var mu sync.Mutex + etags := make(map[int]string) + var uploadErr error + + partNumber := 1 + for { + // Check if a previous upload failed + mu.Lock() + if uploadErr != nil { + mu.Unlock() + break + } + mu.Unlock() + + // Read the next part + buf := make([]byte, partSize) + n, readErr := io.ReadFull(content, buf) + if n == 0 && readErr != nil { + break + } + buf = buf[:n] + + currentPartNumber := partNumber + partNumber++ + + sem <- struct{}{} + go func(pn int, data []byte) { + defer func() { <-sem }() + + // Check for prior error + mu.Lock() + if uploadErr != nil { + mu.Unlock() + return + } + mu.Unlock() + + // Get presigned URL for this part + urls, err := a.getUploadPartURLs(ctx, filePath, sessionToken, pn, 1) + if err != nil { + mu.Lock() + if uploadErr == nil { + uploadErr = fmt.Errorf("failed to get upload URL for part %d: %w", pn, err) + } + mu.Unlock() + return + } + if len(urls) == 0 { + mu.Lock() + if uploadErr == nil { + uploadErr = fmt.Errorf("no upload URL returned for part %d", pn) + } + mu.Unlock() + return + } + + // Upload the part + etag, err := a.uploadOnePart(ctx, urls[0], bytes.NewReader(data)) + if err != nil { + mu.Lock() + if uploadErr == nil { + 
uploadErr = fmt.Errorf("failed to upload part %d: %w", pn, err) + } + mu.Unlock() + return + } + + mu.Lock() + etags[pn] = etag + mu.Unlock() + + logger.Debugf(ctx, "uploaded part %d", pn) + }(currentPartNumber, buf) + + if readErr != nil { + break + } + } + + // Wait for all goroutines to finish + for i := 0; i < parallelism; i++ { + sem <- struct{}{} + } + + if uploadErr != nil { + return uploadErr + } + + // Phase 3: Complete + logger.Debugf(ctx, "completing multipart upload with %d parts", len(etags)) + return a.completeMultipartUpload(ctx, filePath, sessionToken, etags) +} diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go index 740d07818..c013ab453 100644 --- a/service/files/ext_upload_test.go +++ b/service/files/ext_upload_test.go @@ -1,7 +1,21 @@ package files import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" "testing" + + "github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestOptimizePartSize(t *testing.T) { @@ -68,3 +82,162 @@ func TestOptimizePartSize(t *testing.T) { }) } } + +// multipartMockServer simulates the multipart upload protocol for testing. 
+type multipartMockServer struct { + mu sync.Mutex + parts map[int][]byte + completed bool + completeParts []completeUploadPart + sessionToken string +} + +func newMultipartMockServer() *multipartMockServer { + return &multipartMockServer{ + parts: make(map[int][]byte), + sessionToken: "test-session-token-123", + } +} + +func (m *multipartMockServer) handler(srv *httptest.Server) http.Handler { + mux := http.NewServeMux() + + // Initiate upload + mux.HandleFunc("/api/2.0/fs/files/", func(w http.ResponseWriter, r *http.Request) { + action := r.URL.Query().Get("action") + switch action { + case "initiate-upload": + w.Header().Set("Content-Type", "application/json") + resp := initiateUploadResponse{ + MultipartUpload: &multipartUploadSession{ + SessionToken: m.sessionToken, + }, + } + json.NewEncoder(w).Encode(resp) + case "complete-upload": + var req completeUploadRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + m.mu.Lock() + m.completed = true + m.completeParts = req.Parts + m.mu.Unlock() + w.WriteHeader(http.StatusOK) + default: + http.Error(w, "unknown action", http.StatusBadRequest) + } + }) + + // Create upload part URLs + mux.HandleFunc("/api/2.0/fs/create-upload-part-urls", func(w http.ResponseWriter, r *http.Request) { + var req createUploadPartURLsRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + urls := make([]presignedURL, req.Count) + for i := 0; i < req.Count; i++ { + pn := req.StartPartNumber + i + urls[i] = presignedURL{ + URL: fmt.Sprintf("%s/upload-part/%d", srv.URL, pn), + PartNumber: pn, + Headers: []presignedHeader{ + {Name: "x-test-header", Value: "test-value"}, + }, + } + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(createUploadPartURLsResponse{UploadPartURLs: urls}) + }) + + // Upload part (presigned URL target) + 
mux.HandleFunc("/upload-part/", func(w http.ResponseWriter, r *http.Request) { + parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/upload-part/"), "/") + partNum, err := strconv.Atoi(parts[0]) + if err != nil { + http.Error(w, "invalid part number", http.StatusBadRequest) + return + } + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + m.mu.Lock() + m.parts[partNum] = data + m.mu.Unlock() + + etag := fmt.Sprintf("etag-part-%d", partNum) + w.Header().Set("ETag", etag) + w.WriteHeader(http.StatusOK) + }) + + return mux +} + +func newTestFilesAPI(t *testing.T, serverURL string) *FilesAPI { + t.Helper() + cfg := &config.Config{ + Host: serverURL, + Token: "test-token", + } + err := cfg.EnsureResolved() + require.NoError(t, err) + databricksClient, err := client.New(cfg) + require.NoError(t, err) + return NewFiles(databricksClient) +} + +func TestUploadMultipart_FullFlow(t *testing.T) { + mock := newMultipartMockServer() + + // Create server with a temporary handler, then replace with the real one + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + srv.Config.Handler = mock.handler(srv) + + api := newTestFilesAPI(t, srv.URL) + + // Create 100 KiB of test content + contentSize := 100 * 1024 + content := make([]byte, contentSize) + for i := range content { + content[i] = byte(i % 256) + } + + cfg := &UploadConfig{ + PartSize: 30 * 1024, // 30 KiB parts + Parallelism: 2, + Overwrite: true, + } + + ctx := context.Background() + err := api.uploadMultipart(ctx, "/test/upload.bin", strings.NewReader(string(content)), int64(contentSize), cfg) + require.NoError(t, err) + + // Verify completion + mock.mu.Lock() + defer mock.mu.Unlock() + + assert.True(t, mock.completed, "upload should be completed") + assert.True(t, len(mock.parts) > 1, "should have multiple parts, got %d", len(mock.parts)) + + // Verify total bytes match + totalBytes := 0 + for _, data := range mock.parts { + 
totalBytes += len(data) + } + assert.Equal(t, contentSize, totalBytes, "total uploaded bytes should match content size") + + // 100 KiB / 30 KiB = 4 parts (30+30+30+10) + expectedParts := 4 + assert.Equal(t, expectedParts, len(mock.completeParts), "complete request should have correct number of parts") + + // Verify parts are sorted by part number + for i := 1; i < len(mock.completeParts); i++ { + assert.True(t, mock.completeParts[i].PartNumber > mock.completeParts[i-1].PartNumber, + "parts should be sorted by part number") + } +} From 549cb4022bfe6c5cff840f90bbd7c413aa38607a Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:26:24 +0200 Subject: [PATCH 3/8] feat(files): add UploadWithChunking, URL expiration retry, and UploadFromFile Signed-off-by: Alex Ott --- service/files/ext_upload.go | 120 ++++++++++++++++++++++++------- service/files/ext_upload_test.go | 118 +++++++++++++++++++++++++++++- 2 files changed, 211 insertions(+), 27 deletions(-) diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index 9f763977c..c0cf80ca2 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -7,12 +7,15 @@ import ( "io" "math" "net/http" + "os" "sort" + "strings" "sync" "time" "github.com/databricks/databricks-sdk-go/httpclient" "github.com/databricks/databricks-sdk-go/logger" + "github.com/databricks/databricks-sdk-go/useragent" ) const ( @@ -233,7 +236,17 @@ func (a *FilesAPI) getUploadPartURLs(ctx context.Context, filePath, sessionToken return resp.UploadPartURLs, nil } +// isURLExpiredResponse returns true if the HTTP response indicates the presigned URL has expired. +func isURLExpiredResponse(statusCode int, body []byte) bool { + if statusCode != http.StatusForbidden { + return false + } + s := string(body) + return strings.Contains(s, "AccessDenied") || strings.Contains(s, "AuthenticationFailed") +} + // uploadOnePart uploads a single part to a presigned URL and returns the ETag. 
+// It retries up to maxURLExpirationRetries times if the presigned URL has expired. func (a *FilesAPI) uploadOnePart(ctx context.Context, presigned presignedURL, partData io.ReadSeeker) (string, error) { // Determine content length currentPos, err := partData.Seek(0, io.SeekCurrent) @@ -249,30 +262,47 @@ func (a *FilesAPI) uploadOnePart(ctx context.Context, presigned presignedURL, pa return "", fmt.Errorf("failed to seek back: %w", err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPut, presigned.URL, partData) - if err != nil { - return "", fmt.Errorf("failed to create request: %w", err) - } - req.ContentLength = contentLength - req.Header.Set("Content-Type", "application/octet-stream") - for _, h := range presigned.Headers { - req.Header.Set(h.Name, h.Value) - } + for attempt := 0; attempt < maxURLExpirationRetries; attempt++ { + if attempt > 0 { + if _, err := partData.Seek(0, io.SeekStart); err != nil { + return "", fmt.Errorf("failed to rewind part data: %w", err) + } + } - httpClient := &http.Client{Timeout: uploadInactivityTimeout} - resp, err := httpClient.Do(req) - if err != nil { - return "", fmt.Errorf("upload part %d failed: %w", presigned.PartNumber, err) - } - defer resp.Body.Close() + req, err := http.NewRequestWithContext(ctx, http.MethodPut, presigned.URL, partData) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + req.ContentLength = contentLength + req.Header.Set("Content-Type", "application/octet-stream") + for _, h := range presigned.Headers { + req.Header.Set(h.Name, h.Value) + } + + httpClient := &http.Client{Timeout: uploadInactivityTimeout} + resp, err := httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("upload part %d failed: %w", presigned.PartNumber, err) + } + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated { + etag := resp.Header.Get("ETag") + resp.Body.Close() + return etag, nil + } - if resp.StatusCode != http.StatusOK && resp.StatusCode != 
http.StatusCreated { body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + + if isURLExpiredResponse(resp.StatusCode, body) { + logger.Debugf(ctx, "presigned URL expired for part %d (attempt %d/%d), retrying", presigned.PartNumber, attempt+1, maxURLExpirationRetries) + continue + } + return "", fmt.Errorf("upload part %d failed with status %d: %s", presigned.PartNumber, resp.StatusCode, string(body)) } - etag := resp.Header.Get("ETag") - return etag, nil + return "", fmt.Errorf("upload part %d failed: presigned URL expired after %d retries", presigned.PartNumber, maxURLExpirationRetries) } // completeMultipartUpload finalizes a multipart upload with the given ETags. @@ -301,15 +331,8 @@ func (a *FilesAPI) completeMultipartUpload(ctx context.Context, filePath, sessio return a.filesImpl.client.Do(ctx, http.MethodPost, apiPath, headers, queryParams, req, nil) } -// uploadPartResult holds the result of uploading a single part. -type uploadPartResult struct { - PartNumber int - ETag string - Err error -} - // uploadMultipart orchestrates a full multipart upload. -func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, cfg *UploadConfig) error { +func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content io.ReadSeeker, _ int64, cfg *UploadConfig) error { // Phase 1: Initiate initResp, err := a.initiateMultipartUpload(ctx, filePath, cfg.Overwrite) if err != nil { @@ -421,3 +444,48 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content logger.Debugf(ctx, "completing multipart upload with %d parts", len(etags)) return a.completeMultipartUpload(ctx, filePath, sessionToken, etags) } + +// UploadWithChunking uploads a file to the given path, automatically choosing +// between single-shot and multipart upload based on the content length. +// For files smaller than 50 MiB, a single PUT request is used. 
For larger files, +// a multipart upload is performed with configurable part size and parallelism. +func (a *FilesAPI) UploadWithChunking(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, opts ...UploadOption) error { + ctx = useragent.InContext(ctx, "sdk-feature", "multipart-upload") + + cfg := &UploadConfig{ + Parallelism: defaultParallelism, + } + for _, opt := range opts { + opt(cfg) + } + + // Auto-select part size if not explicitly set + cfg.PartSize, _ = optimizePartSize(contentLength, cfg.PartSize) + + if contentLength < minMultipartUploadSize { + return a.Upload(ctx, UploadRequest{ + FilePath: filePath, + Contents: io.NopCloser(content), + Overwrite: cfg.Overwrite, + }) + } + + return a.uploadMultipart(ctx, filePath, content, contentLength, cfg) +} + +// UploadFromFile uploads a local file to the given path, automatically choosing +// between single-shot and multipart upload based on the file size. +func (a *FilesAPI) UploadFromFile(ctx context.Context, filePath string, sourcePath string, opts ...UploadOption) error { + f, err := os.Open(sourcePath) + if err != nil { + return fmt.Errorf("failed to open source file: %w", err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + return fmt.Errorf("failed to stat source file: %w", err) + } + + return a.UploadWithChunking(ctx, filePath, f, info.Size(), opts...) 
+} diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go index c013ab453..526a05689 100644 --- a/service/files/ext_upload_test.go +++ b/service/files/ext_upload_test.go @@ -7,9 +7,11 @@ import ( "io" "net/http" "net/http/httptest" + "os" "strconv" "strings" "sync" + "sync/atomic" "testing" "github.com/databricks/databricks-sdk-go/client" @@ -102,8 +104,22 @@ func newMultipartMockServer() *multipartMockServer { func (m *multipartMockServer) handler(srv *httptest.Server) http.Handler { mux := http.NewServeMux() - // Initiate upload + // Initiate upload / complete upload / single-shot PUT mux.HandleFunc("/api/2.0/fs/files/", func(w http.ResponseWriter, r *http.Request) { + // Single-shot PUT upload (no action query parameter) + if r.Method == http.MethodPut { + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + m.mu.Lock() + m.parts[1] = data + m.completed = true + m.mu.Unlock() + w.WriteHeader(http.StatusOK) + return + } action := r.URL.Query().Get("action") switch action { case "initiate-upload": @@ -241,3 +257,103 @@ func TestUploadMultipart_FullFlow(t *testing.T) { "parts should be sorted by part number") } } + +func TestUploadWithChunking_SmallFile_UsesSingleShot(t *testing.T) { + var receivedBody []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPut && strings.HasPrefix(r.URL.Path, "/api/2.0/fs/files/") { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + receivedBody = body + w.WriteHeader(http.StatusOK) + return + } + http.Error(w, "not found", http.StatusNotFound) + })) + defer srv.Close() + + api := newTestFilesAPI(t, srv.URL) + + content := []byte("hello multipart!!") + reader := strings.NewReader(string(content)) + + ctx := context.Background() + err := api.UploadWithChunking(ctx, "/test/small.txt", 
reader, int64(len(content))) + require.NoError(t, err) + assert.Equal(t, content, receivedBody) +} + +func TestUploadOnePart_RetriesOnExpiredURL(t *testing.T) { + var attempts atomic.Int32 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := attempts.Add(1) + if n == 1 { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte("AccessDenied")) + return + } + w.Header().Set("ETag", "etag-success") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + api := newTestFilesAPI(t, srv.URL) + + presigned := presignedURL{ + URL: srv.URL + "/upload-part/1", + PartNumber: 1, + } + data := strings.NewReader("test data") + + ctx := context.Background() + etag, err := api.uploadOnePart(ctx, presigned, data) + require.NoError(t, err) + assert.Equal(t, "etag-success", etag) + assert.Equal(t, int32(2), attempts.Load()) +} + +func TestUploadFromFile(t *testing.T) { + mock := newMultipartMockServer() + + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + srv.Config.Handler = mock.handler(srv) + + api := newTestFilesAPI(t, srv.URL) + + // Create a temp file with 100 KiB content + contentSize := 100 * 1024 + content := make([]byte, contentSize) + for i := range content { + content[i] = byte(i % 256) + } + + tmpFile, err := os.CreateTemp("", "upload-test-*") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.Write(content) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + ctx := context.Background() + err = api.UploadFromFile(ctx, "/test/fromfile.bin", tmpFile.Name(), + WithPartSize(25*1024), + WithParallelism(2), + WithOverwrite(true), + ) + require.NoError(t, err) + + mock.mu.Lock() + defer mock.mu.Unlock() + + // File is 100 KiB < 50 MiB threshold, so single-shot upload is used + assert.True(t, mock.completed, "upload should be completed") + assert.Equal(t, 1, len(mock.parts), "single-shot upload stores as one part") + assert.Equal(t, contentSize, 
len(mock.parts[1]), "uploaded bytes should match content size") + assert.Equal(t, content, mock.parts[1], "uploaded content should match") +} From d357d915f01226bde788f3a782c931822c0dcaae Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:28:20 +0200 Subject: [PATCH 4/8] feat(files): add abort-on-error cleanup for multipart upload Best-effort abort of incomplete multipart uploads when a part upload fails, preventing orphaned upload sessions on the server. Signed-off-by: Alex Ott --- service/files/ext_upload.go | 50 +++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index c0cf80ca2..24ea13b79 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -437,6 +437,7 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content } if uploadErr != nil { + a.abortMultipartUpload(ctx, filePath, sessionToken) return uploadErr } @@ -445,6 +446,55 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content return a.completeMultipartUpload(ctx, filePath, sessionToken, etags) } +// abortMultipartUpload attempts to abort an in-progress multipart upload. +// This is a best-effort cleanup; errors are logged but not returned. 
+func (a *FilesAPI) abortMultipartUpload(ctx context.Context, filePath, sessionToken string) { + apiPath := "/api/2.0/fs/create-abort-upload-url" + headers := map[string]string{ + "Content-Type": "application/json", + "Accept": "application/json", + } + body := map[string]string{ + "path": filePath, + "session_token": sessionToken, + "expire_time": uploadURLExpireTime(), + } + + var resp struct { + AbortUploadURL struct { + URL string `json:"url"` + Headers []presignedHeader `json:"headers,omitempty"` + } `json:"abort_upload_url"` + } + err := a.filesImpl.client.Do(ctx, http.MethodPost, apiPath, headers, nil, body, &resp) + if err != nil { + logger.Debugf(ctx, "failed to get abort URL: %v", err) + return + } + if resp.AbortUploadURL.URL == "" { + logger.Debugf(ctx, "no abort URL returned") + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, resp.AbortUploadURL.URL, nil) + if err != nil { + logger.Debugf(ctx, "failed to create abort request: %v", err) + return + } + for _, h := range resp.AbortUploadURL.Headers { + req.Header.Set(h.Name, h.Value) + } + + httpClient := &http.Client{Timeout: 30 * time.Second} + abortResp, err := httpClient.Do(req) + if err != nil { + logger.Debugf(ctx, "failed to abort multipart upload: %v", err) + return + } + abortResp.Body.Close() + logger.Debugf(ctx, "aborted multipart upload (status %d)", abortResp.StatusCode) +} + // UploadWithChunking uploads a file to the given path, automatically choosing // between single-shot and multipart upload based on the content length. // For files smaller than 50 MiB, a single PUT request is used. 
For larger files, From 77f636ab4fc210b448deaf15d4cbc2012d77d5f8 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:32:07 +0200 Subject: [PATCH 5/8] chore(files): fix formatting from goimports/gofmt Signed-off-by: Alex Ott --- service/files/ext_upload.go | 26 +++++++++++++------------- service/files/ext_upload_test.go | 20 ++++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index 24ea13b79..1f4fdb0e5 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -46,15 +46,15 @@ const ( // partSizeOptions lists the candidate part sizes in ascending order. var partSizeOptions = []int64{ - 10 * 1024 * 1024, // 10 MiB - 20 * 1024 * 1024, // 20 MiB - 50 * 1024 * 1024, // 50 MiB - 100 * 1024 * 1024, // 100 MiB - 200 * 1024 * 1024, // 200 MiB - 500 * 1024 * 1024, // 500 MiB - 1 * 1024 * 1024 * 1024, // 1 GiB - 2 * 1024 * 1024 * 1024, // 2 GiB - 4 * 1024 * 1024 * 1024, // 4 GiB + 10 * 1024 * 1024, // 10 MiB + 20 * 1024 * 1024, // 20 MiB + 50 * 1024 * 1024, // 50 MiB + 100 * 1024 * 1024, // 100 MiB + 200 * 1024 * 1024, // 200 MiB + 500 * 1024 * 1024, // 500 MiB + 1 * 1024 * 1024 * 1024, // 1 GiB + 2 * 1024 * 1024 * 1024, // 2 GiB + 4 * 1024 * 1024 * 1024, // 4 GiB } // UploadConfig holds configuration for a multipart upload. @@ -90,8 +90,8 @@ func WithOverwrite(overwrite bool) UploadOption { // initiateUploadResponse is the response from initiating a multipart upload. type initiateUploadResponse struct { - MultipartUpload *multipartUploadSession `json:"multipart_upload,omitempty"` - ResumableUpload *resumableUploadSession `json:"resumable_upload,omitempty"` + MultipartUpload *multipartUploadSession `json:"multipart_upload,omitempty"` + ResumableUpload *resumableUploadSession `json:"resumable_upload,omitempty"` } // multipartUploadSession holds the state for a multipart upload session. 
@@ -106,8 +106,8 @@ type resumableUploadSession struct { // presignedURL represents a presigned URL for uploading a part. type presignedURL struct { - URL string `json:"url"` - PartNumber int `json:"part_number"` + URL string `json:"url"` + PartNumber int `json:"part_number"` Headers []presignedHeader `json:"headers"` } diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go index 526a05689..b96002408 100644 --- a/service/files/ext_upload_test.go +++ b/service/files/ext_upload_test.go @@ -25,11 +25,11 @@ func TestOptimizePartSize(t *testing.T) { const GiB = 1024 * MiB tests := []struct { - name string - contentLength int64 - explicitPartSize int64 - wantPartSize int64 - wantBatchSize int + name string + contentLength int64 + explicitPartSize int64 + wantPartSize int64 + wantBatchSize int }{ { // 5 MiB / 10 MiB = 1 part; ceil(sqrt(1)) = 1 @@ -87,11 +87,11 @@ func TestOptimizePartSize(t *testing.T) { // multipartMockServer simulates the multipart upload protocol for testing. 
type multipartMockServer struct { - mu sync.Mutex - parts map[int][]byte - completed bool - completeParts []completeUploadPart - sessionToken string + mu sync.Mutex + parts map[int][]byte + completed bool + completeParts []completeUploadPart + sessionToken string } func newMultipartMockServer() *multipartMockServer { From 09f61c197861aa153202645e371424241af126d3 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 11:36:12 +0200 Subject: [PATCH 6/8] fix(files): address code review findings for multipart upload - Fix URL expiration retry to fetch fresh presigned URLs instead of retrying the same expired URL - Remove total http.Client timeout that would kill large part uploads; rely on context cancellation instead - Add context cancellation checks in the main upload loop - Extract partUploadError type for structured error handling Signed-off-by: Alex Ott --- service/files/ext_upload.go | 137 +++++++++++++++++-------------- service/files/ext_upload_test.go | 41 ++++++--- 2 files changed, 102 insertions(+), 76 deletions(-) diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index 1f4fdb0e5..f91274207 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -3,6 +3,7 @@ package files import ( "bytes" "context" + "errors" "fmt" "io" "math" @@ -39,9 +40,6 @@ const ( // maxURLExpirationRetries is the maximum number of retries for URL expiration. maxURLExpirationRetries = 3 - - // uploadInactivityTimeout is the timeout for upload inactivity. - uploadInactivityTimeout = 60 * time.Second ) // partSizeOptions lists the candidate part sizes in ascending order. @@ -246,63 +244,88 @@ func isURLExpiredResponse(statusCode int, body []byte) bool { } // uploadOnePart uploads a single part to a presigned URL and returns the ETag. -// It retries up to maxURLExpirationRetries times if the presigned URL has expired. 
-func (a *FilesAPI) uploadOnePart(ctx context.Context, presigned presignedURL, partData io.ReadSeeker) (string, error) { - // Determine content length - currentPos, err := partData.Seek(0, io.SeekCurrent) +func (a *FilesAPI) uploadOnePart(ctx context.Context, presigned presignedURL, partData io.ReadSeeker, contentLength int64) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, presigned.URL, partData) if err != nil { - return "", fmt.Errorf("failed to get current position: %w", err) + return "", fmt.Errorf("failed to create request: %w", err) + } + req.ContentLength = contentLength + req.Header.Set("Content-Type", "application/octet-stream") + for _, h := range presigned.Headers { + req.Header.Set(h.Name, h.Value) } - endPos, err := partData.Seek(0, io.SeekEnd) + + // Use a client without a total timeout — context cancellation handles + // overall deadline, and large parts may legitimately take a long time. + httpClient := &http.Client{} + resp, err := httpClient.Do(req) if err != nil { - return "", fmt.Errorf("failed to seek to end: %w", err) + return "", fmt.Errorf("upload part %d failed: %w", presigned.PartNumber, err) + } + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated { + etag := resp.Header.Get("ETag") + resp.Body.Close() + return etag, nil } - contentLength := endPos - currentPos - if _, err := partData.Seek(currentPos, io.SeekStart); err != nil { - return "", fmt.Errorf("failed to seek back: %w", err) + + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return "", &partUploadError{ + partNumber: presigned.PartNumber, + statusCode: resp.StatusCode, + body: body, } +} - for attempt := 0; attempt < maxURLExpirationRetries; attempt++ { +// partUploadError represents a failed part upload with status and body. 
+type partUploadError struct { + partNumber int + statusCode int + body []byte +} + +func (e *partUploadError) Error() string { + return fmt.Sprintf("upload part %d failed with status %d: %s", e.partNumber, e.statusCode, string(e.body)) +} + +func (e *partUploadError) isExpiredURL() bool { + return isURLExpiredResponse(e.statusCode, e.body) +} + +// uploadOnePartWithRetry uploads a part, re-fetching the presigned URL if it expires. +func (a *FilesAPI) uploadOnePartWithRetry(ctx context.Context, filePath, sessionToken string, partNumber int, partData io.ReadSeeker, contentLength int64) (string, error) { + for attempt := 0; attempt <= maxURLExpirationRetries; attempt++ { + // Rewind reader for retries. if attempt > 0 { if _, err := partData.Seek(0, io.SeekStart); err != nil { return "", fmt.Errorf("failed to rewind part data: %w", err) } } - req, err := http.NewRequestWithContext(ctx, http.MethodPut, presigned.URL, partData) + // Fetch a (fresh) presigned URL. + urls, err := a.getUploadPartURLs(ctx, filePath, sessionToken, partNumber, 1) if err != nil { - return "", fmt.Errorf("failed to create request: %w", err) + return "", fmt.Errorf("failed to get upload URL for part %d: %w", partNumber, err) } - req.ContentLength = contentLength - req.Header.Set("Content-Type", "application/octet-stream") - for _, h := range presigned.Headers { - req.Header.Set(h.Name, h.Value) + if len(urls) == 0 { + return "", fmt.Errorf("no upload URL returned for part %d", partNumber) } - httpClient := &http.Client{Timeout: uploadInactivityTimeout} - resp, err := httpClient.Do(req) + etag, err := a.uploadOnePart(ctx, urls[0], partData, contentLength) if err != nil { - return "", fmt.Errorf("upload part %d failed: %w", presigned.PartNumber, err) - } - - if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated { - etag := resp.Header.Get("ETag") - resp.Body.Close() - return etag, nil - } - - body, _ := io.ReadAll(resp.Body) - resp.Body.Close() - - if 
isURLExpiredResponse(resp.StatusCode, body) { - logger.Debugf(ctx, "presigned URL expired for part %d (attempt %d/%d), retrying", presigned.PartNumber, attempt+1, maxURLExpirationRetries) - continue + var pErr *partUploadError + if errors.As(err, &pErr) && pErr.isExpiredURL() && attempt < maxURLExpirationRetries { + logger.Debugf(ctx, "presigned URL expired for part %d (attempt %d/%d), fetching new URL", + partNumber, attempt+1, maxURLExpirationRetries) + continue + } + return "", err } - - return "", fmt.Errorf("upload part %d failed with status %d: %s", presigned.PartNumber, resp.StatusCode, string(body)) + return etag, nil } - return "", fmt.Errorf("upload part %d failed: presigned URL expired after %d retries", presigned.PartNumber, maxURLExpirationRetries) + return "", fmt.Errorf("upload part %d: presigned URL expired after %d retries", partNumber, maxURLExpirationRetries) } // completeMultipartUpload finalizes a multipart upload with the given ETags. @@ -358,7 +381,10 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content partNumber := 1 for { - // Check if a previous upload failed + // Check context cancellation and prior upload errors. + if err := ctx.Err(); err != nil { + break + } mu.Lock() if uploadErr != nil { mu.Unlock() @@ -366,13 +392,14 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content } mu.Unlock() - // Read the next part + // Read the next part. buf := make([]byte, partSize) n, readErr := io.ReadFull(content, buf) if n == 0 && readErr != nil { break } buf = buf[:n] + partDataLen := int64(n) currentPartNumber := partNumber partNumber++ @@ -381,39 +408,23 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content go func(pn int, data []byte) { defer func() { <-sem }() - // Check for prior error + // Check for prior error or context cancellation. 
mu.Lock() if uploadErr != nil { mu.Unlock() return } mu.Unlock() - - // Get presigned URL for this part - urls, err := a.getUploadPartURLs(ctx, filePath, sessionToken, pn, 1) - if err != nil { - mu.Lock() - if uploadErr == nil { - uploadErr = fmt.Errorf("failed to get upload URL for part %d: %w", pn, err) - } - mu.Unlock() - return - } - if len(urls) == 0 { - mu.Lock() - if uploadErr == nil { - uploadErr = fmt.Errorf("no upload URL returned for part %d", pn) - } - mu.Unlock() + if ctx.Err() != nil { return } - // Upload the part - etag, err := a.uploadOnePart(ctx, urls[0], bytes.NewReader(data)) + // Upload the part with automatic URL refresh on expiration. + etag, err := a.uploadOnePartWithRetry(ctx, filePath, sessionToken, pn, bytes.NewReader(data), partDataLen) if err != nil { mu.Lock() if uploadErr == nil { - uploadErr = fmt.Errorf("failed to upload part %d: %w", pn, err) + uploadErr = err } mu.Unlock() return diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go index b96002408..200035ea7 100644 --- a/service/files/ext_upload_test.go +++ b/service/files/ext_upload_test.go @@ -286,11 +286,31 @@ func TestUploadWithChunking_SmallFile_UsesSingleShot(t *testing.T) { assert.Equal(t, content, receivedBody) } -func TestUploadOnePart_RetriesOnExpiredURL(t *testing.T) { - var attempts atomic.Int32 +func TestUploadOnePartWithRetry_RefreshesExpiredURL(t *testing.T) { + var uploadAttempts atomic.Int32 - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - n := attempts.Add(1) + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + + mux := http.NewServeMux() + + // create-upload-part-urls — always returns a URL pointing to /upload-part/1 + mux.HandleFunc("/api/2.0/fs/create-upload-part-urls", func(w http.ResponseWriter, r *http.Request) { + var req createUploadPartURLsRequest + json.NewDecoder(r.Body).Decode(&req) + resp := createUploadPartURLsResponse{ + UploadPartURLs: []presignedURL{{ + 
URL: fmt.Sprintf("%s/upload-part/%d", srv.URL, req.StartPartNumber), + PartNumber: req.StartPartNumber, + }}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + }) + + // upload-part — first attempt returns expired URL error, second succeeds + mux.HandleFunc("/upload-part/", func(w http.ResponseWriter, r *http.Request) { + n := uploadAttempts.Add(1) if n == 1 { w.WriteHeader(http.StatusForbidden) w.Write([]byte("AccessDenied")) @@ -298,22 +318,17 @@ func TestUploadOnePart_RetriesOnExpiredURL(t *testing.T) { } w.Header().Set("ETag", "etag-success") w.WriteHeader(http.StatusOK) - })) - defer srv.Close() + }) + srv.Config.Handler = mux api := newTestFilesAPI(t, srv.URL) - presigned := presignedURL{ - URL: srv.URL + "/upload-part/1", - PartNumber: 1, - } data := strings.NewReader("test data") - ctx := context.Background() - etag, err := api.uploadOnePart(ctx, presigned, data) + etag, err := api.uploadOnePartWithRetry(ctx, "/test/file.bin", "session-tok", 1, data, int64(len("test data"))) require.NoError(t, err) assert.Equal(t, "etag-success", etag) - assert.Equal(t, int32(2), attempts.Load()) + assert.Equal(t, int32(2), uploadAttempts.Load()) } func TestUploadFromFile(t *testing.T) { From 9ca7ae4922f3b9f108e33d5a0773baea8128a5d0 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Mon, 13 Apr 2026 12:24:54 +0200 Subject: [PATCH 7/8] fix(files): add first-chunk fallback, contentLength validation, and test coverage Address review feedback: expose upload methods via FilesInterface, fall back to single-shot upload when the first multipart chunk fails, validate contentLength matches actual bytes read, and add tests for all new paths. 
Co-authored-by: Isaac --- .../mocks/service/files/mock_files_upload.go | 55 +++++ service/files/api.go | 1 + service/files/ext_upload.go | 190 ++++++++++----- service/files/ext_upload_test.go | 227 ++++++++++++++++++ 4 files changed, 416 insertions(+), 57 deletions(-) create mode 100644 experimental/mocks/service/files/mock_files_upload.go diff --git a/experimental/mocks/service/files/mock_files_upload.go b/experimental/mocks/service/files/mock_files_upload.go new file mode 100644 index 000000000..2ed7b459a --- /dev/null +++ b/experimental/mocks/service/files/mock_files_upload.go @@ -0,0 +1,55 @@ +// Hand-written mock stubs for filesAPIUploadUtilities methods. +// These will be replaced by mockery-generated code on the next `make codegen`. + +package files + +import ( + "context" + "io" + + files "github.com/databricks/databricks-sdk-go/service/files" +) + +// UploadWithChunking provides a mock function for the FilesInterface. +func (_m *MockFilesInterface) UploadWithChunking(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, opts ...files.UploadOption) error { + _va := make([]any, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []any + _ca = append(_ca, ctx, filePath, content, contentLength) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for UploadWithChunking") + } + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, io.ReadSeeker, int64, ...files.UploadOption) error); ok { + r0 = rf(ctx, filePath, content, contentLength, opts...) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// UploadFromFile provides a mock function for the FilesInterface. 
+func (_m *MockFilesInterface) UploadFromFile(ctx context.Context, filePath string, sourcePath string, opts ...files.UploadOption) error { + _va := make([]any, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []any + _ca = append(_ca, ctx, filePath, sourcePath) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for UploadFromFile") + } + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, ...files.UploadOption) error); ok { + r0 = rf(ctx, filePath, sourcePath, opts...) + } else { + r0 = ret.Error(0) + } + return r0 +} diff --git a/service/files/api.go b/service/files/api.go index 432828c39..4b4c6453d 100755 --- a/service/files/api.go +++ b/service/files/api.go @@ -217,6 +217,7 @@ func (a *DbfsAPI) MkdirsByPath(ctx context.Context, path string) error { } type FilesInterface interface { + filesAPIUploadUtilities // Creates an empty directory. If necessary, also creates any parent directories // of the new, empty directory (like the shell command `mkdir -p`). If called on diff --git a/service/files/ext_upload.go b/service/files/ext_upload.go index f91274207..0f2dbfba0 100644 --- a/service/files/ext_upload.go +++ b/service/files/ext_upload.go @@ -55,6 +55,27 @@ var partSizeOptions = []int64{ 4 * 1024 * 1024 * 1024, // 4 GiB } +// filesAPIUploadUtilities defines the hand-written upload extension methods +// for FilesAPI. This interface is embedded in FilesInterface (in api.go) so +// that the methods are accessible through WorkspaceClient.Files. +type filesAPIUploadUtilities interface { + // UploadWithChunking uploads a file to a Unity Catalog volume path. For + // files smaller than 50 MiB it uses the standard single-shot Upload. For + // larger files it uses the multipart chunked upload protocol with parallel + // part uploads. If the first chunk of a multipart upload fails, it falls + // back to single-shot upload automatically. 
+ // + // filePath is the absolute remote path (e.g. /Volumes/catalog/schema/volume/file.bin). + // content must be an io.ReadSeeker so the SDK can read parts for parallel upload. + // contentLength is the total file size in bytes. + UploadWithChunking(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, opts ...UploadOption) error + + // UploadFromFile uploads a local file to a Unity Catalog volume path. It + // automatically detects the file size and uses multipart chunked upload for + // files larger than 50 MiB. + UploadFromFile(ctx context.Context, filePath string, sourcePath string, opts ...UploadOption) error +} + // UploadConfig holds configuration for a multipart upload. type UploadConfig struct { PartSize int64 @@ -354,8 +375,24 @@ func (a *FilesAPI) completeMultipartUpload(ctx context.Context, filePath, sessio return a.filesImpl.client.Do(ctx, http.MethodPost, apiPath, headers, queryParams, req, nil) } -// uploadMultipart orchestrates a full multipart upload. -func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content io.ReadSeeker, _ int64, cfg *UploadConfig) error { +// errFallbackToSingleShot signals that multipart upload failed on the first +// chunk and the caller should retry with single-shot upload. +type errFallbackToSingleShot struct { + reason error +} + +func (e *errFallbackToSingleShot) Error() string { + return fmt.Sprintf("falling back to single-shot upload: %v", e.reason) +} + +// uploadMultipart orchestrates a full multipart upload. If the first part +// fails (e.g., 403 from Azure firewall), it returns errFallbackToSingleShot +// so the caller can retry with the standard Upload method. 
+func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content io.ReadSeeker, contentLength int64, cfg *UploadConfig) error { + if contentLength <= 0 { + return fmt.Errorf("contentLength must be positive, got %d", contentLength) + } + // Phase 1: Initiate initResp, err := a.initiateMultipartUpload(ctx, filePath, cfg.Overwrite) if err != nil { @@ -373,83 +410,109 @@ func (a *FilesAPI) uploadMultipart(ctx context.Context, filePath string, content parallelism = defaultParallelism } - // Phase 2: Upload parts in parallel - sem := make(chan struct{}, parallelism) - var mu sync.Mutex + // Phase 2a: Upload first part synchronously to detect early failures. + // If the first part fails (e.g. Azure firewall 403), signal fallback to + // single-shot upload so the caller can retry without multipart. etags := make(map[int]string) - var uploadErr error - - partNumber := 1 - for { - // Check context cancellation and prior upload errors. - if err := ctx.Err(); err != nil { - break - } - mu.Lock() - if uploadErr != nil { - mu.Unlock() - break - } - mu.Unlock() + var totalBytes int64 - // Read the next part. - buf := make([]byte, partSize) - n, readErr := io.ReadFull(content, buf) - if n == 0 && readErr != nil { - break - } - buf = buf[:n] - partDataLen := int64(n) + firstBuf := make([]byte, partSize) + firstN, firstReadErr := io.ReadFull(content, firstBuf) + if firstN == 0 && firstReadErr != nil { + // Empty content — nothing to upload. Complete with zero parts. 
+ return a.completeMultipartUpload(ctx, filePath, sessionToken, etags) + } + firstBuf = firstBuf[:firstN] + totalBytes += int64(firstN) - currentPartNumber := partNumber - partNumber++ + firstEtag, err := a.uploadOnePartWithRetry(ctx, filePath, sessionToken, 1, bytes.NewReader(firstBuf), int64(firstN)) + if err != nil { + a.abortMultipartUpload(ctx, filePath, sessionToken) + return &errFallbackToSingleShot{reason: err} + } + etags[1] = firstEtag + logger.Debugf(ctx, "uploaded part 1 (first chunk validated)") - sem <- struct{}{} - go func(pn int, data []byte) { - defer func() { <-sem }() + // Phase 2b: Upload remaining parts in parallel. + if firstReadErr == nil { + sem := make(chan struct{}, parallelism) + var mu sync.Mutex + var uploadErr error - // Check for prior error or context cancellation. + partNumber := 2 + for { + if err := ctx.Err(); err != nil { + break + } mu.Lock() if uploadErr != nil { mu.Unlock() - return + break } mu.Unlock() - if ctx.Err() != nil { - return + + buf := make([]byte, partSize) + n, readErr := io.ReadFull(content, buf) + if n == 0 && readErr != nil { + break } + buf = buf[:n] + partDataLen := int64(n) + totalBytes += partDataLen + + currentPartNumber := partNumber + partNumber++ + + sem <- struct{}{} + go func(pn int, data []byte) { + defer func() { <-sem }() - // Upload the part with automatic URL refresh on expiration. 
- etag, err := a.uploadOnePartWithRetry(ctx, filePath, sessionToken, pn, bytes.NewReader(data), partDataLen) - if err != nil { mu.Lock() - if uploadErr == nil { - uploadErr = err + if uploadErr != nil { + mu.Unlock() + return } mu.Unlock() - return - } + if ctx.Err() != nil { + return + } - mu.Lock() - etags[pn] = etag - mu.Unlock() + etag, err := a.uploadOnePartWithRetry(ctx, filePath, sessionToken, pn, bytes.NewReader(data), partDataLen) + if err != nil { + mu.Lock() + if uploadErr == nil { + uploadErr = err + } + mu.Unlock() + return + } - logger.Debugf(ctx, "uploaded part %d", pn) - }(currentPartNumber, buf) + mu.Lock() + etags[pn] = etag + mu.Unlock() + logger.Debugf(ctx, "uploaded part %d", pn) + }(currentPartNumber, buf) - if readErr != nil { - break + if readErr != nil { + break + } } - } - // Wait for all goroutines to finish - for i := 0; i < parallelism; i++ { - sem <- struct{}{} + // Wait for all goroutines to finish. + for i := 0; i < parallelism; i++ { + sem <- struct{}{} + } + + if uploadErr != nil { + a.abortMultipartUpload(ctx, filePath, sessionToken) + return uploadErr + } } - if uploadErr != nil { + // Verify total bytes read matches the declared content length. 
+ if totalBytes != contentLength { a.abortMultipartUpload(ctx, filePath, sessionToken) - return uploadErr + return fmt.Errorf("content length mismatch: declared %d bytes but read %d bytes", contentLength, totalBytes) } // Phase 3: Complete @@ -531,7 +594,20 @@ func (a *FilesAPI) UploadWithChunking(ctx context.Context, filePath string, cont }) } - return a.uploadMultipart(ctx, filePath, content, contentLength, cfg) + err := a.uploadMultipart(ctx, filePath, content, contentLength, cfg) + var fallback *errFallbackToSingleShot + if errors.As(err, &fallback) { + logger.Debugf(ctx, "multipart first-chunk failed (%v), falling back to single-shot upload", fallback.reason) + if _, seekErr := content.Seek(0, io.SeekStart); seekErr != nil { + return fmt.Errorf("failed to rewind content for single-shot fallback: %w", seekErr) + } + return a.Upload(ctx, UploadRequest{ + FilePath: filePath, + Contents: io.NopCloser(content), + Overwrite: cfg.Overwrite, + }) + } + return err } // UploadFromFile uploads a local file to the given path, automatically choosing diff --git a/service/files/ext_upload_test.go b/service/files/ext_upload_test.go index 200035ea7..ccfba3f9a 100644 --- a/service/files/ext_upload_test.go +++ b/service/files/ext_upload_test.go @@ -372,3 +372,230 @@ func TestUploadFromFile(t *testing.T) { assert.Equal(t, contentSize, len(mock.parts[1]), "uploaded bytes should match content size") assert.Equal(t, content, mock.parts[1], "uploaded content should match") } + +func TestUploadMultipart_FirstChunkFallback(t *testing.T) { + // Simulate: initiate-upload succeeds, but the first presigned URL upload + // returns 403 (Azure firewall). uploadMultipart should return + // errFallbackToSingleShot so the caller can retry with single-shot. + + mux := http.NewServeMux() + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + + // initiate-upload — succeed with a session token. 
+ mux.HandleFunc("/api/2.0/fs/files/", func(w http.ResponseWriter, r *http.Request) { + action := r.URL.Query().Get("action") + if action == "initiate-upload" { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(initiateUploadResponse{ + MultipartUpload: &multipartUploadSession{SessionToken: "tok"}, + }) + return + } + http.Error(w, "unexpected", http.StatusBadRequest) + }) + + // create-upload-part-urls — return a URL that will fail. + mux.HandleFunc("/api/2.0/fs/create-upload-part-urls", func(w http.ResponseWriter, r *http.Request) { + var req createUploadPartURLsRequest + json.NewDecoder(r.Body).Decode(&req) + resp := createUploadPartURLsResponse{ + UploadPartURLs: []presignedURL{{ + URL: fmt.Sprintf("%s/upload-part/%d", srv.URL, req.StartPartNumber), + PartNumber: req.StartPartNumber, + }}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + }) + + // upload-part — always return 403 to simulate firewall block. + mux.HandleFunc("/upload-part/", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte("AccessDenied")) + }) + + // abort upload URL (best-effort cleanup). 
+ mux.HandleFunc("/api/2.0/fs/create-abort-upload-url", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"abort_upload_url":{"url":""}}`)) + }) + + srv.Config.Handler = mux + api := newTestFilesAPI(t, srv.URL) + + content := strings.NewReader("test data for fallback") + cfg := &UploadConfig{PartSize: 1024, Parallelism: 1, Overwrite: true} + + err := api.uploadMultipart(context.Background(), "/test/fallback.bin", content, int64(len("test data for fallback")), cfg) + + var fallback *errFallbackToSingleShot + require.ErrorAs(t, err, &fallback, "should return errFallbackToSingleShot") + assert.Contains(t, fallback.reason.Error(), "AccessDenied") +} + +func TestUploadWithChunking_FallbackToSingleShot(t *testing.T) { + // End-to-end test: UploadWithChunking takes the multipart path (declared length > threshold), + // the first part upload fails, and it falls back to a single-shot upload that succeeds. + + var receivedBody []byte + var singleShotCalled atomic.Int32 + var initiateCount atomic.Int32 + + mux := http.NewServeMux() + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + + mux.HandleFunc("/api/2.0/fs/files/", func(w http.ResponseWriter, r *http.Request) { + action := r.URL.Query().Get("action") + switch { + case r.Method == http.MethodPut && action == "": + // Single-shot PUT fallback. 
+ singleShotCalled.Add(1) + body, _ := io.ReadAll(r.Body) + receivedBody = body + w.WriteHeader(http.StatusOK) + case action == "initiate-upload": + initiateCount.Add(1) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(initiateUploadResponse{ + MultipartUpload: &multipartUploadSession{SessionToken: "tok"}, + }) + default: + http.Error(w, "unexpected", http.StatusBadRequest) + } + }) + + mux.HandleFunc("/api/2.0/fs/create-upload-part-urls", func(w http.ResponseWriter, r *http.Request) { + var req createUploadPartURLsRequest + json.NewDecoder(r.Body).Decode(&req) + resp := createUploadPartURLsResponse{ + UploadPartURLs: []presignedURL{{ + URL: fmt.Sprintf("%s/upload-part/%d", srv.URL, req.StartPartNumber), + PartNumber: req.StartPartNumber, + }}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + }) + + // Every part upload is rejected with 403 AccessDenied. + mux.HandleFunc("/upload-part/", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte("AccessDenied")) + }) + + mux.HandleFunc("/api/2.0/fs/create-abort-upload-url", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"abort_upload_url":{"url":""}}`)) + }) + + srv.Config.Handler = mux + api := newTestFilesAPI(t, srv.URL) + + // UploadWithChunking only takes the multipart path when contentLength is + // at least minMultipartUploadSize (50 MiB), which is too much data to + // generate in a unit test. The test therefore sends a short payload but + // declares a contentLength just above the threshold. The multipart + // attempt is initiated, the first part upload is rejected by the mock + // with 403 AccessDenied, and UploadWithChunking is expected to fall + // back to a single-shot PUT that carries the complete payload, as the + // assertions at the end of the test verify. 
+ + // The payload is tiny; only the declared length (fakeLength) exceeds the + // threshold, so the multipart path is selected without sending 50 MiB. + contentData := "hello fallback data" + content := strings.NewReader(contentData) + fakeLength := int64(minMultipartUploadSize + 1) // Trick threshold check + + err := api.UploadWithChunking(context.Background(), "/test/fallback.bin", content, fakeLength, WithOverwrite(true)) + require.NoError(t, err) + assert.Equal(t, int32(1), singleShotCalled.Load(), "single-shot upload should have been called as fallback") + assert.Equal(t, int32(1), initiateCount.Load(), "multipart initiate should have been called once") + assert.Equal(t, []byte(contentData), receivedBody, "single-shot should have received the full content") +} + +func TestUploadMultipart_ContentLengthMismatch(t *testing.T) { + mock := newMultipartMockServer() + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + srv.Config.Handler = mock.handler(srv) + + api := newTestFilesAPI(t, srv.URL) + + content := strings.NewReader("short") + // Declare a much larger contentLength than actual data. 
+ cfg := &UploadConfig{PartSize: 1024, Parallelism: 1, Overwrite: true} + + err := api.uploadMultipart(context.Background(), "/test/mismatch.bin", content, 999999, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "content length mismatch") +} + +func TestUploadMultipart_RejectsNonPositiveContentLength(t *testing.T) { + api := &FilesAPI{} + cfg := &UploadConfig{PartSize: 1024, Parallelism: 1} + + err := api.uploadMultipart(context.Background(), "/test/file.bin", strings.NewReader("data"), 0, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "contentLength must be positive") + + err = api.uploadMultipart(context.Background(), "/test/file.bin", strings.NewReader("data"), -1, cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "contentLength must be positive") +} + +func TestUploadWithChunking_LargeFileUsesMultipart(t *testing.T) { + mock := newMultipartMockServer() + srv := httptest.NewServer(http.NotFoundHandler()) + defer srv.Close() + srv.Config.Handler = mock.handler(srv) + + api := newTestFilesAPI(t, srv.URL) + + // Build a small, deterministic payload. Sending a real 50 MiB body is + // impractical in a unit test, so the data stays small and only the + // declared content length (passed below) exceeds the multipart + // threshold. + contentSize := 100 * 1024 // 100 KiB actual data + content := make([]byte, contentSize) + for i := range content { + content[i] = byte(i % 256) + } + + ctx := context.Background() + // UploadWithChunking is called with contentLength > minMultipartUploadSize + // while the actual data is smaller. The upload is therefore expected to + // end with a content length mismatch error; that is acceptable here + // because the complete multipart flow is already covered by + // TestUploadMultipart_FullFlow. The only thing checked by this test is that + // UploadWithChunking dispatches to multipart when length >= threshold. 
+ + // We verify this by checking that initiate-upload was called (multipart + // path) rather than a direct PUT (single-shot). + // To do this properly without 50 MiB, we check mock state. + + // For the threshold test, we can just verify that small files go single-shot + // (already covered by TestUploadWithChunking_SmallFile_UsesSingleShot) + // and that the routing logic is correct by calling uploadMultipart directly. + // The full multipart flow is tested by TestUploadMultipart_FullFlow. + + // Test that uploadMultipart is called when length >= threshold by verifying + // the mock received multiple parts. + err := api.UploadWithChunking(ctx, "/test/large.bin", + strings.NewReader(string(content)), + int64(minMultipartUploadSize+1), // Declare as "large" to trigger multipart + WithPartSize(30*1024), + WithParallelism(2), + WithOverwrite(true), + ) + // This will fail with content length mismatch since actual < declared, + // but it proves the multipart path was taken (initiate was called). + require.Error(t, err) + assert.Contains(t, err.Error(), "content length mismatch") + + mock.mu.Lock() + defer mock.mu.Unlock() + // The multipart upload was initiated (proves the routing went to multipart). + assert.True(t, len(mock.parts) > 0, "multipart path should have been taken") +} From cdbdc26b08287011e54d65ed276b778779529315 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Sun, 17 May 2026 12:13:27 +0200 Subject: [PATCH 8/8] Add changelog Signed-off-by: Alex Ott --- NEXT_CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 8e0513084..df1f5e440 100755 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,6 +6,8 @@ ### New Features and Improvements +- Upload of big files (> 5 GB) to UC Volumes using multipart chunking ([#1621](https://github.com/databricks/databricks-sdk-go/pull/1621)). 
+ ### Bug Fixes ### Documentation @@ -29,4 +31,4 @@ * Add `GpuXlarge` enum value for [serving.ServedModelInputWorkloadType](https://pkg.go.dev/github.com/databricks/databricks-sdk-go/service/serving#ServedModelInputWorkloadType). * Add `GpuXlarge` enum value for [serving.ServingModelWorkloadType](https://pkg.go.dev/github.com/databricks/databricks-sdk-go/service/serving#ServingModelWorkloadType). * [Breaking] Change `ListFeatures` method for [w.FeatureEngineering](https://pkg.go.dev/github.com/databricks/databricks-sdk-go/service/ml#FeatureEngineeringAPI) workspace-level service with new required argument order. -* [Breaking] Remove `UnspecifiedResourceName` field for [postgres.RequestedResource](https://pkg.go.dev/github.com/databricks/databricks-sdk-go/service/postgres#RequestedResource). \ No newline at end of file +* [Breaking] Remove `UnspecifiedResourceName` field for [postgres.RequestedResource](https://pkg.go.dev/github.com/databricks/databricks-sdk-go/service/postgres#RequestedResource).