diff --git a/fakes/WebsocketFake.php b/fakes/WebsocketFake.php index fe8beb7..1d0dcf9 100644 --- a/fakes/WebsocketFake.php +++ b/fakes/WebsocketFake.php @@ -6,12 +6,24 @@ use Evenement\EventEmitter; use JsonSerializable; +use Ragnarok\Fenrir\Buffer\BufferInterface; +use Ragnarok\Fenrir\Buffer\Passthrough; use Ragnarok\Fenrir\WebsocketInterface; use React\Promise\PromiseInterface; class WebsocketFake extends EventEmitter implements WebsocketInterface { public array $openings = []; + public array $closings = []; + + public function __construct(public BufferInterface $buffer = new Passthrough()) + { + } + + public function getBuffer(): BufferInterface + { + return $this->buffer; + } public function open(string $url): PromiseInterface { @@ -22,6 +34,8 @@ public function open(string $url): PromiseInterface public function close(int $code, string $reason): void { + $this->closings[] = [$code, $reason]; + $this->buffer->reset(); } public function send(string $message, bool $useBucket = true): void diff --git a/src/Buffer/BufferInterface.php b/src/Buffer/BufferInterface.php new file mode 100644 index 0000000..83e0f53 --- /dev/null +++ b/src/Buffer/BufferInterface.php @@ -0,0 +1,15 @@ +handler = fn () => null; + + $keys = array_keys($buffers); + + foreach ($keys as $key => $bufferKey) { + $buffer = $this->buffers[$bufferKey]; + + if (isset($keys[$key + 1])) { + $next = $this->buffers[$keys[$key + 1]]; + $buffer->onCompleteMessage(fn (string $message) => $next->partialMessage($message)); + + continue; + } + + $buffer->onCompleteMessage(fn (string $message) => ($this->handler)($message)); + } + + $this->first = $this->buffers[$keys[0]]; + } + + public function partialMessage(string $partial): void + { + $this->first->partialMessage($partial); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->handler = $handler; + } + + public function additionalQueryData(): array + { + return array_merge( + ...array_map(fn (BufferInterface $buffer) => $buffer->additionalQueryData(), $this->buffers) + ); + } + + public function reset(): void + { + foreach ($this->buffers as $buffer) { + $buffer->reset(); + } + } +} diff --git a/src/Buffer/Passthrough.php b/src/Buffer/Passthrough.php new file mode 100644 index 0000000..2c9b74c --- /dev/null +++ b/src/Buffer/Passthrough.php @@ -0,0 +1,36 @@ +completeHandler = fn () => null; + } + + public function partialMessage(string $partial): void + { + ($this->completeHandler)($partial); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->completeHandler = $handler; + } + + public function additionalQueryData(): array + { + return []; + } + + public function reset(): void + { + } +} diff --git a/src/Buffer/ZlibStream.php b/src/Buffer/ZlibStream.php new file mode 100644 index 0000000..5bc602d --- /dev/null +++ b/src/Buffer/ZlibStream.php @@ -0,0 +1,66 @@ +completeHandler = fn () => null; + $this->inflator = inflate_init(ZLIB_ENCODING_DEFLATE); + } + + public function reset(): void + { + $this->logger->debug('Resetting Buffer'); + + $this->buffer = ''; + $this->inflator = inflate_init(ZLIB_ENCODING_DEFLATE); + } + + public function partialMessage(string $partial): void + { + $this->buffer .= $partial; + + if (!str_ends_with($partial, self::SUFFIX)) { + return; + } + + $message = inflate_add($this->inflator, $this->buffer); + $this->buffer = ''; + + if ($message === false) { + $this->logger->warning('Failed to decode zlib-stream message(s)'); + return; + } + + ($this->completeHandler)($message); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->completeHandler = $handler; + } + + public function additionalQueryData(): array + { + return [ + 'compress' => 'zlib-stream', + ]; + } +} diff --git a/src/Discord.php b/src/Discord.php index 7b1e986..2dcb9d9 100644 --- a/src/Discord.php +++ b/src/Discord.php @@ -11,6 +11,8 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Ragnarok\Fenrir\Bitwise\Bitwise; +use Ragnarok\Fenrir\Buffer\BufferInterface; +use Ragnarok\Fenrir\Buffer\Passthrough; use Ragnarok\Fenrir\Enums\TokenType; use Ragnarok\Fenrir\Exceptions\Extension\ExtensionNotFoundException; use Ragnarok\Fenrir\Extension\Extension; @@ -48,14 +50,15 @@ public function __construct( */ public function withGateway( Bitwise $intents, - int $timeout = 10 + int $timeout = 10, + BufferInterface $buffer = new Passthrough(), ): static { $this->gateway = new Connection( $this->loop, $this->token, $intents, $this->mapper, - new Websocket($timeout, $this->logger, [$this->token => '::token::']), + new Websocket($timeout, $this->logger, [$this->token => '::token::'], $buffer), $this->logger, ); diff --git a/src/Gateway/Connection.php b/src/Gateway/Connection.php index 957b384..745c9cc 100644 --- a/src/Gateway/Connection.php +++ b/src/Gateway/Connection.php @@ -14,7 +14,6 @@ use Ragnarok\Fenrir\Constants\WebsocketEvents; use Ragnarok\Fenrir\DataMapper; use Ragnarok\Fenrir\EventHandler; -use Ragnarok\Fenrir\Gateway\Events\Meta\MetaEvent; use Ragnarok\Fenrir\Gateway\Handlers\HeartbeatAcknowledgedEvent; use Ragnarok\Fenrir\Gateway\Handlers\IdentifyHelloEvent; use Ragnarok\Fenrir\Gateway\Handlers\IdentifyResumeEvent; @@ -28,7 +27,6 @@ use Ragnarok\Fenrir\Gateway\Helpers\PresenceUpdateBuilder; use Ragnarok\Fenrir\Gateway\Objects\Payload; use Ragnarok\Fenrir\WebsocketInterface; -use Ratchet\RFC6455\Messaging\MessageInterface; use React\EventLoop\LoopInterface; use React\EventLoop\TimerInterface; use React\Promise\PromiseInterface; @@ -90,8 +88,8 @@ public function __construct( ) { $this->events = new EventHandler($mapper); - $this->websocket->on(WebsocketEvents::MESSAGE, function (MessageInterface $message) { - $parsedMessage = json_decode((string) $message, depth: 1024); + $this->websocket->on(WebsocketEvents::MESSAGE, function (string $message) { + $parsedMessage = json_decode($message, depth: 1024); if ($parsedMessage === null) { return; } @@ -208,7 +206,12 @@ public function setSequence(int $sequence): void public function connect(string $url): PromiseInterface { - $url .= '?' . http_build_query(self::QUERY_DATA); + $queryData = [ + ...self::QUERY_DATA, + ...$this->websocket->getBuffer()->additionalQueryData(), + ]; + + $url .= '?' . http_build_query($queryData); return $this->websocket->open($url); } diff --git a/src/Websocket.php b/src/Websocket.php index 687d146..c496b92 100644 --- a/src/Websocket.php +++ b/src/Websocket.php @@ -7,6 +7,8 @@ use Evenement\EventEmitter; use JsonSerializable; use Psr\Log\LoggerInterface; +use Ragnarok\Fenrir\Buffer\BufferInterface; +use Ragnarok\Fenrir\Buffer\Passthrough; use Ragnarok\Fenrir\Constants\WebsocketEvents; use Ragnarok\Fenrir\Exceptions\Websocket\ConnectionFailedException; use Ragnarok\Fenrir\Exceptions\Websocket\ConnectionNotInitializedException; @@ -30,8 +32,12 @@ class Websocket extends EventEmitter implements WebsocketInterface private Bucket $bucket; - public function __construct(private int $timeout, private LoggerInterface $logger, private array $sendLoggerBlacklist = []) - { + public function __construct( + private int $timeout, + private LoggerInterface $logger, + private array $sendLoggerBlacklist = [], + private readonly BufferInterface $buffer = new Passthrough(), + ) { $this->loop = Loop::get(); $this->socketConnector = new SocketConnector(['timeout' => $timeout]); @@ -63,11 +69,15 @@ public function open(string $url): PromiseInterface $this->logger->info('Client: Connection esablished', ['url' => $url]); - $connection->on('message', function (MessageInterface $message) { + $this->buffer->onCompleteMessage(function (string $message) { $this->logger->debug('Server: New message', ['message' => $message]); $this->emit(WebsocketEvents::MESSAGE, [$message]); }); + $connection->on('message', function (MessageInterface $message) { + $this->buffer->partialMessage((string) $message); + }); + $connection->on('close', function (int $code, string $reason = '') { $this->logger->debug('Connection closed', ['code' => $code, 'reason' => $reason]); $this->emit(WebsocketEvents::CLOSE, [$code, $reason]); @@ -98,6 +108,7 @@ public function close(int $code, string $reason): void ); $this->connection->close($code, $reason); + $this->buffer->reset(); unset($this->connection); } @@ -137,4 +148,9 @@ public function sendAsJson(array|JsonSerializable $item, bool $useBucket): void { $this->send(json_encode($item), $useBucket); } + + public function getBuffer(): BufferInterface + { + return $this->buffer; + } } diff --git a/src/WebsocketInterface.php b/src/WebsocketInterface.php index 4740444..a01583e 100644 --- a/src/WebsocketInterface.php +++ b/src/WebsocketInterface.php @@ -6,6 +6,7 @@ use Evenement\EventEmitterInterface; use JsonSerializable; +use Ragnarok\Fenrir\Buffer\BufferInterface; use React\Promise\PromiseInterface; interface WebsocketInterface extends EventEmitterInterface @@ -14,4 +15,5 @@ public function open(string $url): PromiseInterface; public function close(int $code, string $reason): void; public function send(string $message, bool $useBucket = true): void; public function sendAsJson(array|JsonSerializable $item, bool $useBucket): void; + public function getBuffer(): BufferInterface; } diff --git a/tests/Gateway/Buffer/MultilayerTest.php b/tests/Gateway/Buffer/MultilayerTest.php new file mode 100644 index 0000000..11ae8fb --- /dev/null +++ b/tests/Gateway/Buffer/MultilayerTest.php @@ -0,0 +1,162 @@ +handler)('two'); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->handler = $handler; + } + + public function additionalQueryData(): array + { + return []; + } + + public function reset(): void + { + $this->resets[] = 1; + } + }; + + $two = new class ($resets) implements BufferInterface { + private Closure $handler; + + public function __construct(private array &$resets) + { + + } + + public function partialMessage(string $partial): void + { + Assert::assertEquals('two', $partial); + + ($this->handler)('three'); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->handler = $handler; + } + + public function additionalQueryData(): array + { + return []; + } + + public function reset(): void + { + $this->resets[] = 2; + } + }; + + $three = new class ($resets) implements BufferInterface { + private Closure $handler; + + public function __construct(private array &$resets) + { + + } + + public function partialMessage(string $partial): void + { + Assert::assertEquals('three', $partial); + + ($this->handler)('four'); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->handler = $handler; + } + + public function additionalQueryData(): array + { + return []; + } + + public function reset(): void + { + $this->resets[] = 3; + } + }; + + $four = new class ($resets) implements BufferInterface { + private Closure $handler; + + public function __construct(private array &$resets) + { + + } + + public function partialMessage(string $partial): void + { + Assert::assertEquals('four', $partial); + + ($this->handler)('five'); + } + + public function onCompleteMessage(Closure $handler): void + { + $this->handler = $handler; + } + + public function additionalQueryData(): array + { + return []; + } + + public function reset(): void + { + $this->resets[] = 4; + } + }; + + $multilayer = new Multilayer([ + $one, + $two, + $three, + $four + ]); + + $multilayer->onCompleteMessage(function (string $message) use (&$result) { + $result = $message; + }); + + $multilayer->partialMessage('one'); + + $this->assertEquals('five', $result); + + $multilayer->reset(); + + $this->assertEquals([1,2,3,4], $resets); + } +} diff --git a/tests/Gateway/Buffer/PassthroughTest.php b/tests/Gateway/Buffer/PassthroughTest.php new file mode 100644 index 0000000..c084004 --- /dev/null +++ b/tests/Gateway/Buffer/PassthroughTest.php @@ -0,0 +1,29 @@ +onCompleteMessage(function (string $message) use (&$messages) { + $messages[] = $message; + }); + + $passthrough->partialMessage('uno'); + $passthrough->partialMessage('dos'); + $passthrough->partialMessage('tres'); + $passthrough->partialMessage('quatro'); + $passthrough->partialMessage('I dont know how to spell cinqo'); + + $this->assertEquals(['uno', 'dos', 'tres', 'quatro', 'I dont know how to spell cinqo'], $messages); + } +} diff --git a/tests/Gateway/Buffer/ZlibStreamTest.php b/tests/Gateway/Buffer/ZlibStreamTest.php new file mode 100644 index 0000000..2787a72 --- /dev/null +++ b/tests/Gateway/Buffer/ZlibStreamTest.php @@ -0,0 +1,120 @@ +onCompleteMessage(function (string $message) use (&$messages) { + $messages[] = $message; + }); + + $payload = json_encode(['op' => 1, 'd' => 'hello']); + + $compressed = $this->compress($payload); + + $buffer->partialMessage($compressed); + + $this->assertCount(1, $messages); + $this->assertSame($payload, $messages[0]); + } + + public function testItHandlesChunkedMessages(): void + { + $buffer = new ZlibStream(); + + $messages = []; + $buffer->onCompleteMessage(function (string $message) use (&$messages) { + $messages[] = $message; + }); + + $payload = json_encode(['op' => 2, 'd' => 'chunked']); + $compressed = $this->compress($payload); + + $chunks = str_split($compressed, 5); + + foreach ($chunks as $chunk) { + $buffer->partialMessage($chunk); + } + + $this->assertCount(1, $messages); + $this->assertSame($payload, $messages[0]); + } + + public function testResetClearsBufferState(): void + { + $buffer = new ZlibStream(); + + $messages = []; + $buffer->onCompleteMessage(function (string $message) use (&$messages) { + $messages[] = $message; + }); + + $payload = json_encode(['op' => 3]); + $compressed = $this->compress($payload); + + $half = intdiv(strlen($compressed), 2); + + $firstHalf = substr($compressed, 0, $half); + $secondHalf = substr($compressed, $half); + + $buffer->partialMessage($firstHalf); + + $buffer->reset(); + + $this->assertEmpty($messages); + + $buffer->partialMessage($firstHalf); + $buffer->partialMessage($secondHalf); + + $this->assertCount(1, $messages); + $this->assertSame($payload, $messages[0]); + } + + public function testItReceivesSeveralMessages(): void + { + $buffer = new ZlibStream(); + + $messages = []; + $buffer->onCompleteMessage(function (string $message) use (&$messages) { + $messages[] = $message; + }); + + $payloads = [ + json_encode(['op' => 1, 'd' => 'first']), + json_encode(['op' => 2, 'd' => 'second']), + json_encode(['op' => 3, 'd' => 'third']), + ]; + + $compressedMessages = array_map(fn ($p) => $this->compress($p), $payloads); + + foreach ($compressedMessages as $compressed) { + $half = intdiv(strlen($compressed), 2); + + $firstHalf = substr($compressed, 0, $half); + $secondHalf = substr($compressed, $half); + + $buffer->partialMessage($firstHalf); + $buffer->partialMessage($secondHalf); + } + + $this->assertCount(3, $messages); + $this->assertSame($payloads, $messages); + } +} diff --git a/tests/Gateway/ConnectionTest.php b/tests/Gateway/ConnectionTest.php index 165c5ef..ef7db43 100644 --- a/tests/Gateway/ConnectionTest.php +++ b/tests/Gateway/ConnectionTest.php @@ -15,6 +15,8 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Ragnarok\Fenrir\Bitwise\Bitwise; +use Ragnarok\Fenrir\Buffer\BufferInterface; +use Ragnarok\Fenrir\Buffer\Passthrough; use Ragnarok\Fenrir\Constants\MetaEvents; use Ragnarok\Fenrir\Constants\WebsocketEvents; use Ragnarok\Fenrir\DataMapper; @@ -66,47 +68,48 @@ public function testSequence(): void public function testConnect(): void { + $websocket = new WebsocketFake(); + $connection = new Connection( $this->getLoop(), '::token::', new Bitwise(), new DataMapper(new NullLogger()), - new WebsocketFake(), + $websocket, ); - /** @var MockInterface&Websocket */ - $websocket = Mockery::mock(Websocket::class); - (new ReflectionProperty($connection, 'websocket'))->setValue($connection, $websocket); - $websocket->expects() - ->open() - ->with('::ws url::?v=10') - ->andReturns(PromiseFake::get('::return::')) - ->once(); + await($connection->connect('::ws url::')); - $this->assertEquals('::return::', await($connection->connect('::ws url::'))); + $this->assertEquals(['::ws url::?v=10'], $websocket->openings); } public function testDisconnect(): void { + $buffer = new class () extends Passthrough { + public bool $hasReset = false; + + public function reset(): void + { + $this->hasReset = true; + } + }; + + $websocket = new WebsocketFake($buffer); + $connection = new Connection( $this->getLoop(), '::token::', new Bitwise(), new DataMapper(new NullLogger()), - new WebsocketFake(), + $websocket, ); - /** @var MockInterface&Websocket */ - $websocket = Mockery::mock(Websocket::class); - (new ReflectionProperty($connection, 'websocket'))->setValue($connection, $websocket); - - $websocket->expects() - ->close() - ->with(1234, '::reason::') - ->once(); - $connection->disconnect(1234, '::reason::'); + + $this->assertCount(1, $websocket->closings); + $this->assertEquals([1234, '::reason::'], $websocket->closings[0]); + $this->assertTrue($buffer->hasReset); } public function testSessionId(): void @@ -425,28 +428,19 @@ public function testItResumes(): void public function testOpen(): void { + $websocket = new WebsocketFake(); $connection = new Connection( $this->getLoop(), '::token::', new Bitwise(), new DataMapper(new NullLogger()), - new WebsocketFake(), + $websocket, ); - /** @var MockInterface&Websocket */ - $websocket = Mockery::mock(Websocket::class); - (new ReflectionProperty($connection, 'websocket'))->setValue($connection, $websocket); - - $websocket->expects() - ->open() - ->with(Mockery::on(function (string $url) { - $this->assertMatchesRegularExpression('/wss:\/\/gateway.discord.gg\/\?v=(\d+)/', $url); - - return true; - })) - ->once(); - $connection->open(); + + $this->assertCount(1, $websocket->openings); + $this->assertMatchesRegularExpression('/wss:\/\/gateway.discord.gg\/\?v=(\d+)/', $websocket->openings[0]); } public function testItEmitsGatewayMessagesAsEvents(): void @@ -483,14 +477,7 @@ public function testItEmitsGatewayMessagesAsEvents(): void return true; })); - /** @var MessageInterface&MockInterface */ - $message = Mockery::mock(MessageInterface::class); - $message->expects() - ->__toString() - ->andReturns('{"op": 1}') - ->once(); - - $websocket->emit(WebsocketEvents::MESSAGE, [$message]); + $websocket->emit(WebsocketEvents::MESSAGE, ['{"op": 1}']); } public function testItSendsPresenceUpdates()