Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

* Added `ReadMany` API to read multiple items from a container in a single operation. See [PR 25516](https://github.com/Azure/azure-sdk-for-go/pull/25516)

### Breaking Changes

### Bugs Fixed
Expand Down
88 changes: 88 additions & 0 deletions sdk/data/azcosmos/cosmos_read_many.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azcosmos

import (
"context"
"errors"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
)

// ItemIdentity represents the identity (ID and partition key) of an item.
// This is useful for bulk/read-many style operations that need to address multiple
// items under (potentially) different partition key values.
//
// ID must match the 'id' property of the stored item. PartitionKey is the value (or
// composite/hierarchical set of values) the item was written with. For hierarchical
// partition keys create the PartitionKey with NewPartitionKey* helpers (e.g.
// NewPartitionKeyString, NewPartitionKeyInt, or NewPartitionKeyArray) following the
// order defined in the container.
type ItemIdentity struct {
ID string
PartitionKey PartitionKey
}

// ReadManyOptions contains options for ReadMany operations.
type ReadManyOptions struct {
// SessionToken to be used when using Session consistency on the account.
// When working with Session consistency, each new write request to Azure Cosmos DB is assigned a new SessionToken.
// The client instance will use this token internally with each read/query request to ensure that the set consistency level is maintained.
SessionToken *string
// ConsistencyLevel overrides the account defined consistency level for this operation.
// Consistency can only be relaxed.
ConsistencyLevel *ConsistencyLevel
// Options for operations in the dedicated gateway.
DedicatedGatewayRequestOptions *DedicatedGatewayRequestOptions
}

// ReadManyResponse represents the response from a ReadMany operation.
type ReadManyResponse struct {
Items [][]byte
TotalRequestCharge float32
}

// ReadMany reads multiple items from a Cosmos container.
// This operation may be equivalent to performing a sequence of Read operations for each item,
// but the SDK will apply optimizations when possible. In the worst case, this is no worse
// than sequential point reads, but may benefit from current and future optimizations.
// Items that do not exist in the container will be silently skipped and not included in the response.
// ctx - The context for the request.
// itemIdentities - The identities (ID and partition key) of the items to read.
// options - Options for the operation.
func (c *ContainerClient) ReadMany(
ctx context.Context,
itemIdentities []ItemIdentity,
options *ReadManyOptions) (ReadManyResponse, error) {
if options == nil {
options = &ReadManyOptions{}
}

items := make([][]byte, 0, len(itemIdentities))
var totalCharge float32

for _, identity := range itemIdentities {
itemOptions := &ItemOptions{
SessionToken: options.SessionToken,
ConsistencyLevel: options.ConsistencyLevel,
DedicatedGatewayRequestOptions: options.DedicatedGatewayRequestOptions,
}
response, err := c.ReadItem(ctx, identity.PartitionKey, identity.ID, itemOptions)
if err != nil {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.StatusCode == http.StatusNotFound {
continue
}
return ReadManyResponse{}, err
}
items = append(items, response.Value)
totalCharge += response.RequestCharge
}

return ReadManyResponse{
Items: items,
TotalRequestCharge: totalCharge,
}, nil
}
136 changes: 136 additions & 0 deletions sdk/data/azcosmos/emulator_cosmos_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,3 +728,139 @@ func verifyEncodingScenarioResponse(t *testing.T, name string, itemResponse Item
t.Fatalf("[%s] Expected status code %d, got %d", name, expectedStatus, itemResponse.RawResponse.StatusCode)
}
}

func TestReadMany(t *testing.T) {
emulatorTests := newEmulatorTests(t)
client := emulatorTests.getClient(t, newSpanValidator(t, &spanMatcher{
ExpectedSpans: []string{},
}))

database := emulatorTests.createDatabase(t, context.TODO(), client, "readMany")
defer emulatorTests.deleteDatabase(t, context.TODO(), database)
properties := ContainerProperties{
ID: "aContainer",
PartitionKeyDefinition: PartitionKeyDefinition{
Paths: []string{"/pk"},
},
}

_, err := database.CreateContainer(context.TODO(), properties, nil)
if err != nil {
t.Fatalf("Failed to create container: %v", err)
}

container, _ := database.NewContainer("aContainer")

item1 := map[string]interface{}{
"id": "1",
"pk": "pk1",
"value": "first",
}

item2 := map[string]interface{}{
"id": "2",
"pk": "pk2",
"value": "second",
}

item3 := map[string]interface{}{
"id": "3",
"pk": "pk3",
"value": "third",
}

item4 := map[string]interface{}{
"id": "4",
"pk": "pk4",
"value": "fourth",
}

pk1 := NewPartitionKeyString("pk1")
pk2 := NewPartitionKeyString("pk2")
pk3 := NewPartitionKeyString("pk3")
pk4 := NewPartitionKeyString("pk4")

marshalled1, err := json.Marshal(item1)
if err != nil {
t.Fatal(err)
}
marshalled2, err := json.Marshal(item2)
if err != nil {
t.Fatal(err)
}
marshalled3, err := json.Marshal(item3)
if err != nil {
t.Fatal(err)
}
marshalled4, err := json.Marshal(item4)
if err != nil {
t.Fatal(err)
}

_, err = container.CreateItem(context.TODO(), pk1, marshalled1, nil)
if err != nil {
t.Fatalf("Failed to create item1: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk2, marshalled2, nil)
if err != nil {
t.Fatalf("Failed to create item2: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk3, marshalled3, nil)
if err != nil {
t.Fatalf("Failed to create item3: %v", err)
}

_, err = container.CreateItem(context.TODO(), pk4, marshalled4, nil)
if err != nil {
t.Fatalf("Failed to create item4: %v", err)
}

identities := []ItemIdentity{
{ID: "1", PartitionKey: pk1},
{ID: "2", PartitionKey: pk2},
{ID: "3", PartitionKey: pk3},
{ID: "nonexistent", PartitionKey: NewPartitionKeyString("pkNonExistent")},
}

response, err := container.ReadMany(context.TODO(), identities, nil)
if err != nil {
t.Fatalf("Failed to read many items: %v", err)
}

if len(response.Items) != 3 {
t.Fatalf("Expected 3 items, got %d", len(response.Items))
}

if response.TotalRequestCharge <= 0 {
t.Fatalf("Expected positive request charge, got %f", response.TotalRequestCharge)
}

var result1 map[string]interface{}
err = json.Unmarshal(response.Items[0], &result1)
if err != nil {
t.Fatalf("Failed to unmarshal item 1: %v", err)
}
if result1["id"] != "1" || result1["value"] != "first" {
t.Fatalf("Item 1 mismatch: %v", result1)
}

var result2 map[string]interface{}
err = json.Unmarshal(response.Items[1], &result2)
if err != nil {
t.Fatalf("Failed to unmarshal item 2: %v", err)
}
if result2["id"] != "2" || result2["value"] != "second" {
t.Fatalf("Item 2 mismatch: %v", result2)
}

var result3 map[string]interface{}
err = json.Unmarshal(response.Items[2], &result3)
if err != nil {
t.Fatalf("Failed to unmarshal item 3: %v", err)
}
if result3["id"] != "3" || result3["value"] != "third" {
t.Fatalf("Item 3 mismatch: %v", result3)
}
}