diff --git a/go/go.mod b/go/go.mod index 31d2386..152f4ff 100644 --- a/go/go.mod +++ b/go/go.mod @@ -3,6 +3,7 @@ module github.com/helius-labs/laserstream-sdk/go go 1.25.1 require ( + github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/joho/godotenv v1.5.1 github.com/rpcpool/yellowstone-grpc/examples/golang v0.0.0-20250409203454-bb3a44a2f723 diff --git a/go/laserstream.go b/go/laserstream.go index 97b2391..515ef68 100644 --- a/go/laserstream.go +++ b/go/laserstream.go @@ -216,6 +216,13 @@ func (c *Client) Write(req *SubscribeRequest) error { } c.mu.RUnlock() + // Persist the update so reconnects reuse the latest effective subscription. + c.mu.Lock() + if c.originalRequest != nil { + mergeSubscribeRequests(c.originalRequest, req, c.internalSlotSubID) + } + c.mu.Unlock() + select { case c.writeChan <- req: return nil @@ -224,6 +231,102 @@ func (c *Client) Write(req *SubscribeRequest) error { } } +// mergeSubscribeRequests merges an in-flight stream-write update into the cached +// request used for subsequent reconnects. It keeps the internal slot tracker intact. 
+func mergeSubscribeRequests(base, update *SubscribeRequest, internalSlotID string) { + if base == nil || update == nil { + return + } + + var internalSlot *SubscribeRequestFilterSlots + if internalSlotID != "" && base.Slots != nil { + internalSlot = base.Slots[internalSlotID] + } + + if len(update.Accounts) > 0 { + if base.Accounts == nil { + base.Accounts = make(map[string]*SubscribeRequestFilterAccounts) + } + for k, v := range update.Accounts { + base.Accounts[k] = v + } + } + + if len(update.Slots) > 0 { + if base.Slots == nil { + base.Slots = make(map[string]*SubscribeRequestFilterSlots) + } + for k, v := range update.Slots { + base.Slots[k] = v + } + } + + if len(update.Transactions) > 0 { + if base.Transactions == nil { + base.Transactions = make(map[string]*SubscribeRequestFilterTransactions) + } + for k, v := range update.Transactions { + base.Transactions[k] = v + } + } + + if len(update.TransactionsStatus) > 0 { + if base.TransactionsStatus == nil { + base.TransactionsStatus = make(map[string]*SubscribeRequestFilterTransactions) + } + for k, v := range update.TransactionsStatus { + base.TransactionsStatus[k] = v + } + } + + if len(update.Blocks) > 0 { + if base.Blocks == nil { + base.Blocks = make(map[string]*SubscribeRequestFilterBlocks) + } + for k, v := range update.Blocks { + base.Blocks[k] = v + } + } + + if len(update.BlocksMeta) > 0 { + if base.BlocksMeta == nil { + base.BlocksMeta = make(map[string]*SubscribeRequestFilterBlocksMeta) + } + for k, v := range update.BlocksMeta { + base.BlocksMeta[k] = v + } + } + + if len(update.Entry) > 0 { + if base.Entry == nil { + base.Entry = make(map[string]*SubscribeRequestFilterEntry) + } + for k, v := range update.Entry { + base.Entry[k] = v + } + } + + if len(update.AccountsDataSlice) > 0 { + base.AccountsDataSlice = append(base.AccountsDataSlice, update.AccountsDataSlice...) 
+ } + + if update.Commitment != nil { + base.Commitment = update.Commitment + } + + if update.FromSlot != nil { + base.FromSlot = update.FromSlot + } + + // Re-apply the internal slot tracker after the merge so it never gets dropped. + if internalSlotID != "" && internalSlot != nil { + if base.Slots == nil { + base.Slots = make(map[string]*SubscribeRequestFilterSlots) + } + base.Slots[internalSlotID] = internalSlot + } +} + // Close terminates the subscription and closes the connection. func (c *Client) Close() { c.mu.Lock() @@ -376,11 +479,11 @@ func (c *Client) handleStream(ctx context.Context, stream pb.Geyser_SubscribeCli // Start periodic ping goroutine pingCtx, cancelPing := context.WithCancel(ctx) defer cancelPing() - + go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - + for { select { case <-pingCtx.Done(): diff --git a/go/laserstream_test.go b/go/laserstream_test.go new file mode 100644 index 0000000..8ac2570 --- /dev/null +++ b/go/laserstream_test.go @@ -0,0 +1,354 @@ +package laserstream + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto" + gproto "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" +) + +var cmpOpts = []cmp.Option{ + protocmp.Transform(), + cmpopts.EquateEmpty(), +} + +func boolPtr(v bool) *bool { return &v } + +func commitmentPtr(c CommitmentLevel) *CommitmentLevel { return &c } + +func slotFilter(commit, interslot bool) *SubscribeRequestFilterSlots { + return &SubscribeRequestFilterSlots{ + FilterByCommitment: boolPtr(commit), + InterslotUpdates: boolPtr(interslot), + } +} + +func accountFilter(tag string) *SubscribeRequestFilterAccounts { + return &SubscribeRequestFilterAccounts{ + Account: []string{"acct-" + tag}, + Owner: []string{"owner-" + tag}, + NonemptyTxnSignature: boolPtr(len(tag)%2 == 0), + } +} + +func txnFilter(tag string) 
*SubscribeRequestFilterTransactions { + return &SubscribeRequestFilterTransactions{ + Vote: boolPtr(len(tag)%2 == 0), + Failed: boolPtr(len(tag)%2 == 1), + Signature: gproto.String("sig-" + tag), + AccountInclude: []string{"inc-" + tag}, + AccountExclude: []string{"exc-" + tag}, + AccountRequired: []string{"req-" + tag}, + } +} + +func blockFilter(tag string) *SubscribeRequestFilterBlocks { + return &SubscribeRequestFilterBlocks{ + AccountInclude: []string{"block-" + tag}, + IncludeTransactions: boolPtr(true), + IncludeAccounts: boolPtr(false), + IncludeEntries: boolPtr(len(tag)%2 == 0), + } +} + +func dataSlice(offset, length uint64) *SubscribeRequestAccountsDataSlice { + return &SubscribeRequestAccountsDataSlice{Offset: offset, Length: length} +} + +func cloneReq(req *SubscribeRequest) *SubscribeRequest { + if req == nil { + return nil + } + msg := gproto.Clone((*pb.SubscribeRequest)(req)) + return (*SubscribeRequest)(msg.(*pb.SubscribeRequest)) +} + +func TestMergeSubscribeRequests(t *testing.T) { + t.Run("nil base is safe", func(t *testing.T) { + update := &SubscribeRequest{Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("1")}} + mergeSubscribeRequests(nil, update, "internal") + }) + + t.Run("nil update is safe", func(t *testing.T) { + base := &SubscribeRequest{Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("1")}} + original := cloneReq(base) + mergeSubscribeRequests(base, nil, "internal") + if diff := cmp.Diff(original, base, cmpOpts...); diff != "" { + t.Fatalf("base changed unexpectedly (-want +got):\n%s", diff) + } + }) + + t.Run("empty update no-op", func(t *testing.T) { + base := &SubscribeRequest{ + Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("base")}, + Commitment: commitmentPtr(CommitmentLevel_PROCESSED), + FromSlot: gproto.Uint64(10), + } + original := cloneReq(base) + + mergeSubscribeRequests(base, &SubscribeRequest{}, "internal") + if diff := cmp.Diff(original, base, 
cmpOpts...); diff != "" { + t.Fatalf("expected no change (-want +got):\n%s", diff) + } + }) + + t.Run("accounts additive and override", func(t *testing.T) { + base := &SubscribeRequest{Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("base")}} + update := &SubscribeRequest{Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("new"), "b": accountFilter("extra")}} + expected := &SubscribeRequest{Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("new"), "b": accountFilter("extra")}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("accounts not merged (-want +got):\n%s", diff) + } + }) + + t.Run("slots additive and preserve internal", func(t *testing.T) { + internal := "slot-internal" + base := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + internal: slotFilter(true, false), + "keep": slotFilter(false, false), + }} + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + "new": slotFilter(false, true), + }} + + expected := cloneReq(base) + expected.Slots["new"] = slotFilter(false, true) + + mergeSubscribeRequests(base, update, internal) + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("slots not merged (-want +got):\n%s", diff) + } + }) + + t.Run("internal slot preserved when overwritten", func(t *testing.T) { + internal := "slot-internal" + orig := slotFilter(true, false) + base := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + internal: orig, + }} + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + internal: slotFilter(false, true), + "other": slotFilter(false, false), + }} + + mergeSubscribeRequests(base, update, internal) + if diff := cmp.Diff(orig, base.Slots[internal], cmpOpts...); diff != "" { + t.Fatalf("internal slot changed (-want +got):\n%s", diff) + } + if base.Slots["other"] == nil { + 
t.Fatalf("expected additional slot from update") + } + }) + + t.Run("internal slot not preserved when id empty", func(t *testing.T) { + base := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + "internal": slotFilter(true, false), + }} + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{ + "internal": slotFilter(false, true), + }} + + mergeSubscribeRequests(base, update, "") + if diff := cmp.Diff(update.Slots["internal"], base.Slots["internal"], cmpOpts...); diff != "" { + t.Fatalf("expected update to win when id empty (-want +got):\n%s", diff) + } + }) + + t.Run("slots map created when nil", func(t *testing.T) { + base := &SubscribeRequest{} + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{"a": slotFilter(true, true)}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(update.Slots, base.Slots, cmpOpts...); diff != "" { + t.Fatalf("slots not merged (-want +got):\n%s", diff) + } + }) + + t.Run("transactions additive", func(t *testing.T) { + base := &SubscribeRequest{Transactions: map[string]*SubscribeRequestFilterTransactions{"a": txnFilter("base")}} + update := &SubscribeRequest{Transactions: map[string]*SubscribeRequestFilterTransactions{"b": txnFilter("new")}} + expected := &SubscribeRequest{Transactions: map[string]*SubscribeRequestFilterTransactions{"a": txnFilter("base"), "b": txnFilter("new")}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("transactions not merged (-want +got):\n%s", diff) + } + }) + + t.Run("transactions status additive", func(t *testing.T) { + base := &SubscribeRequest{TransactionsStatus: map[string]*SubscribeRequestFilterTransactions{"a": txnFilter("base")}} + update := &SubscribeRequest{TransactionsStatus: map[string]*SubscribeRequestFilterTransactions{"a": txnFilter("new"), "b": txnFilter("extra")}} + expected := &SubscribeRequest{TransactionsStatus: 
map[string]*SubscribeRequestFilterTransactions{"a": txnFilter("new"), "b": txnFilter("extra")}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("transactions status not merged (-want +got):\n%s", diff) + } + }) + + t.Run("blocks additive", func(t *testing.T) { + base := &SubscribeRequest{Blocks: map[string]*SubscribeRequestFilterBlocks{"a": blockFilter("base")}} + update := &SubscribeRequest{Blocks: map[string]*SubscribeRequestFilterBlocks{"b": blockFilter("new")}} + expected := &SubscribeRequest{Blocks: map[string]*SubscribeRequestFilterBlocks{"a": blockFilter("base"), "b": blockFilter("new")}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("blocks not merged (-want +got):\n%s", diff) + } + }) + + t.Run("blocks meta additive", func(t *testing.T) { + base := &SubscribeRequest{BlocksMeta: map[string]*SubscribeRequestFilterBlocksMeta{"a": {}}} + update := &SubscribeRequest{BlocksMeta: map[string]*SubscribeRequestFilterBlocksMeta{"b": {}}} + expected := &SubscribeRequest{BlocksMeta: map[string]*SubscribeRequestFilterBlocksMeta{"a": {}, "b": {}}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("blocks meta not merged (-want +got):\n%s", diff) + } + }) + + t.Run("entry additive", func(t *testing.T) { + base := &SubscribeRequest{Entry: map[string]*SubscribeRequestFilterEntry{"a": {}}} + update := &SubscribeRequest{Entry: map[string]*SubscribeRequestFilterEntry{"b": {}}} + expected := &SubscribeRequest{Entry: map[string]*SubscribeRequestFilterEntry{"a": {}, "b": {}}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("entry not merged (-want +got):\n%s", diff) + } + }) + + t.Run("accounts data slice appended", func(t *testing.T) { + base := 
&SubscribeRequest{AccountsDataSlice: []*SubscribeRequestAccountsDataSlice{dataSlice(1, 2)}} + update := &SubscribeRequest{AccountsDataSlice: []*SubscribeRequestAccountsDataSlice{dataSlice(3, 4), dataSlice(5, 6)}} + expected := &SubscribeRequest{AccountsDataSlice: []*SubscribeRequestAccountsDataSlice{dataSlice(1, 2), dataSlice(3, 4), dataSlice(5, 6)}} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("accounts data slice not appended (-want +got):\n%s", diff) + } + }) + + t.Run("commitment override none to some", func(t *testing.T) { + base := &SubscribeRequest{} + update := &SubscribeRequest{Commitment: commitmentPtr(CommitmentLevel_CONFIRMED)} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(update.Commitment, base.Commitment, cmpOpts...); diff != "" { + t.Fatalf("commitment not overridden (-want +got):\n%s", diff) + } + }) + + t.Run("commitment override some to some", func(t *testing.T) { + base := &SubscribeRequest{Commitment: commitmentPtr(CommitmentLevel_PROCESSED)} + update := &SubscribeRequest{Commitment: commitmentPtr(CommitmentLevel_FINALIZED)} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(update.Commitment, base.Commitment, cmpOpts...); diff != "" { + t.Fatalf("commitment not replaced (-want +got):\n%s", diff) + } + }) + + t.Run("commitment unchanged when absent", func(t *testing.T) { + base := &SubscribeRequest{Commitment: commitmentPtr(CommitmentLevel_FINALIZED)} + update := &SubscribeRequest{} + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(commitmentPtr(CommitmentLevel_FINALIZED), base.Commitment, cmpOpts...); diff != "" { + t.Fatalf("commitment unexpectedly changed (-want +got):\n%s", diff) + } + }) + + t.Run("from slot override none to some", func(t *testing.T) { + base := &SubscribeRequest{} + update := &SubscribeRequest{FromSlot: gproto.Uint64(42)} + + mergeSubscribeRequests(base, update, 
"internal") + if diff := cmp.Diff(update.FromSlot, base.FromSlot, cmpOpts...); diff != "" { + t.Fatalf("from_slot not overridden (-want +got):\n%s", diff) + } + }) + + t.Run("from slot override some to some", func(t *testing.T) { + base := &SubscribeRequest{FromSlot: gproto.Uint64(7)} + update := &SubscribeRequest{FromSlot: gproto.Uint64(99)} + + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(update.FromSlot, base.FromSlot, cmpOpts...); diff != "" { + t.Fatalf("from_slot not replaced (-want +got):\n%s", diff) + } + }) + + t.Run("from slot unchanged when absent", func(t *testing.T) { + base := &SubscribeRequest{FromSlot: gproto.Uint64(15)} + update := &SubscribeRequest{} + mergeSubscribeRequests(base, update, "internal") + if diff := cmp.Diff(gproto.Uint64(15), base.FromSlot, cmpOpts...); diff != "" { + t.Fatalf("from_slot unexpectedly changed (-want +got):\n%s", diff) + } + }) + + t.Run("large map merge with internal preserved", func(t *testing.T) { + internal := "slot-internal" + base := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{internal: slotFilter(true, false)}} + for i := 0; i < 120; i++ { + key := fmt.Sprintf("slot-base-%d", i) + base.Slots[key] = slotFilter(i%2 == 0, i%3 == 0) + } + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{}} + for i := 50; i < 170; i++ { + key := fmt.Sprintf("slot-upd-%d", i) + update.Slots[key] = slotFilter(i%2 == 1, i%3 == 1) + } + update.Slots[internal] = slotFilter(false, true) + + mergeSubscribeRequests(base, update, internal) + if base.Slots[internal].GetFilterByCommitment() != true { + t.Fatalf("internal slot not preserved") + } + if got := len(base.Slots); got != 1+120+120 { // internal + originals + updates + t.Fatalf("unexpected slot count: %d", got) + } + }) + + t.Run("base only internal slot survives conflicting update", func(t *testing.T) { + internal := "slot-internal" + base := &SubscribeRequest{Slots: 
map[string]*SubscribeRequestFilterSlots{internal: slotFilter(true, false)}} + update := &SubscribeRequest{Slots: map[string]*SubscribeRequestFilterSlots{internal: slotFilter(false, true)}} + + mergeSubscribeRequests(base, update, internal) + if diff := cmp.Diff(slotFilter(true, false), base.Slots[internal], cmpOpts...); diff != "" { + t.Fatalf("internal slot lost (-want +got):\n%s", diff) + } + }) + + t.Run("mixed fields merge together", func(t *testing.T) { + internal := "slot-internal" + base := &SubscribeRequest{ + Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("base")}, + Slots: map[string]*SubscribeRequestFilterSlots{internal: slotFilter(true, false)}, + Blocks: map[string]*SubscribeRequestFilterBlocks{"b": blockFilter("base")}, + } + update := &SubscribeRequest{ + Accounts: map[string]*SubscribeRequestFilterAccounts{"c": accountFilter("new")}, + Slots: map[string]*SubscribeRequestFilterSlots{"user": slotFilter(false, true)}, + Blocks: map[string]*SubscribeRequestFilterBlocks{"b": blockFilter("new")}, + } + expected := &SubscribeRequest{ + Accounts: map[string]*SubscribeRequestFilterAccounts{"a": accountFilter("base"), "c": accountFilter("new")}, + Slots: map[string]*SubscribeRequestFilterSlots{internal: slotFilter(true, false), "user": slotFilter(false, true)}, + Blocks: map[string]*SubscribeRequestFilterBlocks{"b": blockFilter("new")}, + } + + mergeSubscribeRequests(base, update, internal) + if diff := cmp.Diff(expected, base, cmpOpts...); diff != "" { + t.Fatalf("mixed merge failed (-want +got):\n%s", diff) + } + }) +} \ No newline at end of file diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3f7cdc3..582c589 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -336,6 +336,21 @@ dependencies = [ "serde", ] +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + 
"bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -1122,6 +1137,7 @@ dependencies = [ "futures", "futures-channel", "futures-util", + "proptest", "prost 0.12.6", "prost-types 0.12.6", "rand 0.8.5", @@ -2048,6 +2064,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proptest" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" +dependencies = [ + "bit-set", + "bit-vec", + "bitflags 2.9.4", + "num-traits", + "rand 0.9.2", + "rand_chacha 0.9.0", + "rand_xorshift", + "regex-syntax", + "rusty-fork", + "tempfile", + "unarray", +] + [[package]] name = "prost" version = "0.12.6" @@ -2202,6 +2237,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.41" @@ -2241,6 +2282,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -2261,6 +2312,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.5.1" @@ -2279,6 +2340,15 @@ 
dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", +] + [[package]] name = "rand_hc" version = "0.2.0" @@ -2288,6 +2358,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xorshift" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -2507,6 +2586,18 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rusty-fork" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2" +dependencies = [ + "fnv", + "quick-error", + "tempfile", + "wait-timeout", +] + [[package]] name = "ryu" version = "1.0.20" @@ -4788,6 +4879,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "unarray" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" + [[package]] name = "unicase" version = "2.8.1" @@ -4873,6 +4970,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "wait-timeout" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"09ac3b126d3914f9849036f826e054cbabdc8519970b8998ddaf3b5bd3c65f11" +dependencies = [ + "libc", +] + [[package]] name = "want" version = "0.3.1" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 7909abb..e65c528 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -38,6 +38,7 @@ sha2 = "0.10" [dev-dependencies] dotenv = "0.15" +proptest = "1.6" [build-dependencies] tonic-build = "0.10.2" diff --git a/rust/src/client.rs b/rust/src/client.rs index c57ab21..3f7f279 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -1,4 +1,6 @@ -use crate::{LaserstreamConfig, LaserstreamError, config::CompressionEncoding as ConfigCompressionEncoding}; +use crate::{ + config::CompressionEncoding as ConfigCompressionEncoding, LaserstreamConfig, LaserstreamError, +}; use async_stream::stream; use futures::StreamExt; use futures_channel::mpsc as futures_mpsc; @@ -6,17 +8,17 @@ use futures_util::{sink::SinkExt, Stream}; use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tokio::time::sleep; -use yellowstone_grpc_proto::tonic::{ - Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding, -}; use tracing::{error, instrument, warn}; use uuid; use yellowstone_grpc_client::{ClientTlsConfig, Interceptor}; -use yellowstone_grpc_proto::prelude::{geyser_client::GeyserClient}; use yellowstone_grpc_proto::geyser::{ subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterSlots, SubscribeRequestPing, SubscribeUpdate, }; +use yellowstone_grpc_proto::prelude::geyser_client::GeyserClient; +use yellowstone_grpc_proto::tonic::{ + codec::CompressionEncoding, metadata::MetadataValue, transport::Endpoint, Request, Status, +}; const HARD_CAP_RECONNECT_ATTEMPTS: u32 = (20 * 60) / 5; // 20 mins / 5 sec interval const FIXED_RECONNECT_INTERVAL_MS: u64 = 5000; // 5 seconds fixed interval @@ -32,9 +34,11 @@ struct SdkMetadataInterceptor { impl SdkMetadataInterceptor { fn new(api_key: String) -> Result { let x_token = if !api_key.is_empty() { - 
Some(api_key.parse().map_err(|e| { - Status::invalid_argument(format!("Invalid API key: {}", e)) - })?) + Some( + api_key + .parse() + .map_err(|e| Status::invalid_argument(format!("Invalid API key: {}", e)))?, + ) } else { None }; @@ -50,8 +54,12 @@ impl Interceptor for SdkMetadataInterceptor { } // Add SDK metadata headers - request.metadata_mut().insert("x-sdk-name", MetadataValue::from_static(SDK_NAME)); - request.metadata_mut().insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION)); + request + .metadata_mut() + .insert("x-sdk-name", MetadataValue::from_static(SDK_NAME)); + request + .metadata_mut() + .insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION)); Ok(request) } @@ -72,6 +80,61 @@ impl StreamHandle { } } +/// Merge a stream-write update into the cached request that will be reused on reconnect. +fn merge_subscribe_requests( + base: &mut SubscribeRequest, + update: &SubscribeRequest, + internal_slot_sub_id: &str, + replay_enabled: bool, +) { + // Preserve the internal slot subscription (if present) so we don't drop it during merges. 
+ let internal_slot_entry = if replay_enabled && !internal_slot_sub_id.is_empty() { + base.slots.get(internal_slot_sub_id).cloned() + } else { + None + }; + + if !update.accounts.is_empty() { + base.accounts.extend(update.accounts.clone()); + } + if !update.slots.is_empty() { + base.slots.extend(update.slots.clone()); + } + if !update.transactions.is_empty() { + base.transactions.extend(update.transactions.clone()); + } + if !update.transactions_status.is_empty() { + base.transactions_status + .extend(update.transactions_status.clone()); + } + if !update.blocks.is_empty() { + base.blocks.extend(update.blocks.clone()); + } + if !update.blocks_meta.is_empty() { + base.blocks_meta.extend(update.blocks_meta.clone()); + } + if !update.entry.is_empty() { + base.entry.extend(update.entry.clone()); + } + if !update.accounts_data_slice.is_empty() { + base.accounts_data_slice + .extend_from_slice(&update.accounts_data_slice); + } + + if let Some(commitment) = update.commitment { + base.commitment = Some(commitment); + } + if let Some(from_slot) = update.from_slot { + base.from_slot = Some(from_slot); + } + + // Re-apply the internal slot subscription after merging to ensure it survives overwrites. + if let Some(slot_entry) = internal_slot_entry { + base.slots + .insert(internal_slot_sub_id.to_string(), slot_entry); + } +} + /// Establishes a gRPC connection, handles the subscription lifecycle, /// and provides a stream of updates. Automatically reconnects on failure. 
#[instrument(skip(config, request))] @@ -97,10 +160,10 @@ pub fn subscribe( // Keep original request for reconnection attempts let mut current_request = request.clone(); let internal_slot_sub_id = format!("internal-{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap()); - + // Get replay behavior from config let replay_enabled = config.replay; - + // Add internal slot subscription only when replay is enabled if replay_enabled { current_request.slots.insert( @@ -111,13 +174,13 @@ pub fn subscribe( } ); } - + // Clear any user-provided from_slot if replay is disabled if !replay_enabled { current_request.from_slot = None; } - let api_key_string = config.api_key.clone(); + let api_key_string = config.api_key.clone(); loop { @@ -132,7 +195,7 @@ pub fn subscribe( 1 | 2 => tracked_slot, // CONFIRMED/FINALIZED: exact slot _ => tracked_slot.saturating_sub(31), // Unknown: default to safe behavior }; - + attempt_request.from_slot = Some(from_slot); } else if !replay_enabled { // Ensure from_slot is always None when replay is disabled @@ -170,7 +233,7 @@ pub fn subscribe( if let Some(result) = result { match result { Ok(update) => { - + // Handle ping/pong if matches!(&update.update_oneof, Some(UpdateOneof::Ping(_))) { let pong_req = SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() }; @@ -180,7 +243,7 @@ pub fn subscribe( } continue; } - + // Do not forward server 'Pong' updates to consumers either if matches!(&update.update_oneof, Some(UpdateOneof::Pong(_))) { continue; @@ -191,7 +254,7 @@ pub fn subscribe( if replay_enabled { tracked_slot = s.slot; } - + // Skip if this slot update is EXCLUSIVELY from our internal subscription if update.filters.len() == 1 && update.filters.contains(&internal_slot_sub_id) { continue; @@ -202,7 +265,7 @@ pub fn subscribe( let mut clean_update = update; if replay_enabled { clean_update.filters.retain(|f| f != &internal_slot_sub_id); - + // Only yield if there are still filters after cleaning if 
!clean_update.filters.is_empty() { yield Ok(clean_update); @@ -224,9 +287,12 @@ pub fn subscribe( break; } } - + // Handle write requests from the user Some(write_request) = write_rx.recv() => { + // Persist the update so future reconnects reuse the latest subscription + merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id, replay_enabled); + if let Err(e) = sender.send(write_request).await { warn!(error = %e, "Failed to send write request"); break; @@ -259,7 +325,7 @@ pub fn subscribe( sleep(delay).await; } }; - + (update_stream, handle) } @@ -283,13 +349,23 @@ async fn connect_and_subscribe_once( // Build endpoint with all options let mut endpoint = Endpoint::from_shared(config.endpoint.clone()) .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))? - .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10))) + .connect_timeout(Duration::from_secs( + options.connect_timeout_secs.unwrap_or(10), + )) .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30))) - .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30))) - .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(5))) + .http2_keep_alive_interval(Duration::from_secs( + options.http2_keep_alive_interval_secs.unwrap_or(30), + )) + .keep_alive_timeout(Duration::from_secs( + options.keep_alive_timeout_secs.unwrap_or(5), + )) .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true)) .initial_stream_window_size(options.initial_stream_window_size.or(Some(1024 * 1024 * 4))) - .initial_connection_window_size(options.initial_connection_window_size.or(Some(1024 * 1024 * 8))) + .initial_connection_window_size( + options + .initial_connection_window_size + .or(Some(1024 * 1024 * 8)), + ) .http2_adaptive_window(options.http2_adaptive_window.unwrap_or(true)) .tcp_nodelay(options.tcp_nodelay.unwrap_or(true)) .buffer_size(options.buffer_size.or(Some(1024 * 
64))); @@ -356,3 +432,431 @@ async fn connect_and_subscribe_once( Ok((subscribe_tx, response.into_inner())) } + +#[cfg(test)] +mod tests { + use super::*; + use proptest::prelude::*; + use yellowstone_grpc_proto::geyser::{ + SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, + SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, + }; + + fn slot_filter(commit: bool, interslot: bool) -> SubscribeRequestFilterSlots { + SubscribeRequestFilterSlots { + filter_by_commitment: Some(commit), + interslot_updates: Some(interslot), + ..Default::default() + } + } + + fn account_filter(tag: &str) -> SubscribeRequestFilterAccounts { + SubscribeRequestFilterAccounts { + account: vec![format!("acct-{tag}")], + owner: vec![format!("owner-{tag}")], + nonempty_txn_signature: Some(tag.len() % 2 == 0), + ..Default::default() + } + } + + fn transaction_filter(tag: &str) -> SubscribeRequestFilterTransactions { + SubscribeRequestFilterTransactions { + vote: Some(tag.len() % 2 == 0), + failed: Some(tag.len() % 2 == 1), + signature: Some(format!("sig-{tag}")), + account_include: vec![format!("inc-{tag}")], + account_exclude: vec![format!("exc-{tag}")], + account_required: vec![format!("req-{tag}")], + } + } + + fn block_filter(tag: &str) -> SubscribeRequestFilterBlocks { + SubscribeRequestFilterBlocks { + account_include: vec![format!("block-{tag}")], + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(tag.len() % 2 == 0), + } + } + + fn data_slice(offset: u64, length: u64) -> SubscribeRequestAccountsDataSlice { + SubscribeRequestAccountsDataSlice { offset, length } + } + + fn base_request() -> SubscribeRequest { + SubscribeRequest::default() + } + + #[test] + fn merge_accounts_map_additive_and_override() { + let mut base = base_request(); + base.accounts.insert("a".into(), account_filter("base")); + let mut 
update = base_request(); + update.accounts.insert("b".into(), account_filter("new")); + update.accounts.insert("a".into(), account_filter("updated")); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.accounts.len(), 2); + assert_eq!(base.accounts.get("b"), update.accounts.get("b")); + assert_eq!(base.accounts.get("a"), update.accounts.get("a")); + } + + #[test] + fn merge_slots_additive_and_preserves_internal_on_conflict() { + let internal_id = "internal-slot"; + let mut base = base_request(); + base.slots + .insert(internal_id.into(), slot_filter(true, false)); + base.slots.insert("existing".into(), slot_filter(false, false)); + + let mut update = base_request(); + update.slots.insert("user".into(), slot_filter(false, true)); + update + .slots + .insert(internal_id.into(), slot_filter(false, false)); + + let preserved = base.slots.get(internal_id).cloned(); + merge_subscribe_requests(&mut base, &update, internal_id, true); + + assert_eq!(base.slots.get("user"), update.slots.get("user")); + assert_eq!(base.slots.get("existing"), Some(&slot_filter(false, false))); + assert_eq!(base.slots.get(internal_id), preserved.as_ref()); + } + + #[test] + fn merge_transactions_map_additive_and_override() { + let mut base = base_request(); + base.transactions + .insert("txn-a".into(), transaction_filter("old")); + let mut update = base_request(); + update + .transactions + .insert("txn-b".into(), transaction_filter("new")); + update + .transactions + .insert("txn-a".into(), transaction_filter("override")); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.transactions.len(), 2); + assert_eq!(base.transactions.get("txn-a"), update.transactions.get("txn-a")); + assert_eq!(base.transactions.get("txn-b"), update.transactions.get("txn-b")); + } + + #[test] + fn merge_transactions_status_map_additive_and_override() { + let mut base = base_request(); + base.transactions_status + .insert("status-a".into(), 
transaction_filter("old-status")); + let mut update = base_request(); + update.transactions_status.insert("status-b".into(), transaction_filter("new-status")); + update.transactions_status.insert("status-a".into(), transaction_filter("override-status")); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.transactions_status.len(), 2); + assert_eq!( + base.transactions_status.get("status-a"), + update.transactions_status.get("status-a") + ); + assert_eq!( + base.transactions_status.get("status-b"), + update.transactions_status.get("status-b") + ); + } + + #[test] + fn merge_blocks_map_additive_and_override() { + let mut base = base_request(); + base.blocks.insert("block-a".into(), block_filter("old")); + let mut update = base_request(); + update.blocks.insert("block-b".into(), block_filter("new")); + update.blocks.insert("block-a".into(), block_filter("override")); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.blocks.len(), 2); + assert_eq!(base.blocks.get("block-a"), update.blocks.get("block-a")); + assert_eq!(base.blocks.get("block-b"), update.blocks.get("block-b")); + } + + #[test] + fn merge_blocks_meta_map_additive() { + let mut base = base_request(); + base.blocks_meta.insert("meta-a".into(), SubscribeRequestFilterBlocksMeta::default()); + let mut update = base_request(); + update.blocks_meta.insert("meta-b".into(), SubscribeRequestFilterBlocksMeta::default()); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.blocks_meta.len(), 2); + assert!(base.blocks_meta.contains_key("meta-a")); + assert!(base.blocks_meta.contains_key("meta-b")); + } + + #[test] + fn merge_entry_map_additive_and_override() { + let mut base = base_request(); + base.entry + .insert("entry-a".into(), SubscribeRequestFilterEntry::default()); + let mut update = base_request(); + update + .entry + .insert("entry-b".into(), SubscribeRequestFilterEntry::default()); + update + 
.entry + .insert("entry-a".into(), SubscribeRequestFilterEntry::default()); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.entry.len(), 2); + assert!(base.entry.contains_key("entry-a")); + assert!(base.entry.contains_key("entry-b")); + } + + #[test] + fn merge_appends_accounts_data_slice() { + let mut base = base_request(); + base.accounts_data_slice.push(data_slice(1, 2)); + + let mut update = base_request(); + update.accounts_data_slice.push(data_slice(3, 4)); + update.accounts_data_slice.push(data_slice(5, 6)); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.accounts_data_slice.len(), 3); + assert_eq!(base.accounts_data_slice[1], data_slice(3, 4)); + assert_eq!(base.accounts_data_slice[2], data_slice(5, 6)); + } + + #[test] + fn merge_overrides_commitment_from_none() { + let mut base = base_request(); + base.commitment = None; + let mut update = base_request(); + update.commitment = Some(2); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.commitment, Some(2)); + } + + #[test] + fn merge_overrides_commitment_from_some() { + let mut base = base_request(); + base.commitment = Some(0); + let mut update = base_request(); + update.commitment = Some(1); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.commitment, Some(1)); + } + + #[test] + fn merge_leaves_commitment_when_absent() { + let mut base = base_request(); + base.commitment = Some(2); + let mut update = base_request(); + update.commitment = None; + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.commitment, Some(2)); + } + + #[test] + fn merge_overrides_from_slot_none_to_some() { + let mut base = base_request(); + base.from_slot = None; + let mut update = base_request(); + update.from_slot = Some(42); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.from_slot, Some(42)); + } + 
+ #[test] + fn merge_overrides_from_slot_some_to_some() { + let mut base = base_request(); + base.from_slot = Some(5); + let mut update = base_request(); + update.from_slot = Some(99); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.from_slot, Some(99)); + } + + #[test] + fn merge_leaves_from_slot_when_absent() { + let mut base = base_request(); + base.from_slot = Some(7); + let mut update = base_request(); + update.from_slot = None; + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.from_slot, Some(7)); + } + + #[test] + fn merge_empty_update_is_noop() { + let mut base = base_request(); + base.accounts.insert("a".into(), account_filter("base")); + base.commitment = Some(2); + let snapshot = base.clone(); + let update = base_request(); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base, snapshot); + } + + #[test] + fn merge_empty_base_takes_update() { + let mut base = base_request(); + let mut update = base_request(); + update.accounts.insert("a".into(), account_filter("new")); + update.slots.insert("s".into(), slot_filter(true, false)); + update.commitment = Some(1); + update.from_slot = Some(10); + + merge_subscribe_requests(&mut base, &update, "internal", true); + + assert_eq!(base.accounts.get("a"), update.accounts.get("a")); + assert_eq!(base.slots.get("s"), update.slots.get("s")); + assert_eq!(base.commitment, Some(1)); + assert_eq!(base.from_slot, Some(10)); + } + + #[test] + fn merge_preserves_internal_when_update_overwrites_slot() { + let internal_id = "internal-id"; + let mut base = base_request(); + base.slots + .insert(internal_id.into(), slot_filter(true, false)); + let mut update = base_request(); + update + .slots + .insert(internal_id.into(), slot_filter(false, true)); + + let preserved = base.slots.get(internal_id).cloned(); + merge_subscribe_requests(&mut base, &update, internal_id, true); + + assert_eq!(base.slots.get(internal_id), 
preserved.as_ref()); + } + + #[test] + fn merge_does_not_preserve_internal_when_replay_disabled() { + let internal_id = "internal-id"; + let mut base = base_request(); + base.slots + .insert(internal_id.into(), slot_filter(true, false)); + let mut update = base_request(); + update + .slots + .insert(internal_id.into(), slot_filter(false, true)); + + merge_subscribe_requests(&mut base, &update, internal_id, false); + + assert_eq!(base.slots.get(internal_id), update.slots.get(internal_id)); + } + + #[test] + fn merge_does_not_preserve_internal_when_id_empty() { + let mut base = base_request(); + base.slots + .insert("internal".into(), slot_filter(true, false)); + let mut update = base_request(); + update + .slots + .insert("internal".into(), slot_filter(false, true)); + + merge_subscribe_requests(&mut base, &update, "", true); + + assert_eq!(base.slots.get("internal"), update.slots.get("internal")); + } + + #[test] + fn merge_handles_large_maps_and_preserves_internal() { + let internal_id = "internal-large"; + let mut base = base_request(); + base.slots + .insert(internal_id.into(), slot_filter(true, false)); + for i in 0..120 { + base.accounts + .insert(format!("acct-{i}"), account_filter(&format!("b{i}"))); + } + + let mut update = base_request(); + for i in 100..200 { + update + .accounts + .insert(format!("acct-{i}"), account_filter(&format!("u{i}"))); + } + update + .slots + .insert(internal_id.into(), slot_filter(false, true)); + + let preserved = base.slots.get(internal_id).cloned(); + merge_subscribe_requests(&mut base, &update, internal_id, true); + + assert_eq!(base.accounts.len(), 200); + assert_eq!(base.slots.get(internal_id), preserved.as_ref()); + assert_eq!( + base.accounts.get("acct-150"), + update.accounts.get("acct-150") + ); + } + + proptest! 
{ + #[test] + fn prop_map_union_and_internal_preservation( + base_slots in proptest::collection::hash_map("slot-[a-z]{1,3}", any::<bool>(), 0..5), + update_slots in proptest::collection::hash_map("slot-[a-z]{1,3}", any::<bool>(), 0..5), + replay_enabled in any::<bool>(), + ) { + let internal_id = "internal-prop"; + let mut base = base_request(); + for (k, v) in base_slots.iter() { + base.slots.insert(k.clone(), slot_filter(*v, !*v)); + } + let internal_filter = slot_filter(true, false); + base.slots.insert(internal_id.to_string(), internal_filter.clone()); + + let mut update = base_request(); + for (k, v) in update_slots.iter() { + update.slots.insert(k.clone(), slot_filter(*v, *v)); + } + + let base_snapshot = base.slots.clone(); + let update_snapshot = update.slots.clone(); + + merge_subscribe_requests(&mut base, &update, internal_id, replay_enabled); + + for (k, v) in update_snapshot.iter() { + prop_assert_eq!(base.slots.get(k), Some(v)); + } + + for (k, v) in base_snapshot.iter() { + if k == internal_id { + continue; + } + if !update_snapshot.contains_key(k) { + prop_assert_eq!(base.slots.get(k), Some(v)); + } + } + + if replay_enabled { + prop_assert_eq!(base.slots.get(internal_id), Some(&internal_filter)); + } else if let Some(update_val) = update_snapshot.get(internal_id) { + prop_assert_eq!(base.slots.get(internal_id), Some(update_val)); + } + } + } +}