|
14 | 14 | namespace ProophTest\EventStore\Pdo\Projection; |
15 | 15 |
|
16 | 16 | use ArrayIterator; |
| 17 | +use Assert\Assertion; |
17 | 18 | use DateInterval; |
18 | 19 | use DateTimeImmutable; |
19 | 20 | use DateTimeZone; |
|
27 | 28 | use Prooph\EventStore\Pdo\Projection\GapDetection; |
28 | 29 | use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector; |
29 | 30 | use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector; |
| 31 | +use Prooph\EventStore\Projection\ProjectionStatus; |
30 | 32 | use Prooph\EventStore\Projection\Projector; |
31 | 33 | use Prooph\EventStore\Projection\ReadModel; |
32 | 34 | use Prooph\EventStore\Stream; |
@@ -456,27 +458,21 @@ public function it_detects_gap_and_performs_retry(): void |
456 | 458 | $projection |
457 | 459 | ->fromStream('user') |
458 | 460 | ->init(function () { |
459 | | - return []; |
| 461 | + return ['iteration' => 0]; |
460 | 462 | }) |
461 | | - ->when([ |
462 | | - UserCreated::class => function (array $state, Message $event): array { |
463 | | - return $state; |
464 | | - }, |
465 | | - UsernameChanged::class => function (array $state, Message $event): array { |
466 | | - return $state; |
467 | | - }, |
468 | | - ]) |
469 | | - ->run(false); |
470 | | - |
471 | | - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); |
472 | | - |
473 | | - $this->assertTrue($gapDetection->isRetrying()); |
| 463 | + ->whenAny( |
| 464 | + function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array { |
| 465 | + if ($state['iteration'] === 1) { |
| 466 | + Assertion::true($gapDetection->isRetrying()); |
| 467 | + $parallelConnection->commit(); |
| 468 | + } |
474 | 469 |
|
475 | | - // Fill the gap |
476 | | - $parallelConnection->commit(); |
| 470 | + ++$state['iteration']; |
477 | 471 |
|
478 | | - // Run again with gap detection in retry mode |
479 | | - $projection->run(false); |
| 472 | + return $state; |
| 473 | + } |
| 474 | + ) |
| 475 | + ->run(false); |
480 | 476 |
|
481 | 477 | $this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); |
482 | 478 |
|
@@ -521,36 +517,25 @@ public function it_continues_when_retry_limit_is_reached_and_gap_not_filled(): v |
521 | 517 | $projection |
522 | 518 | ->fromStream('user') |
523 | 519 | ->init(function () { |
524 | | - return []; |
| 520 | + return ['iteration' => 0]; |
525 | 521 | }) |
526 | | - ->when([ |
527 | | - UserCreated::class => function (array $state, Message $event): array { |
528 | | - return $state; |
529 | | - }, |
530 | | - UsernameChanged::class => function (array $state, Message $event): array { |
531 | | - return $state; |
532 | | - }, |
533 | | - ]) |
534 | | - ->run(false); |
535 | | - |
536 | | - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); |
537 | | - |
538 | | - $this->assertTrue($gapDetection->isRetrying()); |
539 | | - |
540 | | - // Force a real gap |
541 | | - $parallelConnection->rollBack(); |
542 | | - |
543 | | - // Run again with gap detection in retry mode |
544 | | - $projection->run(false); |
| 522 | + ->whenAny( |
| 523 | + function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array { |
| 524 | + if ($state['iteration'] === 1) { |
| 525 | + Assertion::true($gapDetection->isRetrying()); |
| 526 | + $parallelConnection->rollBack(); |
| 527 | + } |
545 | 528 |
|
546 | | - // Projection should not move forward, but instead retry a second time |
547 | | - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); |
| 529 | + ++$state['iteration']; |
548 | 530 |
|
549 | | - // Third run with gap detection still in retry mode, but limit reached |
550 | | - $projection->run(false); |
| 531 | + return $state; |
| 532 | + } |
| 533 | + ) |
| 534 | + ->run(false); |
551 | 535 |
|
552 | 536 | //Projection should have moved forward |
553 | 537 | $this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); |
| 538 | + $this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection')); |
554 | 539 |
|
555 | 540 | $this->assertFalse($gapDetection->isRetrying()); |
556 | 541 | } |
|
0 commit comments