Skip to content

Commit

Permalink
MAGECLOUD-4071: Re-work consumers to terminate as soon as there is no…
Browse files Browse the repository at this point in the history
…thing left to process (#594)
  • Loading branch information
BaDos authored and shiftedreality committed Aug 27, 2019
1 parent ec1f037 commit 68f81e0
Show file tree
Hide file tree
Showing 14 changed files with 605 additions and 40 deletions.
18 changes: 18 additions & 0 deletions dist/.magento.env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,24 @@
# number_of_shards: 3 #
# number_of_replicas: 3 #
#######################################################################################################################
# CONSUMERS_WAIT_FOR_MAX_MESSAGES - use this variable to configure how consumers process messages #
# If this value is true, a consumer waits to process the number of messages #
# (max_messages) from the message queue specified in the CRONS_CONSUMERS_RUNNER #
# variable before closing the connection and terminating consumer process. If you #
# use workers to run consumers instead of using a cron job, set #
# the CONSUMERS_WAIT_FOR_MAX_MESSAGES variable to true. #
# If this value is false, consumers process available messages in the queue, close #
# the TCP connection and terminate. Consumers do not wait for additional messages #
# to enter the queue, even if the number of processed messages is less than #
# the max_messages value. #
# Magento Version: 2.2.0 and later #
# Default value: false #
# Stages: deploy #
# Example: #
# stage: #
# deploy: #
# CONSUMERS_WAIT_FOR_MAX_MESSAGES: true #
#######################################################################################################################
# CRON_CONSUMERS_RUNNER - use this variable to make sure message queues are running after a deployment. #
# By default, the deployment process overwrites all settings in the env.php file #
# cron_run — a boolean value that enables or disables the consumers_runner cron job. #
Expand Down
4 changes: 4 additions & 0 deletions patches.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@
"2.2.0": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.0.patch",
"2.2.1 - 2.2.9": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.1.patch",
"2.3.0 - 2.3.2": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.3.0.patch"
},
"Re-work consumers to terminate as soon as there is nothing left to process": {
"2.2.0 - 2.3.1": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch",
"2.3.2 - 2.3.3": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch"
}
},
"monolog/monolog": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php
--- a/vendor/magento/framework-message-queue/CallbackInvoker.php
+++ b/vendor/magento/framework-message-queue/CallbackInvoker.php
@@ -6,11 +6,28 @@

namespace Magento\Framework\MessageQueue;

+use Magento\Framework\App\DeploymentConfig;
+
/**
* Class CallbackInvoker to invoke callbacks for consumer classes
*/
class CallbackInvoker
{
+ /**
+ * @var DeploymentConfig
+ */
+ private $deploymentConfig;
+
+ /**
+ * CallbackInvoker constructor.
+ * @param DeploymentConfig $deploymentConfig
+ */
+ public function __construct(
+ DeploymentConfig $deploymentConfig
+ ) {
+ $this->deploymentConfig = $deploymentConfig;
+ }
+
/**
* Run short running process
*
@@ -24,8 +41,23 @@ class CallbackInvoker
for ($i = $maxNumberOfMessages; $i > 0; $i--) {
do {
$message = $queue->dequeue();
- } while ($message === null && (sleep(1) === 0));
+ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
+
+ if ($message === null) {
+ break;
+ }
+
$callback($message);
}
}
+
+ /**
+ * Checks if consumers should wait for message from the queue
+ *
+ * @return bool
+ */
+ private function isWaitingNextMessage(): bool
+ {
+ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1;
+ }
}
diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
new file mode 100644
--- /dev/null
+++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
@@ -0,0 +1,108 @@
+<?php
+/**
+ * Copyright © Magento, Inc. All rights reserved.
+ * See COPYING.txt for license details.
+ */
+declare(strict_types=1);
+
+namespace Magento\MessageQueue\Setup;
+
+use Magento\Framework\Setup\ConfigOptionsListInterface;
+use Magento\Framework\Setup\Option\SelectConfigOption;
+use Magento\Framework\App\DeploymentConfig;
+use Magento\Framework\Config\Data\ConfigData;
+use Magento\Framework\Config\File\ConfigFilePool;
+
+/**
+ * Deployment configuration consumers options needed for Setup application
+ */
+class ConfigOptionsList implements ConfigOptionsListInterface
+{
+ /**
+ * Input key for the option
+ */
+ const INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES ='consumers-wait-for-messages';
+
+ /**
+ * Path to the value in the deployment config
+ */
+ const CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES = 'queue/consumers_wait_for_messages';
+
+ /**
+ * Default value
+ */
+ const DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES = 1;
+
+ /**
+ * The available configuration values
+ *
+ * @var array
+ */
+ private $selectOptions = [0, 1];
+
+ /**
+ * @inheritdoc
+ */
+ public function getOptions()
+ {
+ return [
+ new SelectConfigOption(
+ self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ SelectConfigOption::FRONTEND_WIZARD_SELECT,
+ $this->selectOptions,
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No',
+ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES
+ ),
+ ];
+ }
+
+ /**
+ * @inheritdoc
+ * @SuppressWarnings(PHPMD.UnusedFormalParameter)
+ */
+ public function createConfig(array $data, DeploymentConfig $deploymentConfig)
+ {
+ $configData = new ConfigData(ConfigFilePool::APP_ENV);
+
+ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) {
+ $configData->set(
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES]
+ );
+ }
+
+ return [$configData];
+ }
+
+ /**
+ * @inheritdoc
+ */
+ public function validate(array $options, DeploymentConfig $deploymentConfig)
+ {
+ $errors = [];
+
+ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)
+ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) {
+ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option';
+ }
+
+ return $errors;
+ }
+
+ /**
+ * Check if data ($data) with key ($key) is empty
+ *
+ * @param array $data
+ * @param string $key
+ * @return bool
+ */
+ private function isDataEmpty(array $data, $key)
+ {
+ if (isset($data[$key]) && $data[$key] !== '') {
+ return false;
+ }
+
+ return true;
+ }
+}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php
--- a/vendor/magento/framework-message-queue/CallbackInvoker.php
+++ b/vendor/magento/framework-message-queue/CallbackInvoker.php
@@ -8,6 +8,7 @@ namespace Magento\Framework\MessageQueue;

