Magento Open Source and Queuing Systems: Integration Review
Magento (Adobe Commerce) Open Source includes a framework called the “Message Queue Framework (MQF)”, which has been available since version 2.1.0. Since that first release, the framework has been gradually improved, and many modules and integrations now use MQF as part of their functionality.
At the moment, Magento supports only two queuing systems: its own database queue implementation and RabbitMQ. A question that comes up more and more often is, “Which other queuing system(s) will be supported by Magento, and when?”
The goal of this article is to look at existing queuing solutions and attempt to predict which could be integrated into Magento easily in the near future, and which are not worth considering as a core implementation.
Message Queue Processing in Magento
The simplest way to explain how message processing happens on a Magento queue (using the RabbitMQ integration) is as follows:
The basis for queue implementations in MQF is \Magento\Framework\MessageQueue\QueueInterface. Both default Magento implementations (database and RabbitMQ) implement this interface.
Let’s take a deeper look at the following flow:
This flow represents the current queue implementation: although each service uses its own functionality, both also use QueueInterface.
QueueInterface
The table below describes the methods that a specific queue provider is expected to implement, and the functionality expected from each method.
# | Method | Purpose / Description |
---|---|---|
1 | dequeue() | Get a single message from the queue |
2 | acknowledge() | Acknowledge message delivery |
3 | subscribe() | Wait for messages and dispatch them. This is based on the pub/sub mechanism: messages are consumed through callbacks until the connection is closed |
4 | reject() | Reject a message; the message is returned to the queue |
5 | push() | Push message to queue directly without using exchange; it uses publish behind the scenes |
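
To make the contract in the table concrete, here is a minimal in-memory sketch of the five operations. It is written in Python rather than PHP so that it runs without a Magento installation. The names `InMemoryQueue` and `Envelope`, and the simplified method signatures, are assumptions made for illustration and are not Magento's actual signatures.

```python
from collections import deque
from dataclasses import dataclass
from itertools import count
from typing import Callable, Deque, Dict, Optional


@dataclass
class Envelope:
    """Hypothetical stand-in for a message envelope: an id plus a body."""
    message_id: int
    body: str


class InMemoryQueue:
    """Illustrative queue exposing the five operations from the table."""

    def __init__(self) -> None:
        self._ready: Deque[Envelope] = deque()     # waiting for a consumer
        self._in_flight: Dict[int, Envelope] = {}  # delivered, not yet settled
        self._ids = count(1)

    def push(self, body: str) -> Envelope:
        """Put a message directly on the queue."""
        envelope = Envelope(next(self._ids), body)
        self._ready.append(envelope)
        return envelope

    def dequeue(self) -> Optional[Envelope]:
        """Return a single message, or None when the queue is empty."""
        if not self._ready:
            return None
        envelope = self._ready.popleft()
        self._in_flight[envelope.message_id] = envelope
        return envelope

    def acknowledge(self, envelope: Envelope) -> None:
        """Confirm delivery: the message is removed for good."""
        self._in_flight.pop(envelope.message_id, None)

    def reject(self, envelope: Envelope, requeue: bool = True) -> None:
        """Refuse the message; by default it goes back to the queue."""
        rejected = self._in_flight.pop(envelope.message_id, None)
        if rejected is not None and requeue:
            self._ready.appendleft(rejected)

    def subscribe(self, callback: Callable[[Envelope], None]) -> None:
        """Dispatch messages to the callback until the queue is drained.

        A real broker would block until the connection is closed; draining
        keeps this sketch terminating.
        """
        while (envelope := self.dequeue()) is not None:
            callback(envelope)


queue = InMemoryQueue()
queue.push("order.created")
queue.push("order.shipped")

first = queue.dequeue()
queue.reject(first)        # returned to the front of the queue
again = queue.dequeue()    # the same message is delivered again
queue.acknowledge(again)   # now permanently removed

seen = []


def handler(envelope: Envelope) -> None:
    seen.append(envelope.body)
    queue.acknowledge(envelope)


queue.subscribe(handler)
```

The split between "ready" and "in flight" messages is the part every provider has to map onto its own primitives, which is what the per-platform tables later in the article evaluate.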
Each implementation realizes its own logic, depending on the queuing system used:
- The database implementation uses its own tables in the database
- The RabbitMQ implementation connects to the service and executes the required actions
On the other side, consumers pick up messages from the queue and process them according to their configuration.
Queuing Landscape
Next, let’s look at which other queueing technologies exist and determine whether we can use them in Magento in the near future. Messaging technologies can be divided into categories based on their use cases, as shown in the table below.
Use-case Type | Objectives |
---|---|
Portable systems or Standard based protocols | Either these systems follow industry standard protocols, or the projects are portable like Kafka |
High throughput & data streaming | Streaming for near real-time use cases is the key here, or very high throughput |
Serverless & rule-based event streaming / routing | These are modern systems built with intelligent routing in mind, they also support serverless functions in their ecosystem, like Adobe I/O Runtime and Adobe I/O Events |
Interservice communication or Microservices | Third-party connectivity, extensibility and message delivery |
Third-party connectivity, extensibility and message delivery | Suitable for large organizations, usually developed internally for wide-scale use cases; they encapsulate modern extensibility requirements like out-of-process extension functions or in-process extensibility. They may also include message delivery via web-hooks. |
Queuing Technologies – Functional Overview
Platform | Pub/Sub Mechanism | Connectivity | Message Ordering / FIFO | High Throughput Data Streaming | Rule-based Filter / Routing |
---|---|---|---|---|---|
AWS EventBridge | *Available (limited to AWS Targets) | *HTTPS (for publishers only) | | | Available |
AWS MQ | Available | JMS, NMS, AMQP 1.0, STOMP, MQTT, WebSocket | *Available | | |
AWS SQS | | HTTPS | *Available | | |
AWS Kinesis | Available | HTTP/2 Persistent and HTTP REST | *Available | Available | |
Apache Kafka | Available | TCP Socket, Kafka Connect, Kafka Streams Apps | *Available | Available | |
Azure Service Bus | Available | AMQP 1.0 and REST | *Available | | |
Adobe I/O | *Available (limited to Adobe Event Providers) | *HTTP Webhooks (for subscribers only, designed for integration with Adobe SaaS solutions) | | | Available |
*Available with some limitations.
**Other aspects to consider (i.e., consumer groups, batching, multi-tenant segregation, message encryption/security, aggregation, counting, scheduling, dead-letter queue)
Integration Possibilities with MQF
Since MQF and QueueInterface are the basis for all queueing implementations in Magento, we will finally look at which existing technologies might easily be integrated into the current Magento architecture.
AWS EventBridge
AWS EventBridge is a serverless event bus that delivers data from your applications and third parties to AWS services. Currently, it seems that the targets are specifically AWS services, and they are set using specialized rules.
More information, including the list of available targets, can be found at docs.aws.amazon.com.
But let’s see if we can utilize EventBridge through MQF:
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Not Possible or N/A | Multiple targets can be set to receive the messages when they are available asynchronously. There is no concept of fetching the message from the Event Bus on-demand; it’s more of a serverless architecture. |
acknowledge() | Not Possible or N/A | There is no need to acknowledge the message; AWS internally makes sure that the target receives it. |
subscribe() | Not Possible or N/A | AWS targets can be set or subscribed to the Event Bus based on rules, but we cannot register PHP functions as callbacks. |
reject() | Not Possible or N/A | The concept is not available or used. |
push() | Available | PutEvents or PutPartnerEvents functions can be used for this purpose. |
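
Since push() is the only operation that maps onto EventBridge, the sketch below shows how queue messages could be shaped into PutEvents entries and grouped into batches. It is a Python illustration under stated assumptions: mapping the MQF topic name to `DetailType`, the source name `magento.mqf` and the bus name `magento-bus` are all hypothetical, and no AWS SDK call is made. The entry fields (`EventBusName`, `Source`, `DetailType`, `Detail`) and the limit of 10 entries per request come from the PutEvents API.

```python
import json
from typing import Dict, Iterable, Iterator, List

MAX_ENTRIES_PER_CALL = 10  # PutEvents accepts at most 10 entries per request


def to_entry(topic: str, body: dict, bus_name: str) -> Dict[str, str]:
    """Shape one queue message as a PutEvents entry."""
    return {
        "EventBusName": bus_name,
        "Source": "magento.mqf",     # hypothetical source name
        "DetailType": topic,         # assumption: topic name as detail type
        "Detail": json.dumps(body),  # Detail must be a JSON string
    }


def batch_entries(
    entries: Iterable[Dict[str, str]], size: int = MAX_ENTRIES_PER_CALL
) -> Iterator[List[Dict[str, str]]]:
    """Group entries into lists no longer than the per-request limit."""
    batch: List[Dict[str, str]] = []
    for entry in entries:
        batch.append(entry)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


entries = [
    to_entry("sales.order.created", {"order_id": i}, "magento-bus")
    for i in range(23)
]
batches = list(batch_entries(entries))
batch_sizes = [len(batch) for batch in batches]  # three requests: 10, 10, 3
```

Each batch would be passed as the `Entries` parameter of one PutEvents request through whichever AWS SDK the integration uses.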
AWS MQ
AWS MQ is a message broker based on the popular Apache ActiveMQ. It supports multiple protocols for connectivity, for instance AMQP, JMS, STOMP, NMS, MQTT and WebSocket.
Most of the features are available, since Magento already uses the AMQP protocol with RabbitMQ, but the protocol version is different: RabbitMQ uses AMQP 0.9.1 and Amazon MQ uses AMQP 1.0, so any migration would require porting queues from RabbitMQ to AWS MQ.
In this case, everything looks much better, again because MQF is already using AMQP.
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Available | receive() |
acknowledge() | Available | accept() / release() |
subscribe() | *Workaround | Long Polling might need to be implemented, unless we find a good library that supports AMQP 1.0 in PHP; Java has full support though. |
reject() | Available | reject() |
push() | Available | sendMessage(Message, Destination) |
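
The subscribe() workaround mentioned in the table can be sketched as a polling loop built on top of dequeue(), acknowledge() and reject(). This is a Python illustration, not Magento or AMQP client code: the function `subscribe_by_polling`, its parameters and the list-backed demo queue are hypothetical, and the client-side sleep only approximates long polling, where the broker itself holds the request open.

```python
import time
from typing import Callable, List, Optional


def subscribe_by_polling(
    dequeue: Callable[[], Optional[str]],
    acknowledge: Callable[[str], None],
    reject: Callable[[str], None],
    callback: Callable[[str], None],
    poll_interval: float = 0.01,
    max_idle_polls: int = 3,
) -> int:
    """Emulate subscribe() with repeated dequeue() calls.

    Stops after `max_idle_polls` consecutive empty polls so the sketch
    terminates; a real consumer would usually run until it is shut down.
    Returns the number of messages handled successfully.
    """
    handled = 0
    idle = 0
    while idle < max_idle_polls:
        message = dequeue()
        if message is None:
            idle += 1
            time.sleep(poll_interval)  # wait before asking again
            continue
        idle = 0
        try:
            callback(message)
        except Exception:
            reject(message)            # processing failed
        else:
            acknowledge(message)       # processing succeeded
            handled += 1
    return handled


# Demo with a list standing in for the broker.
pending: List[str] = ["a", "b", "c"]
acked: List[str] = []
rejected: List[str] = []


def process(message: str) -> None:
    if message == "b":
        raise ValueError("cannot process b")


handled = subscribe_by_polling(
    dequeue=lambda: pending.pop(0) if pending else None,
    acknowledge=acked.append,
    reject=rejected.append,
    callback=process,
)
```

The same loop applies to any provider in this article where subscribe() is marked as a workaround.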
AWS SQS
AWS SQS is a distributed and fault tolerant queuing technology that provides point-to-point connectivity. It can be used with SNS to add a publish/subscribe mechanism as well — a single message gets replicated across different SQS servers.
As we can see, SQS could also become the next queuing system in Magento, although implementing it in MQF will require some effort.
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Available | ReceiveMessage(), with many options available, for instance long and short polling. |
acknowledge() | Possibility | DeleteMessage() for positive acknowledgement; by default, a received message stays locked to other consumers for the Visibility Timeout period. |
subscribe() | Workaround | ReceiveMessage(): it can be implemented with a polling-based mechanism, but not as a true callback mechanism. |
reject() | Possibility | Messages automatically become visible for consumption again if DeleteMessage() is not called explicitly before the timeout, as explained above. |
push() | Available | SendMessage() |
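
The visibility-timeout behaviour behind acknowledge() and reject() can be modelled in a few lines. The Python sketch below is a simulation, not the SQS API: `VisibilityQueue`, `Message` and the explicit `now` argument (used instead of a real clock so the example is deterministic) are hypothetical, and the stable `message_id` simplifies the receipt handles that SQS issues per receive.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Message:
    message_id: int
    body: str
    visible_at: float = 0.0  # time from which consumers may receive it


class VisibilityQueue:
    """Simulates receive / delete semantics with a visibility timeout."""

    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self._messages: Dict[int, Message] = {}
        self._next_id = 1

    def push(self, body: str) -> None:
        """Corresponds to SendMessage()."""
        self._messages[self._next_id] = Message(self._next_id, body)
        self._next_id += 1

    def dequeue(self, now: float) -> Optional[Message]:
        """Corresponds to ReceiveMessage(): hide the message for a while."""
        for message in self._messages.values():
            if message.visible_at <= now:
                message.visible_at = now + self.visibility_timeout
                return message
        return None

    def acknowledge(self, message: Message) -> None:
        """Corresponds to DeleteMessage()."""
        self._messages.pop(message.message_id, None)

    def reject(self, message: Message) -> None:
        """Nothing to do: the message reappears once the timeout lapses."""


queue = VisibilityQueue(visibility_timeout=30)
queue.push("order.created")

first = queue.dequeue(now=0)          # received and hidden until t=30
hidden = queue.dequeue(now=10)        # None: still invisible
queue.reject(first)                   # no delete, so it will come back
redelivered = queue.dequeue(now=30)   # visible again, delivered once more
queue.acknowledge(redelivered)
empty = queue.dequeue(now=100)        # None: deleted for good
```

The practical consequence for an MQF integration is that a rejected message is not redelivered immediately; it waits for the remaining part of the timeout.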
AWS Kinesis
AWS Kinesis is a streaming-based distributed messaging technology. It uses a publish/subscribe mechanism for loose coupling between senders and receivers. It is designed for extremely high throughput for real-time applications.
Streaming, and the concept of the stream itself, is the central idea behind Kinesis. It is quite similar to Apache Kafka, with some differences. Because of its out-of-the-box support for high-throughput messaging and the publish/subscribe mechanism, it is also suitable for implementing the event sourcing and CQRS patterns, which are commonly used in microservices architectures.
Kinesis also looks quite promising:
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Possibility | getRecords(): the ShardIterator needs to be managed behind the scenes. The stream can serve as the queue name. getShardIterator() can be called before the call. |
acknowledge() | Possibility | We need to maintain the ShardIterator and the SequenceNumber associated with it, using getShardIterator(). We can save the NextShardIterator from getRecords() to acknowledge the message. |
subscribe() | Workaround | Batch reads with long polling can be implemented; see, for example, “Long Polling Subscribe Mechanism in AWS Kinesis”. |
reject() | Possibility | If we don't move the ShardIterator to the NextShardIterator, we effectively stay on the same message. |
push() | Possibility | Since you have to provide the stream, data and partition, we need a strategy for partition selection and need to maintain these values for consumers. |
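
The iterator bookkeeping described in the table can be sketched as follows. This Python simulation uses hypothetical `Shard` and `ShardConsumer` classes and plain integer positions; real Kinesis shard iterators are opaque strings returned by the service, so this only illustrates how acknowledge() and reject() could be derived from "advance" versus "do not advance".

```python
from typing import List, Optional, Tuple


class Shard:
    """Hypothetical in-memory shard: an append-only list of records."""

    def __init__(self) -> None:
        self._records: List[str] = []

    def put_record(self, data: str) -> int:
        """Append a record and return its position in the shard."""
        self._records.append(data)
        return len(self._records) - 1

    def get_records(self, iterator: int, limit: int = 1) -> Tuple[List[str], int]:
        """Return up to `limit` records plus the iterator that follows them."""
        records = self._records[iterator:iterator + limit]
        return records, iterator + len(records)


class ShardConsumer:
    """Tracks a saved iterator and a tentative 'next' iterator."""

    def __init__(self, shard: Shard) -> None:
        self._shard = shard
        self._checkpoint = 0  # iterator saved by the last acknowledge()
        self._pending = 0     # iterator returned by the last read

    def dequeue(self) -> Optional[str]:
        records, next_iterator = self._shard.get_records(self._checkpoint)
        self._pending = next_iterator
        return records[0] if records else None

    def acknowledge(self) -> None:
        """Save the next iterator: the record will not be read again."""
        self._checkpoint = self._pending

    def reject(self) -> None:
        """Keep the old iterator: the same record is read again."""
        self._pending = self._checkpoint


shard = Shard()
shard.put_record("a")
shard.put_record("b")
consumer = ShardConsumer(shard)

r1 = consumer.dequeue()   # "a"
consumer.reject()
r2 = consumer.dequeue()   # "a" again, because the iterator did not move
consumer.acknowledge()
r3 = consumer.dequeue()   # "b"
consumer.acknowledge()
r4 = consumer.dequeue()   # None: end of the shard
```

In a real integration the checkpoint would have to be persisted per consumer and per shard, which is the state-management effort the table refers to.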
Apache Kafka
Apache Kafka is a popular, open-source stream-processing and messaging platform. It is, by design, distributed, replicated and resilient (fault-tolerant), and it can achieve very high throughput.
Topics and the publish/subscribe mechanism are at the core of Kafka. Kafka is effective for implementing the event sourcing and CQRS patterns, which are commonly used in microservices architectures. It is also used for a variety of streaming use cases that require near real-time processing of records.
Kafka looks like the most promising and valuable solution at the moment.
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Possibility | Initiate poll() or consume(). If records are available, the call returns immediately; otherwise it waits for a specified timeout, which can be passed as a parameter. |
acknowledge() | Possibility | There are several ways to commit the offset, which indicates that a particular consumer has consumed those messages. The way you call the commit API controls the delivery semantics. |
subscribe() | Possibility | There are multiple ways to implement the subscription mechanism. The default Kafka subscription tells Kafka which topics a consumer is interested in, but we can also subscribe a callback function, and we can use the Kafka Streams API to receive messages in near real time. |
reject() | Possibility | If we neither auto-commit nor manually commit the offset, the consumer's position does not advance, so the message will be read again. |
push() | Possibility | Since you have to provide the topic, data and partition, we need a strategy for partition selection and need to maintain these values for consumers. |
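
The two Kafka-specific concerns from the table, partition selection on push() and offset commits on acknowledge(), can be sketched together. This is a Python simulation with hypothetical `TopicLog` and `GroupConsumer` classes; CRC32 is used only as a stable stand-in hash, since Kafka clients ship their own partitioners.

```python
import zlib
from typing import Dict, List, Optional, Tuple


def select_partition(key: str, partition_count: int) -> int:
    """Key-based strategy: the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


class TopicLog:
    """Hypothetical topic: one append-only list per partition."""

    def __init__(self, partition_count: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(partition_count)]

    def push(self, key: str, value: str) -> Tuple[int, int]:
        """Append a record and return (partition, offset)."""
        partition = select_partition(key, len(self.partitions))
        self.partitions[partition].append(value)
        return partition, len(self.partitions[partition]) - 1


class GroupConsumer:
    """Reads from the committed offset; committing is the acknowledgement."""

    def __init__(self, log: TopicLog) -> None:
        self._log = log
        self._committed: Dict[int, int] = {}  # partition -> next offset to read

    def dequeue(self, partition: int) -> Optional[Tuple[int, str]]:
        offset = self._committed.get(partition, 0)
        records = self._log.partitions[partition]
        if offset >= len(records):
            return None
        return offset, records[offset]

    def acknowledge(self, partition: int, offset: int) -> None:
        """Commit: the next read starts after this offset."""
        self._committed[partition] = offset + 1


log = TopicLog(partition_count=3)
p1, o1 = log.push("order-42", "created")
p2, o2 = log.push("order-42", "shipped")  # same key, same partition

consumer = GroupConsumer(log)
first = consumer.dequeue(p1)      # (0, "created")
retried = consumer.dequeue(p1)    # same record: nothing was committed
consumer.acknowledge(p1, first[0])
second = consumer.dequeue(p1)     # (1, "shipped")
```

Keying by an entity identifier keeps all messages for that entity in one partition, which preserves their relative order for the consumer.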
Microsoft Azure Service Bus
Microsoft Azure Service Bus is a fully managed, enterprise integration message broker. It supports familiar concepts like queues, topics, rules/filters and much more.
Azure Service Bus supports AMQP 1.0 and a couple of languages, but PHP support for the protocol is again limited.
Because Azure Service Bus also uses the AMQP protocol, it looks quite promising as well. And the fact that Magento Cloud will also use Microsoft Azure makes the use of Azure Service Bus even more probable.
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Available | receive() |
acknowledge() | Available | accept() / release() |
subscribe() | *Workaround | Long Polling might need to be implemented, unless we find a good library that supports AMQP 1.0 for PHP; Java has full support for required features. |
reject() | Available | reject(errorCondition, errorDescription) |
push() | Available | sendMessage(message, destination) |
Adobe I/O
Adobe I/O is a serverless, event-driven platform that allows you to quickly deploy custom functions/code in the cloud without any server setup. These functions execute via HTTP requests or Adobe I/O events. Events can be orchestrated with sequences and compositions, and it is built on top of the Apache OpenWhisk framework.
Events are triggered by event providers within Adobe services, for instance Creative Cloud assets, Adobe Experience Manager and Adobe Analytics. To start listening to events for your application, you need to register a webhook (URL endpoint), specifying which event types you want to receive from which event providers. Adobe then pushes events to your webhook via HTTP POST messages.
In the current Magento implementation, using Adobe I/O is not possible, so entirely new services would have to be developed to support it.
Method | Evaluation | Implementation Readiness |
---|---|---|
dequeue() | Not Possible or N/A | There is no concept of explicitly fetching an event; rather, you define a trigger/event and the actions associated with it. |
acknowledge() | Not Possible or N/A | This concept is not used; the architecture is fundamentally different. |
subscribe() | Not Possible or N/A | A PHP callback function is not possible, although a custom webhook (HTTP endpoint) can be configured to be triggered for a particular event. |
reject() | Not Possible or N/A | This concept is not used; the architecture is fundamentally different. |
push() | Not Possible or N/A | Events are triggered by Adobe SaaS Services in the Adobe Cloud, as discussed above. |
Evaluation and Conclusion
If we compare all possible solutions, we see the following picture:
Method | AWS EventBridge | AWS MQ | AWS SQS | AWS Kinesis | Apache Kafka | Azure Service Bus | Adobe I/O |
---|---|---|---|---|---|---|---|
dequeue() | Not Possible or N/A | Available | Available | Possibility | Possibility | Available | Not Possible or N/A |
acknowledge() | Not Possible or N/A | Available | Possibility | Possibility | Possibility | Available | Not Possible or N/A |
subscribe() | Not Possible or N/A | *Workaround | *Workaround | Workaround | Possibility | *Workaround | Not Possible or N/A |
reject() | Not Possible or N/A | Available | Possibility | Possibility | Possibility | Available | Not Possible or N/A |
push() | Available | Available | Available | Possibility | Possibility | Available | Not Possible or N/A |
*Workaround – This feature may be available, but full support for PHP (library) is not available.
As you can see, Apache Kafka is currently the winner based on this research. Kafka is popular and performant, and it covers all the requirements that come with Magento's growing complexity. Also, in the last few months, Kafka has become one of the most discussed technologies as a “next service” that Magento may use for asynchronous communications.
But integrators and developers should definitely also consider the AWS MQ, AWS SQS, AWS Kinesis and Azure Service Bus integrations, because integrating those services does not require too much effort.
One of the main problems with those systems is that AMQP 1.0 is not fully supported in PHP. However, the PHP Enqueue and Symfony libraries are available, and they provide an abstraction layer over multiple brokers.
Library | Supported Brokers / Protocols | Notes |
---|---|---|
Enqueue | Apache Kafka, AWS SQS/SNS, AMQP 0.9.1, Database, MongoDB, Redis, etc. | Enqueue tries to follow the JMS specification as closely as possible, although this library does not have very good documentation. Theoretically, this library can enable multiple brokers for Magento with only a few deviations in terms of configuration. |
Symfony | AMQP 0.9.1, Doctrine, Redis, In Memory, Serializing Messages | This is a more mature framework with an active community, and it also has better documentation. |
Symfony via Enqueue Transport | Add support for Enqueue Brokers with Symfony |
In any case, because the Magento Service Isolation approach insists on module independence and extensibility, we always have to be careful about the future direction of Magento's asynchronous communication development: the interfaces and architecture that we build or extend now must be easy to reuse and extend in the future.