Skip to content

Commit ed7d1a2

Browse files
committed
feat: fix types and refactoring
1 parent f62aa0a commit ed7d1a2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+893
-716
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
},
5050
"extra": {
5151
"laminas": {
52-
"module": "RabbitMqModule"
52+
"component": "RabbitMqModule",
53+
"config-provider": "RabbitMqModule\\ConfigProvider"
5354
}
5455
},
5556
"scripts": {

config/module.config.php

Lines changed: 0 additions & 62 deletions
This file was deleted.

psalm.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0"?>
22
<psalm
3-
errorLevel="5"
3+
errorLevel="1"
44
resolveFromConfigFile="true"
55
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
66
xmlns="https://getpsalm.org/schema/config"

src/BaseAmqp.php

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,20 @@
1414

1515
abstract class BaseAmqp implements SetupFabricAwareInterface
1616
{
17-
/** @var AbstractConnection */
18-
protected $connection;
17+
protected AbstractConnection $connection;
1918

20-
/** @var AMQPChannel|null */
21-
private $channel;
19+
private ?AMQPChannel $channel = null;
2220

23-
/** @var QueueOptions */
24-
protected $queueOptions;
21+
protected ?QueueOptions $queueOptions = null;
2522

26-
/** @var ExchangeOptions */
27-
protected $exchangeOptions;
23+
protected ?ExchangeOptions $exchangeOptions = null;
2824

29-
/** @var bool */
30-
protected $autoSetupFabricEnabled = true;
25+
protected bool $autoSetupFabricEnabled = true;
3126

32-
/** @var bool */
33-
protected $exchangeDeclared = false;
27+
protected bool $exchangeDeclared = false;
3428

35-
/** @var bool */
36-
protected $queueDeclared = false;
29+
protected bool $queueDeclared = false;
3730

38-
/**
39-
* @param AMQPChannel $channel
40-
*/
4131
public function __construct(AbstractConnection $connection, AMQPChannel $channel = null)
4232
{
4333
$this->connection = $connection;
@@ -68,17 +58,17 @@ public function getQueueOptions(): ?QueueOptions
6858
return $this->queueOptions;
6959
}
7060

71-
public function setQueueOptions(QueueOptions $queueOptions): void
61+
public function setQueueOptions(?QueueOptions $queueOptions): void
7262
{
7363
$this->queueOptions = $queueOptions;
7464
}
7565

76-
public function getExchangeOptions(): ExchangeOptions
66+
public function getExchangeOptions(): ?ExchangeOptions
7767
{
7868
return $this->exchangeOptions;
7969
}
8070

81-
public function setExchangeOptions(ExchangeOptions $exchangeOptions): void
71+
public function setExchangeOptions(?ExchangeOptions $exchangeOptions): void
8272
{
8373
$this->exchangeOptions = $exchangeOptions;
8474
}
@@ -95,16 +85,14 @@ public function setAutoSetupFabricEnabled(bool $autoSetupFabricEnabled): void
9585

9686
/**
9787
* Declare Exchange
98-
*
99-
* @param ExchangeOptions $options
10088
*/
101-
protected function declareExchange(ExchangeOptions $options = null): void
89+
protected function declareExchange(?ExchangeOptions $options = null): void
10290
{
10391
if (! $options) {
10492
$options = $this->getExchangeOptions();
10593
}
10694

107-
if (! $options->isDeclare()) {
95+
if (! $options || ! $options->isDeclare()) {
10896
return;
10997
}
11098

@@ -117,7 +105,7 @@ protected function declareExchange(ExchangeOptions $options = null): void
117105
$options->isDurable(),
118106
$options->isAutoDelete(),
119107
$options->isInternal(),
120-
$options->isNoWait(),
108+
false,
121109
$arguments ? new AMQPTable($arguments) : [],
122110
$options->getTicket()
123111
);
@@ -148,30 +136,33 @@ protected function declareQueue(): void
148136
{
149137
$queueOptions = $this->getQueueOptions();
150138

151-
if (! $queueOptions || null === $queueOptions->getName()) {
139+
if (! $queueOptions || '' === $queueOptions->getName()) {
152140
return;
153141
}
154142

155143
$exchangeOptions = $this->getExchangeOptions();
156144
$arguments = $queueOptions->getArguments();
157145

146+
/** @psalm-var non-empty-list<string> $result */
158147
$result = $this->getChannel()->queue_declare(
159148
$queueOptions->getName(),
160149
$queueOptions->isPassive(),
161150
$queueOptions->isDurable(),
162151
$queueOptions->isExclusive(),
163152
$queueOptions->isAutoDelete(),
164-
$queueOptions->isNoWait(),
153+
false,
165154
$arguments ? new AMQPTable($arguments) : [],
166155
$queueOptions->getTicket()
167156
);
168-
if (is_array($result)) {
169-
[$queueName] = $result;
170-
} else {
171-
$queueName = null;
157+
158+
[$queueName] = $result;
159+
160+
if (null === $exchangeOptions) {
161+
throw new \RuntimeException('Unable to create queue bindings: no exchange configuration provided');
172162
}
173163

174164
$routingKeys = $queueOptions->getRoutingKeys();
165+
175166
if (! count($routingKeys)) {
176167
$routingKeys = [''];
177168
}

src/BaseConsumer.php

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,44 @@
55
namespace RabbitMqModule;
66

77
use BadFunctionCallException;
8-
use function count;
8+
use PhpAmqpLib\Channel\AMQPChannel;
9+
use PhpAmqpLib\Connection\AbstractConnection;
10+
use RabbitMqModule\Options\Queue;
911
use function extension_loaded;
1012
use function function_exists;
11-
use Laminas\EventManager\EventManagerAwareInterface;
12-
use Laminas\EventManager\EventManagerAwareTrait;
1313
use PhpAmqpLib\Message\AMQPMessage;
1414

15-
abstract class BaseConsumer extends BaseAmqp implements EventManagerAwareInterface
15+
/**
16+
* @psalm-type ConsumerHandler = callable(AMQPMessage): (int|null|void)
17+
*/
18+
abstract class BaseConsumer extends BaseAmqp
1619
{
17-
use EventManagerAwareTrait;
20+
protected ?string $consumerTag = null;
1821

19-
/** @var null|string */
20-
protected $consumerTag;
21-
22-
/** @var callable */
22+
/**
23+
* @psalm-var callable(AMQPMessage): (int|void)
24+
* @var callable
25+
*/
2326
protected $callback;
2427

25-
/** @var bool */
26-
protected $forceStop = false;
28+
protected bool $forceStop = false;
29+
30+
protected int $idleTimeout = 0;
2731

28-
/** @var int */
29-
protected $idleTimeout = 0;
32+
protected bool $signalsEnabled = true;
3033

31-
/** @var bool */
32-
protected $signalsEnabled = true;
34+
protected string $queueName;
35+
36+
/**
37+
* @psalm-param callable(AMQPMessage): (int|void) $callback
38+
*/
39+
public function __construct(AbstractConnection $connection, Queue $queueOptions, callable $callback, AMQPChannel $channel = null)
40+
{
41+
parent::__construct($connection, $channel);
42+
$this->setQueueOptions($queueOptions);
43+
$this->queueName = $queueOptions->getName();
44+
$this->callback = $callback;
45+
}
3346

3447
public function isSignalsEnabled(): bool
3548
{
@@ -55,11 +68,19 @@ public function setConsumerTag(string $consumerTag): void
5568
$this->consumerTag = $consumerTag;
5669
}
5770

71+
/**
72+
* @psalm-return callable(AMQPMessage): (int|void)
73+
* @return callable
74+
*/
5875
public function getCallback(): callable
5976
{
6077
return $this->callback;
6178
}
6279

80+
/**
81+
* @param callable(AMQPMessage): (int|void) $callback
82+
* @return void
83+
*/
6384
public function setCallback(callable $callback): void
6485
{
6586
$this->callback = $callback;
@@ -82,7 +103,7 @@ public function start(): void
82103
{
83104
$this->setupConsumer();
84105

85-
while (count($this->getChannel()->callbacks)) {
106+
while ($this->getChannel()->is_consuming()) {
86107
$this->getChannel()->wait();
87108
}
88109
}
@@ -94,7 +115,7 @@ protected function setupConsumer(): void
94115
}
95116

96117
$this->getChannel()->basic_consume(
97-
$this->getQueueOptions()->getName(),
118+
$this->queueName,
98119
$this->getConsumerTag(),
99120
false,
100121
false,
@@ -107,12 +128,8 @@ protected function setupConsumer(): void
107128
/**
108129
* Sets the qos settings for the current channel
109130
* Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0.
110-
*
111-
* @param int $prefetchSize
112-
* @param int $prefetchCount
113-
* @param bool $global
114131
*/
115-
public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false): void
132+
public function setQosOptions(int $prefetchSize = 0, int $prefetchCount = 0, bool $global = false): void
116133
{
117134
$this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
118135
}

src/Command/ContainerAwareCommand.php

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/Command/Factory/ContainerAwareCommandFactory.php

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RabbitMqModule\Command\Factory;
6+
7+
use Psr\Container\ContainerInterface;
8+
use RabbitMqModule\Command\ListConsumersCommand;
9+
use RabbitMqModule\ConfigProvider;
10+
11+
/**
12+
* @psalm-import-type ConfigArray from ConfigProvider
13+
*/
14+
final class ListConsumersCommandFactory
15+
{
16+
public function __invoke(ContainerInterface $container): ListConsumersCommand
17+
{
18+
/** @psalm-var ConfigArray $config */
19+
$config = $container->get('config');
20+
21+
return new ListConsumersCommand($config['rabbitmq']['consumer'] ?? []);
22+
}
23+
}

0 commit comments

Comments
 (0)