diff --git a/dist/.magento.env.yaml b/dist/.magento.env.yaml index 9ab1ee8d6c..b761cf8435 100644 --- a/dist/.magento.env.yaml +++ b/dist/.magento.env.yaml @@ -307,6 +307,24 @@ # number_of_shards: 3 # # number_of_replicas: 3 # ####################################################################################################################### +# CONSUMERS_WAIT_FOR_MAX_MESSAGES - use this variable to configure how consumers process messages # +# If this value is true, a consumer waits to process the number of messages # +# (max_messages) from the message queue specified in the CRONS_CONSUMERS_RUNNER # +# variable before closing the connection and terminating consumer process. If you # +# use workers to run consumers instead of using a cron job, set # +# the CONSUMERS_WAIT_FOR_MAX_MESSAGES variable to true. # +# If this value is false, consumers process available messages in the queue, close # +# the TCP connection and terminate. Consumers do not wait for additional messages # +# to enter the queue, even if the number of processed messages is less than # +# the max_messages value. # +# Magento Version: 2.2.0 and later # +# Default value: false # +# Stages: deploy # +# Example: # +# stage: # +# deploy: # +# CONSUMERS_WAIT_FOR_MAX_MESSAGES: true # +####################################################################################################################### # CRON_CONSUMERS_RUNNER - use this variable to make sure message queues are running after a deployment. # # By default, the deployment process overwrites all settings in the env.php file # # cron_run — a boolean value that enables or disables the consumers_runner cron job. # diff --git a/patches.json b/patches.json index 42b9032ec8..2a899a691b 100644 --- a/patches.json +++ b/patches.json @@ -176,6 +176,10 @@ "2.2.0": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.0.patch", "2.2.1 - 2.2.9": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.1.patch", "2.3.0 - 2.3.2": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.3.0.patch" + }, + "Re-work consumers to terminate as soon as there is nothing left to process": { + "2.2.0 - 2.3.1": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch", + "2.3.2 - 2.3.3": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch" } }, "monolog/monolog": { diff --git a/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch b/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch new file mode 100644 index 0000000000..2a019dd6fb --- /dev/null +++ b/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch @@ -0,0 +1,170 @@ +diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php +--- a/vendor/magento/framework-message-queue/CallbackInvoker.php ++++ b/vendor/magento/framework-message-queue/CallbackInvoker.php +@@ -6,11 +6,28 @@ + + namespace Magento\Framework\MessageQueue; + ++use Magento\Framework\App\DeploymentConfig; ++ + /** + * Class CallbackInvoker to invoke callbacks for consumer classes + */ + class CallbackInvoker + { ++ /** ++ * @var DeploymentConfig ++ */ ++ private $deploymentConfig; ++ ++ /** ++ * CallbackInvoker constructor. ++ * @param DeploymentConfig $deploymentConfig ++ */ ++ public function __construct( ++ DeploymentConfig $deploymentConfig ++ ) { ++ $this->deploymentConfig = $deploymentConfig; ++ } ++ + /** + * Run short running process + * +@@ -24,8 +41,23 @@ class CallbackInvoker + for ($i = $maxNumberOfMessages; $i > 0; $i--) { + do { + $message = $queue->dequeue(); +- } while ($message === null && (sleep(1) === 0)); ++ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0)); ++ ++ if ($message === null) { ++ break; ++ } ++ + $callback($message); + } + } ++ ++ /** ++ * Checks if consumers should wait for message from the queue ++ * ++ * @return bool ++ */ ++ private function isWaitingNextMessage(): bool ++ { ++ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1; ++ } + } +diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php +new file mode 100644 +--- /dev/null ++++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php +@@ -0,0 +1,108 @@ ++selectOptions, ++ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES, ++ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No', ++ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES ++ ), ++ ]; ++ } ++ ++ /** ++ * @inheritdoc ++ * @SuppressWarnings(PHPMD.UnusedFormalParameter) ++ */ ++ public function createConfig(array $data, DeploymentConfig $deploymentConfig) ++ { ++ $configData = new ConfigData(ConfigFilePool::APP_ENV); ++ ++ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) { ++ $configData->set( ++ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES, ++ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES] ++ ); ++ } ++ ++ return [$configData]; ++ } ++ ++ /** ++ * @inheritdoc ++ */ ++ public function validate(array $options, DeploymentConfig $deploymentConfig) ++ { ++ $errors = []; ++ ++ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES) ++ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) { ++ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option'; ++ } ++ ++ return $errors; ++ } ++ ++ /** ++ * Check if data ($data) with key ($key) is empty ++ * ++ * @param array $data ++ * @param string $key ++ * @return bool ++ */ ++ private function isDataEmpty(array $data, $key) ++ { ++ if (isset($data[$key]) && $data[$key] !== '') { ++ return false; ++ } ++ ++ return true; ++ } ++} diff --git a/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch b/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch new file mode 100644 index 0000000000..eb1b785546 --- /dev/null +++ b/patches/MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch @@ -0,0 +1,181 @@ +diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php +--- a/vendor/magento/framework-message-queue/CallbackInvoker.php ++++ b/vendor/magento/framework-message-queue/CallbackInvoker.php +@@ -8,6 +8,7 @@ namespace Magento\Framework\MessageQueue; + + use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface; + use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface; ++use Magento\Framework\App\DeploymentConfig; + + /** + * Class CallbackInvoker to invoke callbacks for consumer classes +@@ -29,16 +30,24 @@ class CallbackInvoker implements CallbackInvokerInterface + */ + private $poisonPillCompare; + ++ /** ++ * @var DeploymentConfig ++ */ ++ private $deploymentConfig; ++ + /** + * @param PoisonPillReadInterface $poisonPillRead + * @param PoisonPillCompareInterface $poisonPillCompare ++ * @param DeploymentConfig $deploymentConfig + */ + public function __construct( + PoisonPillReadInterface $poisonPillRead, +- PoisonPillCompareInterface $poisonPillCompare ++ PoisonPillCompareInterface $poisonPillCompare, ++ DeploymentConfig $deploymentConfig + ) { + $this->poisonPillRead = $poisonPillRead; + $this->poisonPillCompare = $poisonPillCompare; ++ $this->deploymentConfig = $deploymentConfig; + } + + /** +@@ -56,13 +65,29 @@ class CallbackInvoker implements CallbackInvokerInterface + do { + $message = $queue->dequeue(); + // phpcs:ignore Magento2.Functions.DiscouragedFunction +- } while ($message === null && (sleep(1) === 0)); ++ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0)); ++ ++ if ($message === null) { ++ break; ++ } ++ + if (false === $this->poisonPillCompare->isLatestVersion($this->poisonPillVersion)) { + $queue->reject($message); + // phpcs:ignore Magento2.Security.LanguageConstruct.ExitUsage + exit(0); + } ++ + $callback($message); + } + } ++ ++ /** ++ * Checks if consumers should wait for message from the queue ++ * ++ * @return bool ++ */ ++ private function isWaitingNextMessage(): bool ++ { ++ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1; ++ } + } +diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php +new file mode 100644 +--- /dev/null ++++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php +@@ -0,0 +1,108 @@ ++selectOptions, ++ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES, ++ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No', ++ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES ++ ), ++ ]; ++ } ++ ++ /** ++ * @inheritdoc ++ * @SuppressWarnings(PHPMD.UnusedFormalParameter) ++ */ ++ public function createConfig(array $data, DeploymentConfig $deploymentConfig) ++ { ++ $configData = new ConfigData(ConfigFilePool::APP_ENV); ++ ++ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) { ++ $configData->set( ++ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES, ++ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES] ++ ); ++ } ++ ++ return [$configData]; ++ } ++ ++ /** ++ * @inheritdoc ++ */ ++ public function validate(array $options, DeploymentConfig $deploymentConfig) ++ { ++ $errors = []; ++ ++ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES) ++ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) { ++ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option'; ++ } ++ ++ return $errors; ++ } ++ ++ /** ++ * Check if data ($data) with key ($key) is empty ++ * ++ * @param array $data ++ * @param string $key ++ * @return bool ++ */ ++ private function isDataEmpty(array $data, $key) ++ { ++ if (isset($data[$key]) && $data[$key] !== '') { ++ return false; ++ } ++ ++ return true; ++ } ++} diff --git a/src/App/Container.php b/src/App/Container.php index 183d9466b9..beedcde3f2 100644 --- a/src/App/Container.php +++ b/src/App/Container.php @@ -390,7 +390,7 @@ function () { $this->container->when(CronKill::class) ->needs(ProcessInterface::class) - ->give(DeployProcess\CronProcessKill::class); + ->give(DeployProcess\BackgroundProcessKill::class); $this->container->when(ModuleRefresh::class) ->needs(ProcessInterface::class) ->give(BuildProcess\RefreshModules::class); diff --git a/src/Config/Schema.php b/src/Config/Schema.php index ce783ae80a..83611c6a16 100644 --- a/src/Config/Schema.php +++ b/src/Config/Schema.php @@ -433,6 +433,16 @@ public function getSchema() StageConfigInterface::STAGE_DEPLOY => [], ], ], + DeployInterface::VAR_CONSUMERS_WAIT_FOR_MAX_MESSAGES => [ + self::SCHEMA_TYPE => ['boolean'], + self::SCHEMA_STAGE => [ + StageConfigInterface::STAGE_GLOBAL, + StageConfigInterface::STAGE_DEPLOY + ], + self::SCHEMA_DEFAULT_VALUE => [ + StageConfigInterface::STAGE_DEPLOY => false, + ], + ], DeployInterface::VAR_ENABLE_GOOGLE_ANALYTICS => [ self::SCHEMA_TYPE => ['boolean'], self::SCHEMA_STAGE => [ diff --git a/src/Config/Stage/DeployInterface.php b/src/Config/Stage/DeployInterface.php index fcc4d30184..7cad63d161 100644 --- a/src/Config/Stage/DeployInterface.php +++ b/src/Config/Stage/DeployInterface.php @@ -20,6 +20,7 @@ interface DeployInterface extends StageConfigInterface const VAR_DATABASE_CONFIGURATION = 'DATABASE_CONFIGURATION'; const VAR_RESOURCE_CONFIGURATION = 'RESOURCE_CONFIGURATION'; const VAR_CRON_CONSUMERS_RUNNER = 'CRON_CONSUMERS_RUNNER'; + const VAR_CONSUMERS_WAIT_FOR_MAX_MESSAGES = 'CONSUMERS_WAIT_FOR_MAX_MESSAGES'; const VAR_CLEAN_STATIC_FILES = 'CLEAN_STATIC_FILES'; const VAR_STATIC_CONTENT_SYMLINK = 'STATIC_CONTENT_SYMLINK'; const VAR_UPDATE_URLS = 'UPDATE_URLS'; diff --git a/src/Process/Deploy/CronProcessKill.php b/src/Process/Deploy/BackgroundProcessKill.php similarity index 81% rename from src/Process/Deploy/CronProcessKill.php rename to src/Process/Deploy/BackgroundProcessKill.php index 3d7893e28c..ab87d5d1a8 100644 --- a/src/Process/Deploy/CronProcessKill.php +++ b/src/Process/Deploy/BackgroundProcessKill.php @@ -10,9 +10,9 @@ use Psr\Log\LoggerInterface; /** - * Kills all running Magento cron processes + * Kills all running Magento cron and consumers processes */ -class CronProcessKill implements ProcessInterface +class BackgroundProcessKill implements ProcessInterface { /** * @var LoggerInterface @@ -37,16 +37,16 @@ public function __construct( } /** - * Kills all running Magento cron jobs. + * Kills all running Magento cron jobs and consumers processes. * * @return void */ public function execute() { try { - $this->logger->info('Trying to kill running cron jobs'); + $this->logger->info('Trying to kill running cron jobs and consumers processes'); - $process = $this->shell->execute('pgrep -U "$(id -u)" -f "bin/magento cron:run"'); + $process = $this->shell->execute('pgrep -U "$(id -u)" -f "bin/magento +(cron:run|queue:consumers:start)"'); $cronPids = array_filter(explode(PHP_EOL, $process->getOutput())); foreach ($cronPids as $pid) { @@ -55,9 +55,9 @@ public function execute() } catch (\RuntimeException $e) { // pgrep returns 1 when no processes matched. Returns 2 and 3 in case of error if ($e->getCode() === 1) { - $this->logger->info('Running Magento cron processes were not found.'); + $this->logger->info('Running Magento cron and consumers processes were not found.'); } else { - $this->logger->warning('Error happening during kill cron: ' . $e->getMessage()); + $this->logger->warning('Error happening during kill cron or consumers processes: ' . $e->getMessage()); } } } diff --git a/src/Process/Deploy/DisableCron.php b/src/Process/Deploy/DisableCron.php index 7ad6af7724..d6a00885d8 100644 --- a/src/Process/Deploy/DisableCron.php +++ b/src/Process/Deploy/DisableCron.php @@ -15,9 +15,9 @@ class DisableCron implements ProcessInterface { /** - * @var CronProcessKill + * @var BackgroundProcessKill */ - private $cronProcessKill; + private $backgroundProcessKill; /** * @var Writer @@ -30,16 +30,16 @@ class DisableCron implements ProcessInterface private $logger; /** - * @param CronProcessKill $cronProcessKill + * @param BackgroundProcessKill $backgroundProcessKill * @param LoggerInterface $logger * @param Writer $deployConfigWriter */ public function __construct( - CronProcessKill $cronProcessKill, + BackgroundProcessKill $backgroundProcessKill, LoggerInterface $logger, Writer $deployConfigWriter ) { - $this->cronProcessKill = $cronProcessKill; + $this->backgroundProcessKill = $backgroundProcessKill; $this->logger = $logger; $this->writer = $deployConfigWriter; } @@ -55,6 +55,6 @@ public function execute() $this->logger->info('Disable cron'); $this->writer->update(['cron' => ['enabled' => 0]]); - $this->cronProcessKill->execute(); + $this->backgroundProcessKill->execute(); } } diff --git a/src/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/Config.php b/src/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/Config.php index dbadd46e21..4b86af5a65 100644 --- a/src/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/Config.php +++ b/src/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/Config.php @@ -8,6 +8,7 @@ use Magento\MagentoCloud\Config\ConfigMerger; use Magento\MagentoCloud\Config\Stage\DeployInterface; use Magento\MagentoCloud\Service\RabbitMq; +use Magento\MagentoCloud\Package\MagentoVersion; /** * Returns queue configuration. @@ -29,27 +30,54 @@ class Config */ private $configMerger; + /** + * @var MagentoVersion + */ + private $magentoVersion; + /** * @param RabbitMq $rabbitMQ * @param DeployInterface $stageConfig * @param ConfigMerger $configMerger + * @param MagentoVersion $magentoVersion */ public function __construct( RabbitMq $rabbitMQ, DeployInterface $stageConfig, - ConfigMerger $configMerger + ConfigMerger $configMerger, + MagentoVersion $magentoVersion ) { $this->rabbitMQ = $rabbitMQ; $this->stageConfig = $stageConfig; $this->configMerger = $configMerger; + $this->magentoVersion = $magentoVersion; } /** * Returns queue configuration * * @return array + * @throws \Magento\MagentoCloud\Package\UndefinedPackageException */ public function get(): array + { + $config = $this->getConfig(); + + if ($this->magentoVersion->isGreaterOrEqual('2.2')) { + $config['consumers_wait_for_messages'] = $this->stageConfig->get( + DeployInterface::VAR_CONSUMERS_WAIT_FOR_MAX_MESSAGES + ) ? 1 : 0; + } + + return $config; + } + + /** + * Returns merged queue configuration + * + * @return array + */ + private function getConfig(): array { $envQueueConfig = $this->stageConfig->get(DeployInterface::VAR_QUEUE_CONFIGURATION); $mqConfig = $this->getAmqpConfig(); diff --git a/src/Test/Unit/Config/SchemaTest.php b/src/Test/Unit/Config/SchemaTest.php index 674c58ee87..9e12a25ba2 100644 --- a/src/Test/Unit/Config/SchemaTest.php +++ b/src/Test/Unit/Config/SchemaTest.php @@ -79,6 +79,7 @@ public function testGetDefaultsForDeploy() DeployInterface::VAR_SCD_MATRIX => [], DeployInterface::VAR_RESOURCE_CONFIGURATION => [], DeployInterface::VAR_LOCK_PROVIDER => 'file', + DeployInterface::VAR_CONSUMERS_WAIT_FOR_MAX_MESSAGES => false, ], $this->schema->getDefaults(StageConfigInterface::STAGE_DEPLOY) ); diff --git a/src/Test/Unit/Process/Deploy/CronProcessKillTest.php b/src/Test/Unit/Process/Deploy/BackgroundProcessKillTest.php similarity index 76% rename from src/Test/Unit/Process/Deploy/CronProcessKillTest.php rename to src/Test/Unit/Process/Deploy/BackgroundProcessKillTest.php index e9441ca622..710e0aeb90 100644 --- a/src/Test/Unit/Process/Deploy/CronProcessKillTest.php +++ b/src/Test/Unit/Process/Deploy/BackgroundProcessKillTest.php @@ -5,7 +5,7 @@ */ namespace Magento\MagentoCloud\Test\Unit\Process\Deploy; -use Magento\MagentoCloud\Process\Deploy\CronProcessKill; +use Magento\MagentoCloud\Process\Deploy\BackgroundProcessKill; use Magento\MagentoCloud\Shell\ProcessInterface; use Magento\MagentoCloud\Shell\ShellInterface; use Psr\Log\LoggerInterface; @@ -13,12 +13,12 @@ use PHPUnit\Framework\MockObject\MockObject; /** - * Test for Magento\MagentoCloud\Process\Deploy\CronProcessKill process + * Test for Magento\MagentoCloud\Process\Deploy\BackgroundProcessKill process */ -class CronProcessKillTest extends TestCase +class BackgroundProcessKillTest extends TestCase { /** - * @var CronProcessKill + * @var BackgroundProcessKill */ private $process; @@ -40,7 +40,7 @@ protected function setUp() $this->loggerMock = $this->getMockForAbstractClass(LoggerInterface::class); $this->shellMock = $this->createMock(ShellInterface::class); - $this->process = new CronProcessKill( + $this->process = new BackgroundProcessKill( $this->loggerMock, $this->shellMock ); @@ -59,12 +59,12 @@ public function testExecute() $this->loggerMock->expects($this->once()) ->method('info') - ->with('Trying to kill running cron jobs'); + ->with('Trying to kill running cron jobs and consumers processes'); $this->shellMock->expects($this->exactly(3)) ->method('execute') ->willReturnMap( [ - ['pgrep -U "$(id -u)" -f "bin/magento cron:run"', [], $processMock1], + ['pgrep -U "$(id -u)" -f "bin/magento +(cron:run|queue:consumers:start)"', [], $processMock1], ["kill 111", [], $processMock2], ["kill 222", [], $processMock2], ] @@ -80,12 +80,12 @@ public function testExecuteWithNoRunningCrons() $this->loggerMock->expects($this->atLeastOnce()) ->method('info') ->withConsecutive( - ['Trying to kill running cron jobs'], - ['Running Magento cron processes were not found.'] + ['Trying to kill running cron jobs and consumers processes'], + ['Running Magento cron and consumers processes were not found.'] ); $this->shellMock->expects($this->once()) ->method('execute') - ->with('pgrep -U "$(id -u)" -f "bin/magento cron:run"') + ->with('pgrep -U "$(id -u)" -f "bin/magento +(cron:run|queue:consumers:start)"') ->willThrowException(new \RuntimeException('return code 1', 1)); $this->process->execute(); } @@ -97,14 +97,14 @@ public function testExecuteWithError() { $this->loggerMock->expects($this->once()) ->method('info') - ->with('Trying to kill running cron jobs'); + ->with('Trying to kill running cron jobs and consumers processes'); $this->shellMock->expects($this->once()) ->method('execute') - ->with('pgrep -U "$(id -u)" -f "bin/magento cron:run"') + ->with('pgrep -U "$(id -u)" -f "bin/magento +(cron:run|queue:consumers:start)"') ->willThrowException(new \RuntimeException('return code 2', 2)); $this->loggerMock->expects($this->once()) ->method('warning') - ->with('Error happening during kill cron: return code 2'); + ->with('Error happening during kill cron or consumers processes: return code 2'); $this->process->execute(); } @@ -123,7 +123,7 @@ public function testExecuteWithExeption() $this->loggerMock->expects($this->exactly(2)) ->method('info') ->withConsecutive( - ['Trying to kill running cron jobs'], + ['Trying to kill running cron jobs and consumers processes'], ['Couldn\'t kill process #111 it may be already finished'] ); $this->loggerMock->expects($this->once()) @@ -131,7 +131,7 @@ public function testExecuteWithExeption() ->with('some error'); $this->shellMock->expects($this->at(0)) ->method('execute') - ->with('pgrep -U "$(id -u)" -f "bin/magento cron:run"') + ->with('pgrep -U "$(id -u)" -f "bin/magento +(cron:run|queue:consumers:start)"') ->willReturn($processMock); $this->shellMock->expects($this->at(1)) ->method('execute') diff --git a/src/Test/Unit/Process/Deploy/DisableCronTest.php b/src/Test/Unit/Process/Deploy/DisableCronTest.php index 5fce731dad..8882edd3a1 100644 --- a/src/Test/Unit/Process/Deploy/DisableCronTest.php +++ b/src/Test/Unit/Process/Deploy/DisableCronTest.php @@ -6,7 +6,7 @@ namespace Magento\MagentoCloud\Test\Unit\Process\Deploy; use Magento\MagentoCloud\Config\Deploy\Writer; -use Magento\MagentoCloud\Process\Deploy\CronProcessKill; +use Magento\MagentoCloud\Process\Deploy\BackgroundProcessKill; use Magento\MagentoCloud\Process\Deploy\DisableCron; use Psr\Log\LoggerInterface; use PHPUnit\Framework\TestCase; @@ -33,9 +33,9 @@ class DisableCronTest extends TestCase private $writerMock; /** - * @var CronProcessKill|MockObject + * @var BackgroundProcessKill|MockObject */ - private $cronProcessKillMock; + private $backgroundProcessKillMock; /** * Setup the test environment. @@ -43,11 +43,11 @@ class DisableCronTest extends TestCase protected function setUp() { $this->loggerMock = $this->getMockForAbstractClass(LoggerInterface::class); - $this->cronProcessKillMock = $this->createMock(CronProcessKill::class); + $this->backgroundProcessKillMock = $this->createMock(BackgroundProcessKill::class); $this->writerMock = $this->createMock(Writer::class); $this->process = new DisableCron( - $this->cronProcessKillMock, + $this->backgroundProcessKillMock, $this->loggerMock, $this->writerMock ); @@ -62,7 +62,7 @@ public function testExecute() $this->writerMock->expects($this->once()) ->method('update') ->with($config); - $this->cronProcessKillMock->expects($this->once()) + $this->backgroundProcessKillMock->expects($this->once()) ->method('execute'); $this->process->execute(); } diff --git a/src/Test/Unit/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/ConfigTest.php b/src/Test/Unit/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/ConfigTest.php index 3eea88aaad..e3fe4173a5 100644 --- a/src/Test/Unit/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/ConfigTest.php +++ b/src/Test/Unit/Process/Deploy/InstallUpdate/ConfigUpdate/Amqp/ConfigTest.php @@ -6,6 +6,7 @@ namespace Magento\MagentoCloud\Test\Unit\Process\Deploy\InstallUpdate\ConfigUpdate\Amqp; use Magento\MagentoCloud\Config\ConfigMerger; +use Magento\MagentoCloud\Package\MagentoVersion; use Magento\MagentoCloud\Config\Stage\DeployInterface; use Magento\MagentoCloud\Process\Deploy\InstallUpdate\ConfigUpdate\Amqp\Config; use Magento\MagentoCloud\Service\RabbitMq; @@ -32,6 +33,11 @@ class ConfigTest extends TestCase */ private $stageConfigMock; + /** + * @var MagentoVersion|Mock + */ + private $magentoVersionMock; + /** * @inheritdoc */ @@ -39,17 +45,21 @@ protected function setUp() { $this->rabbitMq = $this->createMock(RabbitMq::class); $this->stageConfigMock = $this->getMockForAbstractClass(DeployInterface::class); + $this->magentoVersionMock = $this->createMock(MagentoVersion::class); $this->config = new Config( $this->rabbitMq, $this->stageConfigMock, - new ConfigMerger() + new ConfigMerger(), + $this->magentoVersionMock ); } /** * @param array $customQueueConfig * @param array $amqpServiceConfig + * @param bool $isGreaterOrEqualReturns + * @param bool $consumersWaitMaxMessages * @param array $expectedQueueConfig * @dataProvider getDataProvider * @return void @@ -57,15 +67,25 @@ protected function setUp() public function testGet( $customQueueConfig, $amqpServiceConfig, + $isGreaterOrEqualReturns, + $consumersWaitMaxMessages, + $countCallGetConfig, $expectedQueueConfig ) { - $this->stageConfigMock->expects($this->once()) + $this->stageConfigMock->expects($this->exactly($countCallGetConfig)) ->method('get') - ->with(DeployInterface::VAR_QUEUE_CONFIGURATION) - ->willReturn($customQueueConfig); + ->withConsecutive( + [DeployInterface::VAR_QUEUE_CONFIGURATION], + [DeployInterface::VAR_CONSUMERS_WAIT_FOR_MAX_MESSAGES] + ) + ->willReturnOnConsecutiveCalls($customQueueConfig, $consumersWaitMaxMessages); $this->rabbitMq->expects($this->once()) ->method('getConfiguration') ->willReturn($amqpServiceConfig); + $this->magentoVersionMock->expects($this->once()) + ->method('isGreaterOrEqual') + ->with('2.2') + ->willReturn($isGreaterOrEqualReturns); $this->assertEquals($expectedQueueConfig, $this->config->get()); } @@ -80,8 +100,19 @@ public function getDataProvider(): array 'queue configuration does not exist' => [ 'customQueueConfig' => [], 'amqpServiceConfig' => [], + 'isGreaterOrEqualReturns' => false, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 1, 'expectedQueueConfig' => [], ], + 'queue configuration does not exist and Magento >= 2.2.0' => [ + 'customQueueConfig' => [], + 'amqpServiceConfig' => [], + 'isGreaterOrEqualReturns' => true, + 'consumersWaitMaxMessages' => true, + 'countCallGetConfig' => 2, + 'expectedQueueConfig' => ['consumers_wait_for_messages' => 1], + ], 'only custom queue configuration exists' => [ 'customQueueConfig' => [ 'amqp' => [ @@ -93,6 +124,33 @@ public function getDataProvider(): array ], ], 'amqpServiceConfig' => [], + 'isGreaterOrEqualReturns' => false, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 1, + 'expectedQueueConfig' => [ + 'amqp' => [ + 'host' => 'custom_host', + 'port' => 3333, + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + ], + ], + 'only custom queue configuration exists and Magento >= 2.2.0' => [ + 'customQueueConfig' => [ + 'amqp' => [ + 'host' => 'custom_host', + 'port' => 3333, + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + ], + 'amqpServiceConfig' => [], + 'isGreaterOrEqualReturns' => true, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 2, 'expectedQueueConfig' => [ 'amqp' => [ 'host' => 'custom_host', @@ -101,6 +159,7 @@ public function getDataProvider(): array 'password' => 'custom_password', 'virtualhost' => 'custom_vhost', ], + 'consumers_wait_for_messages' => 0 ], ], 'custom and relationship queue configurations exists without merge' => [ @@ -120,6 +179,9 @@ public function getDataProvider(): array 'password' => 'qwerty', 'vhost' => '/' ], + 'isGreaterOrEqualReturns' => false, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 1, 'expectedQueueConfig' => [ 'amqp' => [ 'host' => 'custom_host', @@ -130,6 +192,37 @@ public function getDataProvider(): array ] ], ], + 'custom and relationship queue configurations exists without merge and Magento >= 2.2.0' => [ + 'customQueueConfig' => [ + 'amqp' => [ + 'host' => 'custom_host', + 'port' => 3333, + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + ], + 'amqpServiceConfig' => [ + 'host' => 'localhost', + 'port' => 5538, + 'username' => 'johndoe', + 'password' => 'qwerty', + 'vhost' => '/' + ], + 'isGreaterOrEqualReturns' => true, + 'consumersWaitMaxMessages' => true, + 'countCallGetConfig' => 2, + 'expectedQueueConfig' => [ + 'amqp' => [ + 'host' => 'custom_host', + 'port' => 3333, + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + 'consumers_wait_for_messages' => 1 + ], + ], 'custom and relationship queue configurations exists with merge' => [ 'customQueueConfig' => [ 'amqp' => [ @@ -146,6 +239,9 @@ public function getDataProvider(): array 'password' => 'qwerty', 'vhost' => '/' ], + 'isGreaterOrEqualReturns' => false, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 1, 'expectedQueueConfig' => [ 'amqp' => [ 'host' => 'localhost', @@ -156,6 +252,36 @@ public function getDataProvider(): array ] ], ], + 'custom and relationship queue configurations exists with merge and Magento >= 2.2.0' => [ + 'customQueueConfig' => [ + 'amqp' => [ + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + '_merge' => true, + ], + 'amqpServiceConfig' => [ + 'host' => 'localhost', + 'port' => 5538, + 'username' => 'johndoe', + 'password' => 'qwerty', + 'vhost' => '/' + ], + 'isGreaterOrEqualReturns' => true, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 2, + 'expectedQueueConfig' => [ + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5538, + 'user' => 'custom_user', + 'password' => 'custom_password', + 'virtualhost' => 'custom_vhost', + ], + 'consumers_wait_for_messages' => 0 + ], + ], 'only relationships queue configuration exists' => [ 'customQueueConfig' => [], 'amqpServiceConfig' => [ @@ -165,6 +291,9 @@ public function getDataProvider(): array 'password' => 'qwerty', 'vhost' => '/' ], + 'isGreaterOrEqualReturns' => false, + 'consumersWaitMaxMessages' => false, + 'countCallGetConfig' => 1, 'expectedQueueConfig' => [ 'amqp' => [ 'host' => 'localhost', @@ -175,6 +304,29 @@ public function getDataProvider(): array ] ], ], + 'only relationships queue configuration exists and Magento >= 2.2.0' => [ + 'customQueueConfig' => [], + 'amqpServiceConfig' => [ + 'host' => 'localhost', + 'port' => 5538, + 'username' => 'johndoe', + 'password' => 'qwerty', + 'vhost' => '/' + ], + 'isGreaterOrEqualReturns' => true, + 'consumersWaitMaxMessages' => true, + 'countCallGetConfig' => 2, + 'expectedQueueConfig' => [ + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5538, + 'user' => 'johndoe', + 'password' => 'qwerty', + 'virtualhost' => '/', + ], + 'consumers_wait_for_messages' => 1 + ], + ], ]; } }