Do You Know Everything About Queue Topics in Magento?

I have a strong feeling that many developers and integrators still do not have a deep understanding of how the Magento queue framework works and its benefits.

In software, job queues facilitate asynchronous (batch) processing of data and can be used to optimize server resources by establishing a “slow lane” for operations that do not require immediate processing. A queue architecture is made up of a few components and, in this article, we’ll look at one of them: topic exchanges, what they are and how they are implemented. Understanding the architecture of topics is a first step toward understanding the whole Magento Message Queue Framework.

 

Job Queue Basics

In simple terms, scripts that need to perform asynchronous operations in Magento produce a message and publish it to a queue, from which a consumer later picks it up and processes it. Messages are data that contain enough information to perform subsequent operations, including transformation and storage. The queue itself is a data structure combined with a job scheduler.

If we look a little deeper at message publishing, we see that a message does not go directly to the queue. The message must first be routed, and this work is done by exchanges. An exchange is an agent that decides which queue each message will go to. There are four exchange types (i.e. four different ways in which messages may be routed):

  • Direct Exchange
  • Default Exchange
  • Topic Exchange
  • Fanout Exchange

A thorough explanation of each exchange type can be found here. Topic exchanges are the important type in the case of Magento, which uses RabbitMQ as its message queue software.

 

Topics

Topic exchanges route messages based on the topic name, which is matched against the patterns that bind queues to the exchange. The topic name is a list of words delimited by dots.

An example of a topic name is: async.magento.cataloginventory.api.stockregistryinterface.

Binding patterns may also use the wildcards “*” and “#”, where “*” matches exactly one word and “#” matches zero or more words.

Examples:

·         async.*.cataloginventory.*.stockregistryinterface - will match all five-word topic names that have async in the first position, cataloginventory in the third and stockregistryinterface in the fifth.

·         async.magento.cataloginventory.# - will match all topic names that start with async.magento.cataloginventory

 

Topics in Magento

Magento currently has two types of topic names: those defined in communication.xml and those that are auto-generated.

communication.xml

The file communication.xml contains topics that are predefined by developers. Example:
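A minimal sketch of such a definition might look like the fragment below. Only the topic name is taken from the text; the request type and the handler name, class and method are placeholder assumptions and are not copied from any Magento module.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <!-- Topic name as discussed in the text; the request type is an assumption -->
    <topic name="inventory.mass.update" request="Magento\AsynchronousOperations\Api\Data\OperationInterface">
        <!-- Hypothetical handler: replace with your own class and method -->
        <handler name="inventory.mass.update.handler" type="Vendor\Module\Model\MassUpdateHandler" method="process"/>
    </topic>
</config>
```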

In this example, we see that the topic has a name, inventory.mass.update. Then, in the file queue_topology.xml, we define which queue messages with this topic are routed to.

Auto-Generated Topic Names

Not everybody knows this, but Magento also has a set of auto-generated topic names. Those topics are used by asynchronous and bulk APIs for processing requests.

Each generated topic represents an API request defined in webapi.xml.

Let’s have a look at an example:
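The route for creating a product is assumed here to look roughly like the fragment below. The URL and HTTP method come from the text; the service class and method are inferred from the resulting topic name, and the ACL resource is an assumption.

```xml
<?xml version="1.0"?>
<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Webapi:etc/webapi.xsd">
    <route url="/V1/products" method="POST">
        <!-- Service contract class and method that the route maps to -->
        <service class="Magento\Catalog\Api\ProductRepositoryInterface" method="save"/>
        <resources>
            <!-- ACL resource, assumed for illustration -->
            <resource ref="Magento_Catalog::products"/>
        </resources>
    </route>
</routes>
```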

Based on this route, the generator of communication.xml data for the asynchronous API takes the service class and method and creates a topic name with the following pattern, where the namespace separators in the class name become dots and the whole name is lowercased:

async.lowercase(SERVICE_CONTRACT_NAME.SERVICE_CONTRACT_METHOD.HTTP_METHOD)

So, for <route url="/V1/products" method="POST">, our topic name will look like this:

async.magento.catalog.api.productrepositoryinterface.save.post

This makes each topic name unique for each webapi request. It also gives us an opportunity to see which topics were executed by which request, either by calling the operation status API endpoint or by looking into the “magento_operation” table.

 

Topic Name Usage

Where else do topics come up in a developer’s daily routine?

Publish Messages

Both Magento core and third-party modules use topics to publish operations.

A message is published to a queue under a given topic name by calling the publish($topicName, $data) method defined in Magento\Framework\MessageQueue\PublisherInterface.

Reuse of Asynchronous API Topics

Based on the previous example, we can also reuse asynchronous API topics to publish messages on behalf of the asynchronous API. This is useful if you plan to call a Magento service contract from your own custom module and want to do so asynchronously.

So, you do not need to create your own implementation; you just have to find the required service contract and respective API and then publish the message under this topic name.

Asynchronous API Message Forwarding

You can also forward different asynchronous/bulk API messages to different queues and different consumers to optimize your system flow. Let’s look at this in a bit more detail.

 

Split Asynchronous/Bulk API Messages

If we have a look at the file queue_topology.xml in the WebapiAsync module, we will see the following:
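The fragment below is a sketch of that binding, reconstructed from the description in the text. The topic pattern and the destination queue are taken from the text; the exchange name, connection name and binding id are assumptions.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="amqp">
        <!-- Catch-all binding: every topic starting with "async." goes to one queue -->
        <binding id="async.operations.all" topic="async.#" destinationType="queue" destination="async.operations.all"/>
    </exchange>
</config>
```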

We can see that all messages with a routing key starting with “async.” will go to the same queue, named async.operations.all.

If you are using the Magento async and bulk APIs for your integrations, you may run into cases where your messages are processed very slowly. This happens because all messages go to the same queue, and this queue becomes overloaded.

The quickest and most obvious fix is to run more consumers at the same time. With this approach, your messages will be processed faster because multiple consumers work in parallel. However, this may lead to another problem: lock waits and deadlocks, which are caused by several processes trying to access the same database tables.

There is also another option: split the asynchronous API messages across different queues. This is where our knowledge about topics comes into play.

First, we need to create a queue_topology.xml file in our own module and disable the default binding by adding disabled="true" to it.

This is required to block messages from landing in the general queue.

Next, we create new bindings for splitting messages. For example, say we want all messages related to products to go to one queue and all messages related to categories to go to another. For this purpose, we have to define stricter topic patterns, such as:

async.magento.catalog.api.productrepositoryinterface.#

async.magento.catalog.api.categoryrepositoryinterface.#

... and send them to their own queues:

async.operations.products

async.operations.categories

So, our final file will look like this:
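A sketch of that file, under the assumption that the default binding uses the id async.operations.all and that the exchange is named magento on the amqp connection (the ids of the two new bindings are hypothetical):

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="amqp">
        <!-- Disable the catch-all binding so messages no longer land in the general queue -->
        <binding id="async.operations.all" topic="async.#" destinationType="queue"
                 destination="async.operations.all" disabled="true"/>
        <!-- Product-related async/bulk requests -->
        <binding id="async.operations.products" topic="async.magento.catalog.api.productrepositoryinterface.#"
                 destinationType="queue" destination="async.operations.products"/>
        <!-- Category-related async/bulk requests -->
        <binding id="async.operations.categories" topic="async.magento.catalog.api.categoryrepositoryinterface.#"
                 destinationType="queue" destination="async.operations.categories"/>
    </exchange>
</config>
```

Note that with the catch-all binding disabled, async topics matching neither pattern have no binding in this sketch, so any other asynchronous endpoints you rely on would need bindings of their own.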

From now on, your messages to asynchronous/bulk API will be forwarded to different queues.

As a final step, you also have to create consumers that process the new queues.

In your queue_consumer.xml, create two new entries:

After that, run setup:upgrade and you will see the new consumers in queue:consumers:list and the new queues in RabbitMQ.
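The two entries could be sketched as follows. The queue names come from the text; the consumer names and the connection name are assumptions, and the consumer class is assumed to be the same one the core async.operations.all consumer uses.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- One consumer per queue, so each can be started and scaled independently -->
    <consumer name="async.operations.products" queue="async.operations.products" connection="amqp"
              consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"/>
    <consumer name="async.operations.categories" queue="async.operations.categories" connection="amqp"
              consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"/>
</config>
```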

 

Topics Management in env.php

Topics and all other related settings of Magento message queues are almost always defined in *.xml configuration files (except in the case of the async/bulk API).

Magento has two queue systems implemented by default: DB and RabbitMQ. The Magento Message Queue Framework (MMQF) allows you to create different publishers for messages, so in the queue_publisher.xml file you can define which queue system processes a given topic. However, Magento requires that only one connection be active at a time.
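As an illustration, a queue_publisher.xml entry might look like the sketch below. The topic name customer.created, the connection names and the exchange name are assumptions chosen for the example; the point is that one connection is enabled and the other disabled.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="customer.created">
        <!-- Active connection: messages for this topic go through RabbitMQ -->
        <connection name="amqp" exchange="magento" disabled="false"/>
        <!-- Inactive alternative: the database-backed queue -->
        <connection name="db" exchange="magento" disabled="true"/>
    </publisher>
</config>
```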

If you want to change your topic settings without touching the XML configuration, you can do it in the env.php file.

For example:

'queue' => [
    'topics' => [
        'customer.created' => [
            'schema' => [
                'schema_type' => 'object',
                'schema_value' => 'string'
            ],
            'response_schema' => [
                'schema_type' => 'object',
                'schema_value' => 'string'
            ],
            'publisher' => 'default-rabbitmq'
        ]
    ]
]