diff --git a/src/Projection/PdoEventStoreReadModelProjector.php b/src/Projection/PdoEventStoreReadModelProjector.php index f4b13e3..a7b219e 100644 --- a/src/Projection/PdoEventStoreReadModelProjector.php +++ b/src/Projection/PdoEventStoreReadModelProjector.php @@ -132,11 +132,6 @@ final class PdoEventStoreReadModelProjector implements MetadataAwareReadModelPro */ private $eventCounter = 0; - /** - * @var int - */ - private $loadedEvents = 0; - /** * @var int */ @@ -529,7 +524,7 @@ public function run(bool $keepRunning = true): void } $streamEvents = new MergedStreamIterator(\array_keys($eventStreams), ...\array_values($eventStreams)); - $this->loadedEvents = $streamEvents->count(); + $loadedEvents = $streamEvents->count(); if ($singleHandler) { $gapDetected = ! $this->handleStreamWithSingleHandler($streamEvents); @@ -586,9 +581,9 @@ public function run(bool $keepRunning = true): void } $this->prepareStreamPositions(); - } while ($keepRunning && ! $this->isStopped); + } while (($keepRunning || $loadedEvents > 0) && ! $this->isStopped); } finally { - $this->releaseLock($keepRunning); + $this->releaseLock($keepRunning, $loadedEvents); } } @@ -915,7 +910,7 @@ private function updateLock(): void $this->lastLockUpdate = $now; } - private function releaseLock(bool $keepRunning): void + private function releaseLock(bool $keepRunning, int $loadedEvents): void { $projectionsTable = $this->quoteTableName($this->projectionsTable); $sql = <<connection->prepare($sql); - $status = $keepRunning && $this->loadedEvents > 0 ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE(); + $status = ($keepRunning || $loadedEvents > 0) ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE(); try { $statement->execute([$status->getValue(), $this->name]); diff --git a/tests/Projection/AbstractPdoEventStoreReadModelProjectorTestCase.php b/tests/Projection/AbstractPdoEventStoreReadModelProjectorTestCase.php index 519d0d7..9c21bae 100644 --- a/tests/Projection/AbstractPdoEventStoreReadModelProjectorTestCase.php +++ b/tests/Projection/AbstractPdoEventStoreReadModelProjectorTestCase.php @@ -14,6 +14,7 @@ namespace ProophTest\EventStore\Pdo\Projection; use ArrayIterator; +use Assert\Assertion; use DateInterval; use DateTimeImmutable; use DateTimeZone; @@ -27,6 +28,7 @@ use Prooph\EventStore\Pdo\Projection\GapDetection; use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector; use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector; +use Prooph\EventStore\Projection\ProjectionStatus; use Prooph\EventStore\Projection\Projector; use Prooph\EventStore\Projection\ReadModel; use Prooph\EventStore\Stream; @@ -456,27 +458,21 @@ public function it_detects_gap_and_performs_retry(): void $projection ->fromStream('user') ->init(function () { - return []; + return ['iteration' => 0]; }) - ->when([ - UserCreated::class => function (array $state, Message $event): array { - return $state; - }, - UsernameChanged::class => function (array $state, Message $event): array { - return $state; - }, - ]) - ->run(false); - - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); - - $this->assertTrue($gapDetection->isRetrying()); + ->whenAny( + function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array { + if ($state['iteration'] === 1) { + Assertion::true($gapDetection->isRetrying()); + $parallelConnection->commit(); + } - // Fill the gap - $parallelConnection->commit(); + ++$state['iteration']; - // Run again with gap detection in retry mode - $projection->run(false); + return $state; + } + ) + ->run(false); $this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); @@ -521,36 +517,25 @@ public function it_continues_when_retry_limit_is_reached_and_gap_not_filled(): v $projection ->fromStream('user') ->init(function () { - return []; + return ['iteration' => 0]; }) - ->when([ - UserCreated::class => function (array $state, Message $event): array { - return $state; - }, - UsernameChanged::class => function (array $state, Message $event): array { - return $state; - }, - ]) - ->run(false); - - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); - - $this->assertTrue($gapDetection->isRetrying()); - - // Force a real gap - $parallelConnection->rollBack(); - - // Run again with gap detection in retry mode - $projection->run(false); + ->whenAny( + function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array { + if ($state['iteration'] === 1) { + Assertion::true($gapDetection->isRetrying()); + $parallelConnection->rollBack(); + } - // Projection should not move forward, but instead retry a second time - $this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); + ++$state['iteration']; - // Third run with gap detection still in retry mode, but limit reached - $projection->run(false); + return $state; + } + ) + ->run(false); //Projection should have moved forward $this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']); + $this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection')); $this->assertFalse($gapDetection->isRetrying()); } @@ -632,4 +617,61 @@ protected function prepareEventStreamWithOneEvent(string $name, ?DateTimeImmutab $this->eventStore->create(new Stream(new StreamName($name), new ArrayIterator($events))); } + + #[Test] + public function projection_should_run_until_end_of_stream(): void + { + $this->prepareEventStream('user-345'); + + $projectionManager = $this->projectionManager; + $projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [ + Projector::OPTION_PERSIST_BLOCK_SIZE => 1, + PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1, + ]); + + $projection + ->fromStream('user-345') + ->whenAny(function () { + }) + ->run(false); + + $this->assertEquals(50, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']); + $this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection')); + } + + #[Test] + public function when_failed_projection_should_release_lock_but_indicate_running_status(): void + { + $this->prepareEventStream('user-345'); + + $projectionManager = $this->projectionManager; + $projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [ + Projector::OPTION_PERSIST_BLOCK_SIZE => 1, + PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1, + ]); + + $projection + ->fromStream('user-345') + ->init(function () { + return ['iteration' => 0]; + }) + ->whenAny(function (array $state, Message $event): array { + ++$state['iteration']; + + if ($state['iteration'] > 5) { + throw new \RuntimeException('something happened'); + } + + return $state; + }); + + try { + $projection->run(false); + } catch (\Throwable) { + } + + $this->assertEquals(5, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']); + $this->assertEquals(ProjectionStatus::RUNNING(), $projectionManager->fetchProjectionStatus('test_projection')); + $this->assertNull($this->connection->query("select locked_until from projections where name = 'test_projection'")->fetch(PDO::FETCH_COLUMN)); + } }