Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

* Added `ReadMany` API to read multiple items from a container in a single operation. See [PR 25516](https://github.com/Azure/azure-sdk-for-go/pull/25516)

### Breaking Changes

### Bugs Fixed
Expand Down
78 changes: 78 additions & 0 deletions sdk/data/azcosmos/cosmos_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
Expand Down Expand Up @@ -434,6 +436,82 @@ func (c *ContainerClient) ReadItem(
return response, err
}

// ItemIdentity represents the identity (ID and partition key) of an item.
// This is useful for bulk/read-many style operations that need to address multiple
// items under (potentially) different partition key values.
//
// ID must match the 'id' property of the stored item. PartitionKey is the value (or
// composite/hierarchical set of values) the item was written with. For hierarchical
// partition keys create the PartitionKey with NewPartitionKey* helpers (e.g.
// NewPartitionKeyString, NewPartitionKeyInt, or NewPartitionKeyArray) following the
// order defined in the container.
type ItemIdentity struct {
ID string
PartitionKey PartitionKey
}

// ReadManyOptions contains options for ReadMany operations.
type ReadManyOptions struct {
// SessionToken to be used when using Session consistency on the account.
// When working with Session consistency, each new write request to Azure Cosmos DB is assigned a new SessionToken.
// The client instance will use this token internally with each read/query request to ensure that the set consistency level is maintained.
SessionToken *string
// ConsistencyLevel overrides the account defined consistency level for this operation.
// Consistency can only be relaxed.
ConsistencyLevel *ConsistencyLevel
// Options for operations in the dedicated gateway.
DedicatedGatewayRequestOptions *DedicatedGatewayRequestOptions
}

// ReadManyResponse represents the response from a ReadMany operation.
type ReadManyResponse struct {
Items [][]byte
TotalRequestCharge float32
}

// ReadMany reads multiple items from a Cosmos container.
// This operation may be equivalent to performing a sequence of Read operations for each item,
// but the SDK will apply optimizations when possible. In the worst case, this is no worse
// than sequential point reads, but may benefit from current and future optimizations.
// Items that do not exist in the container will be silently skipped and not included in the response.
// ctx - The context for the request.
// itemIdentities - The identities (ID and partition key) of the items to read.
// options - Options for the operation.
func (c *ContainerClient) ReadMany(
ctx context.Context,
itemIdentities []ItemIdentity,
options *ReadManyOptions) (ReadManyResponse, error) {
if options == nil {
options = &ReadManyOptions{}
}

items := make([][]byte, 0, len(itemIdentities))
var totalCharge float32

for _, identity := range itemIdentities {
itemOptions := &ItemOptions{
SessionToken: options.SessionToken,
ConsistencyLevel: options.ConsistencyLevel,
DedicatedGatewayRequestOptions: options.DedicatedGatewayRequestOptions,
}
response, err := c.ReadItem(ctx, identity.PartitionKey, identity.ID, itemOptions)
if err != nil {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.StatusCode == http.StatusNotFound {
continue
}
return ReadManyResponse{}, err
}
items = append(items, response.Value)
totalCharge += response.RequestCharge
}

return ReadManyResponse{
Items: items,
TotalRequestCharge: totalCharge,
}, nil
}

// GetFeedRanges retrieves all the feed ranges for which changefeed could be fetched.
// ctx - The context for the request.
func (c *ContainerClient) GetFeedRanges(ctx context.Context) ([]FeedRange, error) {
Expand Down
136 changes: 136 additions & 0 deletions sdk/data/azcosmos/emulator_cosmos_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,3 +728,139 @@ func verifyEncodingScenarioResponse(t *testing.T, name string, itemResponse Item
t.Fatalf("[%s] Expected status code %d, got %d", name, expectedStatus, itemResponse.RawResponse.StatusCode)
}
}

func TestReadMany(t *testing.T) {
emulatorTests := newEmulatorTests(t)
client := emulatorTests.getClient(t, newSpanValidator(t, &spanMatcher{
ExpectedSpans: []string{},
}))

database := emulatorTests.createDatabase(t, context.TODO(), client, "readMany")
defer emulatorTests.deleteDatabase(t, context.TODO(), database)
properties := ContainerProperties{
ID: "aContainer",
PartitionKeyDefinition: PartitionKeyDefinition{
Paths: []string{"/pk"},
},
}

_, err := database.CreateContainer(context.TODO(), properties, nil)
if err != nil {
t.Fatalf("Failed to create container: %v", err)
}

container, _ := database.NewContainer("aContainer")

item1 := map[string]interface{}{
"id": "1",
"pk": "pk1",
"value": "first",
}

item2 := map[string]interface{}{
"id": "2",
"pk": "pk2",
"value": "second",
}

item3 := map[string]interface{}{
"id": "3",
"pk": "pk3",
"value": "third",
}

item4 := map[string]interface{}{
"id": "4",
"pk": "pk4",
"value": "fourth",
}

pk1 := NewPartitionKeyString("pk1")
pk2 := NewPartitionKeyString("pk2")
pk3 := NewPartitionKeyString("pk3")
pk4 := NewPartitionKeyString("pk4")

marshalled1, err := json.Marshal(item1)
if err != nil {
t.Fatal(err)
}
marshalled2, err := json.Marshal(item2)
if err != nil {
t.Fatal(err)
}
marshalled3, err := json.Marshal(item3)
if err != nil {
t.Fatal(err)
}
marshalled4, err := json.Marshal(item4)
if err != nil {
t.Fatal(err)
}

_, err = container.CreateItem(context.TODO(), pk1, marshalled1, nil)
if err != nil {
t.Fatalf("Failed to create item1: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk2, marshalled2, nil)
if err != nil {
t.Fatalf("Failed to create item2: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk3, marshalled3, nil)
if err != nil {
t.Fatalf("Failed to create item3: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk4, marshalled4, nil)
if err != nil {
t.Fatalf("Failed to create item4: %v", err)
}

identities := []ItemIdentity{
{ID: "1", PartitionKey: pk1},
{ID: "2", PartitionKey: pk2},
{ID: "3", PartitionKey: pk3},
{ID: "nonexistent", PartitionKey: NewPartitionKeyString("pkNonExistent")},
}

response, err := container.ReadMany(context.TODO(), identities, nil)
if err != nil {
t.Fatalf("Failed to read many items: %v", err)
}

if len(response.Items) != 3 {
t.Fatalf("Expected 3 items, got %d", len(response.Items))
}

if response.TotalRequestCharge <= 0 {
t.Fatalf("Expected positive request charge, got %f", response.TotalRequestCharge)
}

var result1 map[string]interface{}
err = json.Unmarshal(response.Items[0], &result1)
if err != nil {
t.Fatalf("Failed to unmarshal item 1: %v", err)
}
if result1["id"] != "1" || result1["value"] != "first" {
t.Fatalf("Item 1 mismatch: %v", result1)
}

var result2 map[string]interface{}
err = json.Unmarshal(response.Items[1], &result2)
if err != nil {
t.Fatalf("Failed to unmarshal item 2: %v", err)
}
if result2["id"] != "2" || result2["value"] != "second" {
t.Fatalf("Item 2 mismatch: %v", result2)
}

var result3 map[string]interface{}
err = json.Unmarshal(response.Items[2], &result3)
if err != nil {
t.Fatalf("Failed to unmarshal item 3: %v", err)
}
if result3["id"] != "3" || result3["value"] != "third" {
t.Fatalf("Item 3 mismatch: %v", result3)
}
}