Skip to content

Commit a44ce44

Browse files
committed
keep running projections
projections should keep running when: - runs with `keepRunning: true` due it awaits for new events - it did not reached end of the stream, e.g. in case of resetting projection status should indicate actual status of projection therefore, `idle` should be set if projection wasn't supposed to keep running or end of stream was reached
1 parent 31625e2 commit a44ce44

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

src/Projection/PdoEventStoreReadModelProjector.php

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,6 @@ final class PdoEventStoreReadModelProjector implements MetadataAwareReadModelPro
132132
*/
133133
private $eventCounter = 0;
134134

135-
/**
136-
* @var int
137-
*/
138-
private $loadedEvents = 0;
139-
140135
/**
141136
* @var int
142137
*/
@@ -529,7 +524,7 @@ public function run(bool $keepRunning = true): void
529524
}
530525

531526
$streamEvents = new MergedStreamIterator(\array_keys($eventStreams), ...\array_values($eventStreams));
532-
$this->loadedEvents = $streamEvents->count();
527+
$loadedEvents = $streamEvents->count();
533528

534529
if ($singleHandler) {
535530
$gapDetected = ! $this->handleStreamWithSingleHandler($streamEvents);
@@ -586,9 +581,9 @@ public function run(bool $keepRunning = true): void
586581
}
587582

588583
$this->prepareStreamPositions();
589-
} while ($keepRunning && ! $this->isStopped);
584+
} while (($keepRunning || $loadedEvents > 0) && ! $this->isStopped);
590585
} finally {
591-
$this->releaseLock($keepRunning);
586+
$this->releaseLock($keepRunning, $loadedEvents);
592587
}
593588
}
594589

@@ -915,7 +910,7 @@ private function updateLock(): void
915910
$this->lastLockUpdate = $now;
916911
}
917912

918-
private function releaseLock(bool $keepRunning): void
913+
private function releaseLock(bool $keepRunning, int $loadedEvents): void
919914
{
920915
$projectionsTable = $this->quoteTableName($this->projectionsTable);
921916
$sql = <<<EOT
@@ -924,7 +919,7 @@ private function releaseLock(bool $keepRunning): void
924919

925920
$statement = $this->connection->prepare($sql);
926921

927-
$status = $keepRunning && $this->loadedEvents > 0 ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE();
922+
$status = ($keepRunning || $loadedEvents > 0) ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE();
928923

929924
try {
930925
$statement->execute([$status->getValue(), $this->name]);

tests/Projection/PdoEventStoreReadModelProjectorTestCase.php

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
namespace ProophTest\EventStore\Pdo\Projection;
1515

1616
use ArrayIterator;
17+
use Assert\Assertion;
1718
use DateInterval;
1819
use DateTimeImmutable;
1920
use DateTimeZone;
@@ -26,6 +27,7 @@
2627
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
2728
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
2829
use Prooph\EventStore\Projection\ProjectionManager;
30+
use Prooph\EventStore\Projection\ProjectionStatus;
2931
use Prooph\EventStore\Projection\Projector;
3032
use Prooph\EventStore\Projection\ReadModel;
3133
use Prooph\EventStore\Stream;
@@ -657,6 +659,68 @@ public function it_does_not_perform_retry_when_event_is_older_than_detection_win
657659
$this->assertFalse($gapDetection->isRetrying());
658660
}
659661

662+
/**
663+
* @test
664+
*/
665+
public function projection_should_run_until_end_of_stream(): void
666+
{
667+
$this->prepareEventStream('user-345');
668+
669+
$projectionManager = $this->projectionManager;
670+
$projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [
671+
Projector::OPTION_PERSIST_BLOCK_SIZE => 1,
672+
PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1,
673+
]);
674+
675+
$projection
676+
->fromStream('user-345')
677+
->whenAny(function () use ($projectionManager) {
678+
Assertion::eq(ProjectionStatus::RUNNING(), $projectionManager->fetchProjectionStatus('test_projection'));
679+
})
680+
->run(false);
681+
682+
$this->assertEquals(50, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']);
683+
$this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection'));
684+
}
685+
686+
/**
687+
* @test
688+
*/
689+
public function when_failed_projection_should_release_lock_but_indicate_running_status(): void
690+
{
691+
$this->prepareEventStream('user-345');
692+
693+
$projectionManager = $this->projectionManager;
694+
$projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [
695+
Projector::OPTION_PERSIST_BLOCK_SIZE => 1,
696+
PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1,
697+
]);
698+
699+
$projection
700+
->fromStream('user-345')
701+
->init(function () {
702+
return ['iteration' => 0];
703+
})
704+
->whenAny(function (array $state, Message $event): array {
705+
++$state['iteration'];
706+
707+
if ($state['iteration'] > 5) {
708+
throw new \RuntimeException('something happened');
709+
}
710+
711+
return $state;
712+
});
713+
714+
try {
715+
$projection->run(false);
716+
} catch (\Throwable) {
717+
}
718+
719+
$this->assertEquals(5, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']);
720+
$this->assertEquals(ProjectionStatus::RUNNING(), $projectionManager->fetchProjectionStatus('test_projection'));
721+
$this->assertNull($this->connection->query("select locked_until from projections where name = 'test_projection'")->fetch(PDO::FETCH_COLUMN));
722+
}
723+
660724
protected function prepareEventStreamWithOneEvent(string $name, ?DateTimeImmutable $createdAt = null): void
661725
{
662726
$events = [];

0 commit comments

Comments
 (0)