@@ -24,6 +24,7 @@ import (
2424 "testing"
2525 "time"
2626
27+ "github.com/google/go-cmp/cmp"
2728 "github.com/prometheus/client_golang/prometheus/testutil"
2829 "github.com/stretchr/testify/require"
2930 "go.uber.org/zap"
@@ -538,104 +539,115 @@ func TestWatchFutureRev(t *testing.T) {
538539}
539540
540541func TestWatchRestore (t * testing.T ) {
541- test := func (delay time.Duration ) func (t * testing.T ) {
542- return func (t * testing.T ) {
543- b , tmpPath := betesting .NewDefaultTmpBackend (t )
544- s := newWatchableStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {})
545- defer cleanup (s , b , tmpPath )
546-
547- testKey := []byte ("foo" )
548- testValue := []byte ("bar" )
549- w := s .NewWatchStream ()
550- defer w .Close ()
551- w .Watch (0 , testKey , nil , 1 )
542+ resyncDelay := watchResyncPeriod * 3 / 2
552543
553- time .Sleep (delay )
554- wantRev := s .Put (testKey , testValue , lease .NoLease )
544+ t .Run ("NoResync" , func (t * testing.T ) {
545+ testWatchRestore (t , 0 , 0 )
546+ })
547+ t .Run ("ResyncBefore" , func (t * testing.T ) {
548+ testWatchRestore (t , resyncDelay , 0 )
549+ })
550+ t .Run ("ResyncAfter" , func (t * testing.T ) {
551+ testWatchRestore (t , 0 , resyncDelay )
552+ })
555553
556- s .Restore (b )
557- events := readEventsForSecond (w .Chan ())
558- if len (events ) != 1 {
559- t .Errorf ("Expected only one event, got %d" , len (events ))
560- }
561- if events [0 ].Kv .ModRevision != wantRev {
562- t .Errorf ("Expected revision to match, got %d, want %d" , events [0 ].Kv .ModRevision , wantRev )
563- }
554+ t .Run ("ResyncBeforeAndAfter" , func (t * testing.T ) {
555+ testWatchRestore (t , resyncDelay , resyncDelay )
556+ })
557+ }
564558
565- }
559+ func testWatchRestore (t * testing.T , delayBeforeRestore , delayAfterRestore time.Duration ) {
560+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
561+ s := New (zaptest .NewLogger (t ), b , & lease.FakeLessor {}, StoreConfig {})
562+ defer cleanup (s , b , tmpPath )
563+
564+ testKey := []byte ("foo" )
565+ testValue := []byte ("bar" )
566+
567+ tcs := []struct {
568+ name string
569+ startRevision int64
570+ wantEvents []mvccpb.Event
571+ }{
572+ {
573+ name : "zero revision" ,
574+ startRevision : 0 ,
575+ wantEvents : []mvccpb.Event {
576+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
577+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
578+ },
579+ },
580+ {
581+ name : "revsion before first write" ,
582+ startRevision : 1 ,
583+ wantEvents : []mvccpb.Event {
584+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
585+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
586+ },
587+ },
588+ {
589+ name : "revision of first write" ,
590+ startRevision : 2 ,
591+ wantEvents : []mvccpb.Event {
592+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
593+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
594+ },
595+ },
596+ {
597+ name : "current revision" ,
598+ startRevision : 3 ,
599+ wantEvents : []mvccpb.Event {
600+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
601+ },
602+ },
603+ {
604+ name : "future revision" ,
605+ startRevision : 4 ,
606+ wantEvents : []mvccpb.Event {},
607+ },
608+ }
609+ watchers := []WatchStream {}
610+ for i , tc := range tcs {
611+ w := s .NewWatchStream ()
612+ defer w .Close ()
613+ watchers = append (watchers , w )
614+ w .Watch (WatchID (i + 1 ), testKey , nil , tc .startRevision )
566615 }
567616
568- t .Run ("Normal" , test (0 ))
569- t .Run ("RunSyncWatchLoopBeforeRestore" , test (time .Millisecond * 120 )) // longer than default waitDuration
617+ s .Put (testKey , testValue , lease .NoLease )
618+ time .Sleep (delayBeforeRestore )
619+ s .Restore (b )
620+ time .Sleep (delayAfterRestore )
621+ s .DeleteRange (testKey , nil )
622+
623+ for i , tc := range tcs {
624+ t .Run (tc .name , func (t * testing.T ) {
625+ events := readEventsForSecond (t , watchers [i ].Chan ())
626+ if diff := cmp .Diff (tc .wantEvents , events ); diff != "" {
627+ t .Errorf ("unexpected events (-want +got):\n %s" , diff )
628+ }
629+ })
630+ }
570631}
571632
572- func readEventsForSecond (ws <- chan WatchResponse ) (events []mvccpb.Event ) {
633+ func readEventsForSecond (t * testing.T , ws <- chan WatchResponse ) []mvccpb.Event {
634+ events := []mvccpb.Event {}
635+ deadline := time .After (time .Second )
573636 for {
574637 select {
575638 case resp := <- ws :
639+ if len (resp .Events ) == 0 {
640+ t .Fatalf ("Events should never be empty, resp: %+v" , resp )
641+ }
576642 events = append (events , resp .Events ... )
577- case <- time .After (time .Second ):
643+ case <- deadline :
644+ return events
645+ case <- time .After (watchResyncPeriod * 3 / 2 ):
578646 return events
579647 }
580648 }
581649}
582650
583- // TestWatchRestoreSyncedWatcher tests such a case that:
584- // 1. watcher is created with a future revision "math.MaxInt64 - 2"
585- // 2. watcher with a future revision is added to "synced" watcher group
586- // 3. restore/overwrite storage with snapshot of a higher lasat revision
587- // 4. restore operation moves "synced" to "unsynced" watcher group
588- // 5. choose the watcher from step 1, without panic
589- func TestWatchRestoreSyncedWatcher (t * testing.T ) {
590- b1 , b1Path := betesting .NewDefaultTmpBackend (t )
591- s1 := newWatchableStore (zap .NewExample (), b1 , & lease.FakeLessor {}, StoreConfig {})
592- defer cleanup (s1 , b1 , b1Path )
593-
594- b2 , b2Path := betesting .NewDefaultTmpBackend (t )
595- s2 := newWatchableStore (zap .NewExample (), b2 , & lease.FakeLessor {}, StoreConfig {})
596- defer cleanup (s2 , b2 , b2Path )
597-
598- testKey , testValue := []byte ("foo" ), []byte ("bar" )
599- rev := s1 .Put (testKey , testValue , lease .NoLease )
600- startRev := rev + 2
601-
602- // create a watcher with a future revision
603- // add to "synced" watcher group (startRev > s.store.currentRev)
604- w1 := s1 .NewWatchStream ()
605- w1 .Watch (0 , testKey , nil , startRev )
606-
607- // make "s2" ends up with a higher last revision
608- s2 .Put (testKey , testValue , lease .NoLease )
609- s2 .Put (testKey , testValue , lease .NoLease )
610-
611- // overwrite storage with higher revisions
612- if err := s1 .Restore (b2 ); err != nil {
613- t .Fatal (err )
614- }
615-
616- // wait for next "syncWatchersLoop" iteration
617- // and the unsynced watcher should be chosen
618- time .Sleep (2 * time .Second )
619-
620- // trigger events for "startRev"
621- s1 .Put (testKey , testValue , lease .NoLease )
622-
623- select {
624- case resp := <- w1 .Chan ():
625- if resp .Revision != startRev {
626- t .Fatalf ("resp.Revision expect %d, got %d" , startRev , resp .Revision )
627- }
628- if len (resp .Events ) != 1 {
629- t .Fatalf ("len(resp.Events) expect 1, got %d" , len (resp .Events ))
630- }
631- if resp .Events [0 ].Kv .ModRevision != startRev {
632- t .Fatalf ("resp.Events[0].Kv.ModRevision expect %d, got %d" , startRev , resp .Events [0 ].Kv .ModRevision )
633- }
634- case <- time .After (time .Second ):
635- t .Fatal ("failed to receive event in 1 second" )
636- }
637- }
638-
639651// TestWatchBatchUnsynced tests batching on unsynced watchers
640652func TestWatchBatchUnsynced (t * testing.T ) {
641653 b , tmpPath := betesting .NewDefaultTmpBackend (t )
0 commit comments