Mastering queue usage in Magento 2: handling application state changes
Queues are a great way of distributing the workload in your application. They allow you to move resource-intensive processing to a separate process. Common uses are order processing, external service integration, etc. A queue consumer runs as a separate process and picks up a message as soon as there is a new one in the queue to handle.
This consumer is not like the other processes
There is, however, one caveat in this approach that is sometimes overlooked. The consumer is not restarted for each new message in the queue. In this respect it differs from the standard web application model, where the entire memory is cleared at the end of each request. That means we not only need to think more carefully about memory usage, but also that many of the Magento classes we use can give us outdated information.
As an example, let’s imagine we have an external system that we need to integrate with our Magento store. This system keeps information about orders, and we want to send every new order from the store to it. The code will not be too complicated. We will create a listener for order submission that publishes a message with the order ID to the queue. The consumer will then receive this message, load the order and send the data to the third-party service.
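The flow described above can be sketched in plain PHP. This is only an illustration of the shape of the integration: InMemoryQueue, onOrderSubmitted and handleMessage are hypothetical names, and the in-memory array stands in for the real message queue, which in Magento would be reached through its publisher and consumer classes.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the message queue; not a Magento class.
final class InMemoryQueue
{
    /** @var string[] */
    private array $messages = [];

    public function publish(string $message): void
    {
        $this->messages[] = $message;
    }

    public function dequeue(): ?string
    {
        // array_shift returns null when the queue is empty
        return array_shift($this->messages);
    }
}

// Listener side: runs inside the web request once the order is saved.
function onOrderSubmitted(InMemoryQueue $queue, int $orderId): void
{
    // Only the ID travels through the queue; the consumer loads the order itself.
    $queue->publish(json_encode(['order_id' => $orderId], JSON_THROW_ON_ERROR));
}

// Consumer side: runs in a separate, long-lived process.
function handleMessage(string $message, callable $loadOrder, callable $sendToService): void
{
    $data = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
    $order = $loadOrder((int) $data['order_id']);
    $sendToService($order);
}
```

Sending only the order ID keeps the message small and means the consumer always works with whatever it loads at processing time, which is exactly where the state problems discussed in this article come from.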
Here’s the catch
At first glance there is nothing here that could cause problems. Orders will be loaded from the database for each message. Publishing the message only after the order has been saved and the changes have been committed ensures that we never work with an order that is not yet fully saved (although if we work with database replicas we may need to think about propagation time).
However, there are several elements that may present us with unforeseen problems. The first one is the configuration. Most likely, such an integration will need settings such as API keys or the domains to use in requests. The problem occurs when those settings change. The configuration is read when the consumer initializes, and every subsequent message gets the same values, regardless of the current state of the configuration. This means that when the administrator changes anything in the settings, the consumer needs to be restarted.
A similar situation arises with attribute data. For example, if we read the value of a dropdown attribute that is managed from the backend, the first read retrieves the option label, and every subsequent order containing a product with the same value gets that initial label, even if it has since changed in the database. Magento caches the result to speed up the store. In normal operation this is safe, because the cached results are only used within a single request or a single cron run, and any change is visible to the next request.
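The effect is easy to reproduce outside Magento. The sketch below is a simplified model, not Magento code: CachedConfig is a hypothetical class that memoizes values for the life of the process, in the same spirit as the in-memory caches described above, and the $storage array plays the role of the database.

```php
<?php
declare(strict_types=1);

// Hypothetical reader that caches every value it has seen until the process ends.
final class CachedConfig
{
    /** @var array<string, string> */
    private array $cache = [];

    private Closure $readFromStorage;

    public function __construct(Closure $readFromStorage)
    {
        $this->readFromStorage = $readFromStorage;
    }

    public function getValue(string $path): string
    {
        if (!array_key_exists($path, $this->cache)) {
            // Storage is only consulted on the first read of each path.
            $this->cache[$path] = ($this->readFromStorage)($path);
        }
        return $this->cache[$path];
    }
}

// The array plays the role of the database.
$storage = ['integration/api_key' => 'old-key'];
$config = new CachedConfig(function (string $path) use (&$storage): string {
    return $storage[$path];
});

$first = $config->getValue('integration/api_key');  // read while handling message 1
$storage['integration/api_key'] = 'new-key';         // administrator saves new settings
$second = $config->getValue('integration/api_key'); // message 2 still gets 'old-key'
```

In a web request the object would be discarded after the first read, so the stale value would never be observed. In a long-running consumer the same object serves every message.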
What to do, what to do?
There are several approaches to this problem. One is to write custom code that will interact directly with the database instead of Magento classes that may not be suitable for such continuous usage. The drawback of this solution is that it makes the code more complex, it makes maintenance more difficult and it may cause issues with plugins and observers that hook into those core Magento elements.
A better solution is to use a poison message, also called a poison pill. A poison pill is a message sent to the queue that cannot be processed by the consumer. Usually this is considered a bug scenario: the message is fed to the consumer, fails to be processed and is requeued, creating an infinite loop. However, we can use the same idea as an easy way to restart a consumer without having to log into the server and restart the process manually. Luckily for us, Magento implements poison pill handling for such cases.
Peeking under the hood
Magento offers two ways of running queue consumers. They can be started by the cron process or manually. The former requires no additional work beyond making sure that the main Magento cron is running. The latter requires a way to restart a consumer automatically when it is not running.
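For the cron-based approach, the consumers runner is configured in app/etc/env.php. The excerpt below is a minimal sketch of that section: the keys cron_run, max_messages and consumers are described in the Magento documentation, while the consumer name and the message limit are made-up values used only for illustration. A real env.php contains many other keys that are omitted here.

```php
<?php
// Excerpt of app/etc/env.php; all other configuration keys are omitted.
return [
    'cron_consumers_runner' => [
        // Let the main Magento cron start the consumers.
        'cron_run' => true,
        // Each consumer process stops after this many messages (example value).
        'max_messages' => 1000,
        // Limit the runner to these consumers; the name is hypothetical.
        'consumers' => [
            'vendor.order.export',
        ],
    ],
];
```

Setting a message limit matters for the rest of this article, because the poison pill check discussed below only happens on the code path that is used when a maximum number of messages is provided.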
Let’s dig into the code. The main cron that handles starting consumers is this:
Magento_MessageQueue::etc/crontab.xml
The ConsumersRunner::run method loops over the consumers managed by this cron job and checks whether each one needs to be started. The check has several steps. The first is whether a lock already exists for the consumer:
Then it checks if the consumer connection (db or amqp) is configured correctly:
The third test depends on the configuration and can be skipped. It checks if there are any messages waiting in the queue. If there are none, the consumer will not start:
Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue
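Read together, the three checks amount to a short-circuit test. The function below is a simplified illustration and not Magento's source: canBeRun and all of its parameters are hypothetical names that stand in for the lock manager, the connection check and the queue lookup.

```php
<?php
declare(strict_types=1);

/**
 * Returns true only when every enabled check passes.
 * All parameters are hypothetical stand-ins for Magento services.
 */
function canBeRun(
    bool $isLocked,
    bool $connectionConfigured,
    bool $onlySpawnWhenMessagesAvailable,
    callable $hasMessages
): bool {
    if ($isLocked) {
        return false; // a process for this consumer already holds the lock
    }
    if (!$connectionConfigured) {
        return false; // the db or amqp connection is not set up correctly
    }
    if ($onlySpawnWhenMessagesAvailable && !$hasMessages()) {
        return false; // optional check: nothing is waiting in the queue
    }
    return true;
}
```

The queue lookup is passed as a callable so that it is only executed when the optional third check is enabled and the first two checks have passed.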
If all these checks pass, the consumer can be run. Magento does this by spawning a new process that executes the CLI command queue:consumers:start. The start command reads the consumer name from its parameters, creates a lock for it and connects it to the queue so that new messages can be processed. If a maximum number of messages to process is provided, the following function is called to handle the messages:
Magento\Framework\MessageQueue\CallbackInvoker
What we can see here is that this method retrieves the next message from the queue:
$message = $queue->dequeue();
Then it checks whether a poison pill has been created, and only if there is none does it pass the message on to the defined consumer:
$callback($message);
If we take a look at how this check works, we will see that it takes the version obtained on the first line, when the function starts, and compares it with the current version stored in the database:
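If a new version appears in the database while this loop is running, the code rejects the current message and exits the entire process. Because the reject call is made with its default arguments, which requeue the message, the message stays in the queue. The next time cron runs, the same consumer is started again and picks it up.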
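The version check can be modelled in a few lines of standalone PHP. This is a sketch under stated assumptions, not the Magento implementation: PoisonPillStore and consume are hypothetical names, the version lives in memory instead of the database, and the function returns where the real consumer process would exit.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in; Magento keeps the version in the database.
final class PoisonPillStore
{
    private int $version = 1;

    public function put(): void
    {
        $this->version++;
    }

    public function getLatestVersion(): int
    {
        return $this->version;
    }
}

/**
 * Handles messages until the stored version differs from the one seen at start.
 * Returns the handled messages and the message that was rejected, if any.
 */
function consume(array $messages, PoisonPillStore $store, callable $callback): array
{
    $startVersion = $store->getLatestVersion(); // captured once, before the loop
    $handled = [];
    foreach ($messages as $message) {
        if ($store->getLatestVersion() !== $startVersion) {
            // A poison pill was created: reject and stop (the real process exits here).
            return ['handled' => $handled, 'rejected' => $message];
        }
        $callback($message);
        $handled[] = $message;
    }
    return ['handled' => $handled, 'rejected' => null];
}

$store = new PoisonPillStore();
$result = consume(
    ['order-1', 'order-2', 'order-3'],
    $store,
    function (string $message) use ($store): void {
        if ($message === 'order-2') {
            $store->put(); // settings change while the consumer is running
        }
    }
);
// $result['handled'] is ['order-1', 'order-2'] and $result['rejected'] is 'order-3'
```

The important detail is that the version is read once before the loop and compared again for every message, so a change is noticed on the first message that arrives after it.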
A little helper
How can this help solve our problem? If we have elements that may change while the consumer is working, we do not have to add logic to the consumer to make it aware of each possible change, and therefore of the underlying implementation of the service contracts. Instead, we can create a listener or plugin for the elements that modify such data and simply create a poison pill. This ensures that messages arriving after the change are handled by a freshly started consumer that sees the new data.
As an example, if we have an attribute, we can create a backend model for it:
And in the backend model class:
Of course, the logic in this class might be more complex. We can use the beforeSave method to check whether there are any changes that affect the consumer and set a flag that is then checked in the afterSave method. This way we avoid unnecessary restarts of the consumer.
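The flag-based variant could look roughly like the standalone sketch below. The names are assumptions made for illustration: PoisonPillPut is a hypothetical stand-in for Magento's poison pill interface, and RestartAwareBackend takes plain arrays, whereas a real backend model would extend Magento's backend model base class and receive the entity being saved.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for Magento's poison pill "put" interface.
interface PoisonPillPut
{
    public function put(): void;
}

// Hypothetical class mirroring the beforeSave/afterSave pair of a backend model.
final class RestartAwareBackend
{
    private PoisonPillPut $poisonPill;
    private string $attributeCode;
    private bool $needsRestart = false;

    public function __construct(PoisonPillPut $poisonPill, string $attributeCode)
    {
        $this->poisonPill = $poisonPill;
        $this->attributeCode = $attributeCode;
    }

    /**
     * @param array $original values as loaded from the database
     * @param array $current  values about to be saved
     */
    public function beforeSave(array $original, array $current): void
    {
        $old = $original[$this->attributeCode] ?? null;
        $new = $current[$this->attributeCode] ?? null;
        // Remember whether the value the consumer depends on really changed.
        $this->needsRestart = ($old !== $new);
    }

    public function afterSave(): void
    {
        if ($this->needsRestart) {
            $this->poisonPill->put(); // consumers stop on their next message
            $this->needsRestart = false;
        }
    }
}
```

Creating the poison pill in afterSave rather than beforeSave means consumers are only restarted once the new value has actually been written.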
To make this work, we need to define the constructor argument in the di.xml file:
This part is needed as a default preference because the poison pill interface is a stub implementation in
Conclusion
The issues described above may not be common. It is highly likely that when preparing a queue consumer you will not have to worry about the state of the database. However, if you work with configuration, attribute settings or even the order repository, you may encounter this problem. Magento often keeps in-memory caches inside its classes to speed up the platform, so you never know when such an issue may occur.
The proposed solution is not ideal. The poison pill restarts all running consumers and, currently, there is no way to create a poison pill for a single consumer. Another limitation is that it only works with the default way of running a consumer, namely when you provide the number of messages to process. Hopefully this summary will guide you if you ever need to handle a similar situation with a queue consumer running in an infinite loop.