use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface;
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface;
+use Magento\Framework\App\DeploymentConfig;

/**
* Class CallbackInvoker to invoke callbacks for consumer classes
@@ -29,16 +30,24 @@ class CallbackInvoker implements CallbackInvokerInterface
*/
private $poisonPillCompare;

+ /**
+ * @var DeploymentConfig
+ */
+ private $deploymentConfig;
+
/**
* @param PoisonPillReadInterface $poisonPillRead
* @param PoisonPillCompareInterface $poisonPillCompare
+ * @param DeploymentConfig $deploymentConfig
*/
public function __construct(
PoisonPillReadInterface $poisonPillRead,
- PoisonPillCompareInterface $poisonPillCompare
+ PoisonPillCompareInterface $poisonPillCompare,
+ DeploymentConfig $deploymentConfig
) {
$this->poisonPillRead = $poisonPillRead;
$this->poisonPillCompare = $poisonPillCompare;
+ $this->deploymentConfig = $deploymentConfig;
}

/**
@@ -56,13 +65,29 @@ class CallbackInvoker implements CallbackInvokerInterface
do {
$message = $queue->dequeue();
// phpcs:ignore Magento2.Functions.DiscouragedFunction
- } while ($message === null && (sleep(1) === 0));
+ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
+
+ if ($message === null) {
+ break;
+ }
+
if (false === $this->poisonPillCompare->isLatestVersion($this->poisonPillVersion)) {
$queue->reject($message);
// phpcs:ignore Magento2.Security.LanguageConstruct.ExitUsage
exit(0);
}
+
$callback($message);
}
}
+
+ /**
+ * Checks if consumers should wait for message from the queue
+ *
+ * @return bool
+ */
+ private function isWaitingNextMessage(): bool
+ {
+ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1;
+ }
}
diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
new file mode 100644
--- /dev/null
+++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
@@ -0,0 +1,108 @@
+<?php
+/**
+ * Copyright © Magento, Inc. All rights reserved.
+ * See COPYING.txt for license details.
+ */
+declare(strict_types=1);
+
+namespace Magento\MessageQueue\Setup;
+
+use Magento\Framework\Setup\ConfigOptionsListInterface;
+use Magento\Framework\Setup\Option\SelectConfigOption;
+use Magento\Framework\App\DeploymentConfig;
+use Magento\Framework\Config\Data\ConfigData;
+use Magento\Framework\Config\File\ConfigFilePool;
+
+/**
+ * Deployment configuration consumers options needed for Setup application
+ */
+class ConfigOptionsList implements ConfigOptionsListInterface
+{
+ /**
+ * Input key for the option
+ */
+ const INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES ='consumers-wait-for-messages';
+
+ /**
+ * Path to the value in the deployment config
+ */
+ const CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES = 'queue/consumers_wait_for_messages';
+
+ /**
+ * Default value
+ */
+ const DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES = 1;
+
+ /**
+ * The available configuration values
+ *
+ * @var array
+ */
+ private $selectOptions = [0, 1];
+
+ /**
+ * @inheritdoc
+ */
+ public function getOptions()
+ {
+ return [
+ new SelectConfigOption(
+ self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ SelectConfigOption::FRONTEND_WIZARD_SELECT,
+ $this->selectOptions,
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No',
+ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES
+ ),
+ ];
+ }
+
+ /**
+ * @inheritdoc
+ * @SuppressWarnings(PHPMD.UnusedFormalParameter)
+ */
+ public function createConfig(array $data, DeploymentConfig $deploymentConfig)
+ {
+ $configData = new ConfigData(ConfigFilePool::APP_ENV);
+
+ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) {
+ $configData->set(
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
+ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES]
+ );
+ }
+
+ return [$configData];
+ }
+
+ /**
+ * @inheritdoc
+ */
+ public function validate(array $options, DeploymentConfig $deploymentConfig)
+ {
+ $errors = [];
+
+ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)
+ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) {
+ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option';
+ }
+
+ return $errors;
+ }
+
+ /**
+ * Check if data ($data) with key ($key) is empty
+ *
+ * @param array $data
+ * @param string $key
+ * @return bool
+ */
+ private function isDataEmpty(array $data, $key)
+ {
+ if (isset($data[$key]) && $data[$key] !== '') {
+ return false;
+ }
+
+ return true;
+ }
+}
2 changes: 1 addition & 1 deletion src/App/Container.php
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ function () {

$this->container->when(CronKill::class)
->needs(ProcessInterface::class)
->give(DeployProcess\CronProcessKill::class);
->give(DeployProcess\BackgroundProcessKill::class);
$this->container->when(ModuleRefresh::class)
->needs(ProcessInterface::class)
->give(BuildProcess\RefreshModules::class);
Expand Down
Loading

0 comments on commit 68f81e0

Please sign in to comment.