-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbundle.go
More file actions
190 lines (159 loc) · 5.49 KB
/
bundle.go
File metadata and controls
190 lines (159 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package storage
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
)
// Archive formats understood by the bundle endpoint.
const (
	// BundleFormatTar requests that the bundle be delivered as a tar archive.
	BundleFormatTar = "tar"
)

// Compression algorithms that can be requested for a bundle response.
const (
	// BundleCompressionNone leaves the archive uncompressed. It is the
	// value used when no compression is specified.
	BundleCompressionNone = "none"
	// BundleCompressionGzip compresses the archive with gzip.
	BundleCompressionGzip = "gzip"
	// BundleCompressionZstd compresses the archive with zstd.
	BundleCompressionZstd = "zstd"
)

// Policies for handling objects that are missing from the bucket.
const (
	// BundleOnErrorSkip leaves missing objects out of the archive instead of
	// failing the request. It is the value used when no policy is specified.
	BundleOnErrorSkip = "skip"
	// BundleOnErrorFail makes the request fail if any object is missing.
	BundleOnErrorFail = "fail"
)
// bundleHTTPClient is the single HTTP client shared by all bundle requests.
//
// No overall Client.Timeout is set on purpose: the response body is a
// streaming archive of unknown length, so cancellation is left to the
// caller's context. Only the connection-establishment phases (dial, TLS
// handshake, waiting for response headers) are bounded here.
var bundleHTTPClient = &http.Client{
	Transport: &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           (&net.Dialer{Timeout: 30 * time.Second}).DialContext,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 60 * time.Second,
		// A zero IdleConnTimeout keeps idle connections pooled forever.
		// Expire them so a long-lived process does not hold on to (and
		// later try to reuse) sockets the remote end may have dropped.
		IdleConnTimeout: 90 * time.Second,
	},
}
// BundleObjectsInput describes a single BundleObjects request: which objects
// to fetch and how the resulting archive should be produced.
type BundleObjectsInput struct {
	// Bucket names the bucket that holds the requested objects.
	// It is required and must not be empty.
	Bucket string

	// Keys lists the object keys to pack into the bundle.
	// It is required and must contain at least one key; a single request
	// may carry at most 5,000 keys.
	Keys []string

	// Compression selects how the response is compressed.
	// Accepted values are "none", "gzip" and "zstd"; an empty string is
	// treated as "none".
	Compression string

	// OnError selects what happens when a requested object is missing.
	//
	//   - "skip": leave the missing objects out and append
	//     __bundle_errors.json to the tar. Used when OnError is empty.
	//   - "fail": return an error before streaming if any object is missing.
	OnError string
}
// BundleObjectsOutput carries the result of a BundleObjects request.
//
// Body streams the tar archive and must be closed by the caller once it has
// been consumed. Entries can be walked with archive/tar:
//
//	tr := tar.NewReader(out.Body)
//	for {
//		hdr, err := tr.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		// use hdr.Name and read the entry's bytes from tr
//	}
//
// When compression was requested, put the matching decompressor between
// Body and the tar reader:
//
//	gz, err := gzip.NewReader(out.Body)
//	if err != nil {
//		return err
//	}
//	tr := tar.NewReader(gz)
type BundleObjectsOutput struct {
	// Body streams the tar archive. The caller must close it.
	Body io.ReadCloser

	// ContentType holds the Content-Type header of the response,
	// e.g. "application/x-tar" or "application/gzip".
	ContentType string

	// StatusCode holds the HTTP status code of the response.
	StatusCode int
}
// bundleRequestBody is the JSON payload sent with a bundle request.
// It serializes as {"keys": [...]}.
type bundleRequestBody struct {
	// Keys holds the object keys to include in the bundle.
	Keys []string `json:"keys"`
}
// BundleObjects fetches multiple objects from a bucket as a streaming tar
// archive in a single HTTP request.
//
// This is a Tigris extension to the S3 API, designed for ML training workloads
// that need to fetch thousands of objects per batch without per-object HTTP
// overhead.
//
// The request is a POST to "<endpoint>/<bucket>?bundle" whose JSON body lists
// the keys. The endpoint is the client's BaseEndpoint when set, otherwise
// GlobalEndpoint. Archive format, compression and missing-object policy are
// sent as X-Tigris-Bundle-* headers; an empty in.Compression is sent as
// BundleCompressionNone and an empty in.OnError as BundleOnErrorSkip. When the
// client options carry a credentials provider, the request is signed with
// SigV4 for service "s3", using the configured region or "auto" if none is set.
//
// An error is returned when in is nil, in.Bucket is empty, in.Keys is empty,
// the request cannot be built, signed or sent, or the server responds with a
// status code of 400 or above (the error then includes up to 4096 bytes of
// the response body).
//
// On success the caller is responsible for closing the returned Body.
func (c *Client) BundleObjects(ctx context.Context, in *BundleObjectsInput) (*BundleObjectsOutput, error) {
	// Guard against a nil input so callers get an error instead of a
	// nil-pointer panic.
	if in == nil {
		return nil, fmt.Errorf("storage: BundleObjects: input is required")
	}
	if in.Bucket == "" {
		return nil, fmt.Errorf("storage: BundleObjects: bucket is required")
	}
	if len(in.Keys) == 0 {
		return nil, fmt.Errorf("storage: BundleObjects: at least one key is required")
	}

	compression := in.Compression
	if compression == "" {
		compression = BundleCompressionNone
	}
	onError := in.OnError
	if onError == "" {
		onError = BundleOnErrorSkip
	}

	opts := c.Client.Options()
	endpoint := GlobalEndpoint
	if opts.BaseEndpoint != nil {
		endpoint = *opts.BaseEndpoint
	}
	// Avoid a double slash when the configured endpoint ends with "/".
	endpoint = strings.TrimRight(endpoint, "/")
	reqURL := fmt.Sprintf("%s/%s?bundle", endpoint, in.Bucket)

	body, err := json.Marshal(bundleRequestBody{Keys: in.Keys})
	if err != nil {
		return nil, fmt.Errorf("storage: BundleObjects: failed to marshal keys: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("storage: BundleObjects: failed to create request: %w", err)
	}
	// Headers are set before signing so they are present on the request
	// handed to the signer.
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Tigris-Bundle-Format", BundleFormatTar)
	req.Header.Set("X-Tigris-Bundle-Compression", compression)
	req.Header.Set("X-Tigris-Bundle-On-Error", onError)

	// Sign request with SigV4. Requests are sent unsigned when the client
	// has no credentials provider configured.
	if opts.Credentials != nil {
		creds, err := opts.Credentials.Retrieve(ctx)
		if err != nil {
			return nil, fmt.Errorf("storage: BundleObjects: failed to retrieve credentials: %w", err)
		}

		// The same payload hash is sent in the header and passed to the signer.
		payloadHash := sha256Hex(body)
		req.Header.Set("X-Amz-Content-Sha256", payloadHash)

		region := opts.Region
		if region == "" {
			region = "auto"
		}

		signer := v4.NewSigner()
		if err := signer.SignHTTP(ctx, creds, req, payloadHash, "s3", region, time.Now()); err != nil {
			return nil, fmt.Errorf("storage: BundleObjects: failed to sign request: %w", err)
		}
	}

	resp, err := bundleHTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("storage: BundleObjects: request failed: %w", err)
	}

	if resp.StatusCode >= 400 {
		defer resp.Body.Close()
		// Best effort: include a bounded prefix of the error body; a read
		// failure here only shortens the message.
		errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return nil, fmt.Errorf("storage: BundleObjects: HTTP %d: %s", resp.StatusCode, string(errBody))
	}

	// Success: ownership of resp.Body passes to the caller, so it is not
	// closed here.
	return &BundleObjectsOutput{
		Body:        resp.Body,
		ContentType: resp.Header.Get("Content-Type"),
		StatusCode:  resp.StatusCode,
	}, nil
}
// sha256Hex returns the SHA-256 digest of data as a lowercase hexadecimal
// string.
func sha256Hex(data []byte) string {
	digest := sha256.Sum256(data)
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}