Building a SOLID, Strongly Typed, and Generic Messages Processor

💡
This article belongs to the Walletera series.

In this article, we will see how to build a Messages Processor for the DinoPay Gateway. Along the way, we will see how to work iteratively and incrementally. We will also learn about the SOLID principles and how to apply them. Finally, we will explore the use of generics to make our solution more strongly typed.

Introduction

Recall from the previous article, where we described the architecture of our digital wallet, that the DinoPay Gateway is the service that handles the communication between the internal Payments service and the external DinoPay payments service provider. Let’s look at the design diagram for the DinoPay Gateway to refresh our memories.

From the diagram above, we see that the gateway will process messages from three different sources: the Payments Service, the DinoPay Payments Provider, and the Internal Messages Repository. For each of these sources, we will have an instance of a Message Processor that processes the incoming messages. The responsibilities of this component are:

  • Consumption of messages from external sources. Examples of external sources may be message queues, databases, or HTTP requests.

  • Deserialization of the message payloads (the events) into the corresponding internal representation.

  • Execution of specific event handlers.

Besides these basic tasks, there are other tasks that are required in order to build a production-grade message processor.

  • Acknowledgment of the messages based on the response from the event handlers.

  • Supporting a graceful shutdown. The message processor will be a stateful long-running process. This means that during shutdown there are a couple of tasks that need to be done. These tasks include:

    • Closing any underlying connection that may be held (connection to the message queue or the database).

    • Waiting for any inflight message to be processed.

  • Collecting and sending metrics, such as the number of messages successfully processed, the number of processing errors, and the time it takes to process each message.

In this article, we will cover how to implement the basic tasks in a SOLID (we will see later what this acronym means), strongly typed, and generic way. The other three requirements will be addressed in subsequent articles.

The approach that we will follow for building this message processor will be iterative and incremental. In the first iteration, we will focus on the basic setup needed to consume messages from a particular message queue, RabbitMQ. In the second iteration, we will add the logic needed to process Payments events. Then we will identify the weak points of our implementation and what options we have to improve it. In the subsequent iteration, we will refactor the code by applying the SOLID principles. Finally, in the last iteration, we will see how we can improve our solution even more by using generics and the visitor design pattern to make the code strongly typed.

For the message queue, we will use RabbitMQ. We picked this message queue for no other reason than its popularity, but we could have used any other solution, like NATS or Amazon SQS/SNS. The implementation of the message processor will be agnostic to the message queue.

Iteration 1: The basic stuff

💡
Find the complete code for this section here

Let's start with the basic stuff needed to consume messages from RabbitMQ. The first component that we are going to write is a RabbitMQMessageConsumer. This component will implement the logic needed to connect to an AMQP server and consume messages from it (AMQP is the core messaging protocol used by RabbitMQ). Let’s see what this component looks like.

type RabbitMQMessageConsumer struct {
    conn        *amqp.Connection
    connChannel *amqp.Channel
}

func NewRabbitMQMessageConsumer() *RabbitMQMessageConsumer {
    return &RabbitMQMessageConsumer{}
}

func (r *RabbitMQMessageConsumer) Consume() (<-chan Message, error) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    r.conn = conn

    ch, err := conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("failed to open a channel: %w", err)
    }

    r.connChannel = ch

    q, err := ch.QueueDeclare(
        "message-consumer-queue", // name
        false,                    // durable
        false,                    // delete when unused
        false,                    // exclusive
        false,                    // no-wait
        nil,                      // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("failed to declare a queue: %w", err)
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        return nil, fmt.Errorf("failed to register a consumer: %w", err)
    }

    messagesCh := make(chan Message)
    go func() {
        defer close(messagesCh)
        for msg := range msgs {
            messagesCh <- Message{
                Payload: msg.Body,
            }
        }
    }()

    return messagesCh, nil
}

func (r *RabbitMQMessageConsumer) Close() error {
    err := r.connChannel.Close()
    if err != nil {
        return fmt.Errorf("failed to close rabbitmq connection channel: %w", err)
    }
    err = r.conn.Close()
    if err != nil {
        return fmt.Errorf("failed to close rabbitmq connection: %w", err)
    }
    return nil
}

We are not going to analyze this implementation in detail. I copied most of the code from the RabbitMQ Go tutorial, so you can read the details there if you are interested. The important part for us is the interface, which consists of two methods. The first method is Consume(). It takes no parameters and returns two values: a <-chan Message and an error. The Consume method sets up the connection to the RabbitMQ server and then forwards every incoming message to the messages channel returned as the first value. If any error occurs while setting up the connection, the channel, the queue, or the consumer, the first return value will be nil and the second will contain the underlying error wrapped in a higher-level error. The other method in the interface is Close(). This method closes the underlying channel and connection to the RabbitMQ server, and returns an error if either of them fails to close.

The Message type will be a struct with a field of type []byte called Payload that will contain the raw event, i.e. the event in its transmission format, which in our case will be JSON.

type Message struct {
   Payload []byte
}

The Message Processor skeleton

Now that we have a way of consuming messages from RabbitMQ, let’s write the first version of the MessageProcessor.

type MessageProcessor struct {
   messageConsumer *RabbitMQMessageConsumer
}

func NewMessageProcessor() *MessageProcessor {
   return &MessageProcessor{
       messageConsumer: NewRabbitMQMessageConsumer(),
   }
}

func (p *MessageProcessor) Start() error {
   msgCh, err := p.messageConsumer.Consume()
   if err != nil {
       return fmt.Errorf("failed consuming from message consumer: %w", err)
   }
   go p.processMessages(msgCh)
   return nil
}

func (p *MessageProcessor) processMessages(ch <-chan Message) {
   for msg := range ch {
       go p.processMessage(msg)
   }
}

func (p *MessageProcessor) processMessage(msg Message) {
   log.Printf("processing message with payload %s", msg.Payload)
}

Our MessageProcessor is constructed using the NewMessageProcessor() function, which returns a pointer to a new instance of the MessageProcessor struct. The MessageProcessor struct has a field called messageConsumer, which is a pointer to a RabbitMQMessageConsumer. This field is initialized using the NewRabbitMQMessageConsumer() function that we defined at the beginning of this iteration.

The MessageProcessor is a long-running process, which is started by the Start() method. In the Start() method, we first call the Consume() method of the messageConsumer to get the messages channel. If no error is returned, we then call the processMessages method, passing the messages channel as a parameter. Note that we are calling this method with the go keyword in front. This executes the method in a new goroutine, which allows the Start() method to return immediately and simplifies the implementation of the calling component.

The processMessages(ch <-chan Message) method uses a for ... range loop to consume the messages from the ch channel. Ranging over a channel is good practice because we don’t need to check explicitly whether the channel is still open before reading a value: as soon as the channel is closed (and drained), the loop exits. For each incoming message, we call the processMessage method, passing the message as a parameter. To increase the throughput of our application, we process each message concurrently. To achieve this, we use the go keyword again to execute the processMessage method in a new goroutine.

Finally, the processMessage method implements the processing of a particular message. For now, processing simply means logging the message payload. Throughout the article, we will see different implementations of this method.
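To see the range loop and the goroutine-per-message fan-out in isolation, here is a minimal, self-contained sketch. It is a hypothetical variant of processMessages, not the article's code: processAll and countProcessed are illustrative names, processAll takes a handler function instead of calling processMessage, and it adds a sync.WaitGroup so the caller can wait for in-flight messages, which the version above does not do (waiting for in-flight messages belongs to the graceful shutdown covered in later articles).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Message mirrors the article's Message type.
type Message struct {
	Payload []byte
}

// processAll is a hypothetical variant of processMessages: it ranges over ch
// until the channel is closed and handles each message in its own goroutine.
// Unlike the article's version, it waits for all in-flight handlers to finish.
func processAll(ch <-chan Message, handle func(Message)) {
	var wg sync.WaitGroup
	for msg := range ch { // the loop exits once ch is closed and drained
		wg.Add(1)
		go func(m Message) {
			defer wg.Done()
			handle(m)
		}(msg)
	}
	wg.Wait()
}

// countProcessed publishes n messages on a channel, closes it, and returns
// how many messages processAll handled.
func countProcessed(n int) int64 {
	ch := make(chan Message)
	go func() {
		defer close(ch) // closing the channel is what ends the range loop
		for i := 1; i <= n; i++ {
			ch <- Message{Payload: []byte(fmt.Sprintf("Message #%d", i))}
		}
	}()

	var processed int64
	processAll(ch, func(Message) {
		atomic.AddInt64(&processed, 1) // handlers run concurrently, so count atomically
	})
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(countProcessed(10)) // prints 10
}
```

Passing msg as an argument gives each goroutine its own copy of the message; before Go 1.22, capturing the loop variable directly in the closure would have shared a single variable across iterations.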

The main function

Let's write a very simple main function to run our MessageProcessor. In this implementation, we create an instance of the MessageProcessor using the constructor function NewMessageProcessor() and then start it using the Start() method. At the bottom of the function, we just block the main goroutine to keep our application alive.

func main() {

   processor := NewMessageProcessor()

   err := processor.Start()
   if err != nil {
       log.Fatalf("failed to start message processor: %s", err.Error())
   }

   blockForeverCh := make(chan any)
   <-blockForeverCh
}

Exercising our application

Let’s see if what we did so far works. To do this, we will start a RabbitMQ server and run our application. The application has hardcoded parameters (you can see them in the RabbitMQMessageConsumer component) that allow it to connect to a local RabbitMQ instance running with the default configuration. Then we will send some messages to RabbitMQ. Finally, we will inspect the output printed by the application to see if the messages were handled as expected.

Let’s start by launching the RabbitMQ instance. The easiest way to do this is using Docker, so we don’t need to install RabbitMQ on our machine. We just write a very simple docker-compose.yml file and run it: Docker will download the RabbitMQ image and run it. Let’s see how we can do that.

First, we write the docker-compose.yml file.

version: "3"

services:
 rabbitmq:
   image: rabbitmq:3.8.0-management
   container_name: rabbitmq
   ports:
     - "5672:5672"

With this simple YAML file, we tell Docker which RabbitMQ image we want to use and that we want to expose port 5672 of the server running inside the container to the host. This way our app can connect to RabbitMQ using the localhost IP 127.0.0.1.

Now we can start our RabbitMQ instance by executing the following command in the same directory where we placed the docker-compose.yml file.

docker-compose up -d

The next step will be compiling and running our application.

go build -o message_processor
./message_processor &> message_processor_output.txt &
MESSAGE_PROCESSOR_PID=$!

Execute this in the same directory where you placed the Go code. The first line compiles the application into a binary named message_processor. The second line runs the binary in the background and redirects its output (both stdout and stderr) to the file message_processor_output.txt. The third line saves the process ID of our app in the MESSAGE_PROCESSOR_PID shell variable, which lets us kill the app when we finish the test. Running the app this way will make it easier to put all the commands together in a bash script later.

Now we will send some messages to RabbitMQ.

for i in {1..10} ; do
   message="Message #$i"
   echo "Publishing message $message"
   docker-compose exec rabbitmq rabbitmqadmin publish routing_key="message-consumer-queue" payload="$message"
done

Here we are using docker-compose exec to execute the rabbitmqadmin command inside the container. This command allows publishing messages from the command line, and the loop executes it 10 times to send 10 messages.

Let’s see what was printed to the message_processor_output.txt file after executing these commands.

2023/03/08 18:37:21 processing message with payload Message #1
2023/03/08 18:37:22 processing message with payload Message #2
2023/03/08 18:37:23 processing message with payload Message #3
2023/03/08 18:37:24 processing message with payload Message #4
2023/03/08 18:37:24 processing message with payload Message #5
2023/03/08 18:37:25 processing message with payload Message #6
2023/03/08 18:37:26 processing message with payload Message #7
2023/03/08 18:37:27 processing message with payload Message #8
2023/03/08 18:37:27 processing message with payload Message #9
2023/03/08 18:37:28 processing message with payload Message #10

The final step is killing our application and shutting down RabbitMQ.

kill -n 9 $MESSAGE_PROCESSOR_PID
docker-compose down

Summary

In this first iteration, we wrote the skeleton of a MessageProcessor, which uses a RabbitMQMessageConsumer instance to connect to the message queue and consume messages from it. We also exercised the application with some messages and saw that it handled them as expected.

In the next iteration, we will modify our MessageProcessor to handle one of the events emitted by the Payments service, the WithdrawalCreated event.

Iteration 2: Handling Payments events

💡
Find the complete code for this section here

In this iteration, we will modify our MessageProcessor to handle Payments events. We will write a component that will be in charge of the deserialization process. We will also write a handler that will handle the deserialized events.

For the moment we will only handle WithdrawalCreated events. Let’s look at an example of this event.

{
   "type": "WithdrawalCreated",
   "data": {
      "withdrawal_id": "e728f3a7-b92f-46fe-b080-524442065cb3",
      "amount": 100,
      "source_account": "source account details",
      "destination_account": "destination account details"
   }
}

Deserialization of events

The first component that we need to implement is a deserializer, i.e. the component that takes the raw bytes containing the event payload in JSON format and outputs a Go struct representing that event. Let’s start by writing the deserializer for Payments events.

type PaymentsEventsDeserializer struct {
}

func NewPaymentsEventsDeserializer() *PaymentsEventsDeserializer {
   return &PaymentsEventsDeserializer{}
}

func (d *PaymentsEventsDeserializer) Deserialize(message Message) (any, error) {
   var event EventEnvelope
   err := json.Unmarshal(message.Payload, &event)
   if err != nil {
       return nil, fmt.Errorf("error deserializing message with payload %s: %w", message.Payload, err)
   }
   switch event.Type {
   case "WithdrawalCreated":
       var withdrawalCreated WithdrawalCreated
       err := json.Unmarshal(event.Data, &withdrawalCreated)
       if err != nil {
           return nil, fmt.Errorf("error deserializing WithdrawalCreated event data %s: %w", event.Data, err)
       }
       return withdrawalCreated, nil
   default:
       log.Printf("unexpected event type: %s", event.Type)
       return nil, nil
   }
}

The deserialization logic is implemented in the Deserialize(message Message) method. It receives a message as a parameter and returns two values: the first is the deserialized event and the second contains any error that may occur during the deserialization process. The Deserialize method first unmarshals the message payload into an instance of an EventEnvelope. Note that the Data field of the EventEnvelope struct is of type json.RawMessage. Using this type tells the json.Unmarshal function that we don’t want to deserialize that field (yet), so it keeps the field’s raw JSON as a slice of bytes (json.RawMessage is defined as type RawMessage []byte).

Once we have the event envelope deserialized, we use the Type field to decide how we are going to deserialize the data object. For now, we are only processing events of type WithdrawalCreated, so our switch only contains one case for this type of event. Any other event type is logged and skipped by returning a nil event and a nil error.

Note that the type of the first return value is any. This is an alias for the type interface{}, introduced in Go 1.18. Similarly to the Object type in Java or the void pointer in C, it is used here to denote that the Deserialize method can return values of different types. We need it because each payment event will have a different set of attributes, so there is no common interface (type) between them.
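The Deserialize method relies on the EventEnvelope and WithdrawalCreated structs, which are not shown above. Below is a hedged reconstruction, consistent with the example event and with the log output shown later in this iteration; the int type for Amount is an assumption. It also includes a small hypothetical helper, decodeTwoPhase, that demonstrates the two-step decoding that json.RawMessage enables.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// EventEnvelope: Type tells us which concrete event Data holds. Data is kept
// as raw JSON bytes so it can be decoded once the type is known.
type EventEnvelope struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// WithdrawalCreated: field names follow the log output shown later in this
// iteration and JSON tags follow the example event. The int type for Amount
// is an assumption.
type WithdrawalCreated struct {
	WithdrawalId       string `json:"withdrawal_id"`
	Amount             int    `json:"amount"`
	SourceAccount      string `json:"source_account"`
	DestinationAccount string `json:"destination_account"`
}

// samplePayload is the example event from above, compacted onto one line.
const samplePayload = `{"type":"WithdrawalCreated","data":{"withdrawal_id":"e728f3a7-b92f-46fe-b080-524442065cb3","amount":100,"source_account":"source account details","destination_account":"destination account details"}}`

// decodeTwoPhase is a hypothetical helper: it decodes the envelope first
// (leaving Data untouched) and only then decodes Data into the concrete struct.
func decodeTwoPhase(payload []byte) (EventEnvelope, WithdrawalCreated, error) {
	var envelope EventEnvelope
	if err := json.Unmarshal(payload, &envelope); err != nil {
		return EventEnvelope{}, WithdrawalCreated{}, fmt.Errorf("decoding envelope: %w", err)
	}
	var event WithdrawalCreated
	if err := json.Unmarshal(envelope.Data, &event); err != nil {
		return envelope, WithdrawalCreated{}, fmt.Errorf("decoding %s data: %w", envelope.Type, err)
	}
	return envelope, event, nil
}

func main() {
	envelope, event, err := decodeTwoPhase([]byte(samplePayload))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("type: %s\nraw data: %s\n", envelope.Type, envelope.Data)
	fmt.Printf("event: %+v\n", event)
}
```

In the real deserializer, the second step is selected by the switch on envelope.Type, so each event type gets its own struct.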

Handling the events

Once we have the event represented as a Go struct, we can process it. This processing will occur in specific handlers. We need a way to call the correct handler for each event. This “routing” logic will be implemented in a component that we are going to call PaymentsEventsHandler.

type PaymentsEventsHandler struct {
}

func NewPaymentsEventsHandler() *PaymentsEventsHandler {
   return &PaymentsEventsHandler{}
}

func (h *PaymentsEventsHandler) HandleEvent(event any) error {
   switch event.(type) {
   case WithdrawalCreated:
       log.Printf("handling WithdrawalCreated event: %+v", event)
   default:
       return fmt.Errorf("unexpected event type %T", event)
   }
   return nil
}

To route the event to the specific handler, we are using a type switch, a construct that permits several type assertions in series. For simplicity, we are not calling any real handler; we just log a message containing the event struct.
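The following sketch shows the same type switch as a standalone program, under a few assumptions: WithdrawalCreated is trimmed to two fields, and handleWithdrawalCreated is a hypothetical specific handler. Binding the switch variable (switch e := event.(type)) gives each case a value of the concrete type, and the %T verb formats the dynamic type of an unexpected event.

```go
package main

import (
	"fmt"
	"log"
)

// WithdrawalCreated is trimmed to two fields for this sketch.
type WithdrawalCreated struct {
	WithdrawalId string
	Amount       int
}

// HandleEvent routes an event to a specific handler with a type switch.
func HandleEvent(event any) error {
	switch e := event.(type) {
	case WithdrawalCreated:
		// Inside this case, e has the concrete type WithdrawalCreated.
		return handleWithdrawalCreated(e)
	default:
		// %T prints the dynamic type of the value (for example "int").
		return fmt.Errorf("unexpected event type %T", event)
	}
}

// handleWithdrawalCreated is a hypothetical specific handler that receives
// the concrete type, so it needs no further type assertions.
func handleWithdrawalCreated(e WithdrawalCreated) error {
	log.Printf("handling WithdrawalCreated event for withdrawal %s (amount %d)", e.WithdrawalId, e.Amount)
	return nil
}

func main() {
	fmt.Println(HandleEvent(WithdrawalCreated{WithdrawalId: "w-1", Amount: 100}))
	fmt.Println(HandleEvent(42))
}
```

With this structure, adding a new event type means adding one case and one specific handler; the specific handlers never deal with values of type any.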

The new implementation of the Message Processor

Let’s see what the new implementation of the MessageProcessor looks like.

type MessageProcessor struct {
   messageConsumer            *RabbitMQMessageConsumer
   paymentsEventsDeserializer *PaymentsEventsDeserializer
   paymentsEventsHandler      *PaymentsEventsHandler
}

func NewMessageProcessor() *MessageProcessor {
   return &MessageProcessor{
       messageConsumer:            NewRabbitMQMessageConsumer(),
       paymentsEventsDeserializer: NewPaymentsEventsDeserializer(),
       paymentsEventsHandler:      NewPaymentsEventsHandler(),
   }
}

// ...

func (p *MessageProcessor) processMessage(message Message) {
   event, err := p.paymentsEventsDeserializer.Deserialize(message)
   if err != nil {
       p.printErrorLog(message, err)
       return
   }
   err = p.paymentsEventsHandler.HandleEvent(event)
   if err != nil {
       p.printErrorLog(message, err)
   }
}

The MessageProcessor struct now contains two additional fields: a paymentsEventsDeserializer of type *PaymentsEventsDeserializer and a paymentsEventsHandler of type *PaymentsEventsHandler. These fields are initialized in the constructor function the same way we initialized the messageConsumer field.

The processMessage method first uses the instance of the PaymentsEventsDeserializer to deserialize the message into an event. Then it passes the event to the PaymentsEventsHandler instance. This simple implementation will allow us to process events coming from the Payments service.

Exercising the application with the new changes

Before exercising our app with our bash script, we need to change the payload we are sending to RabbitMQ. The code that sends the messages to RabbitMQ now looks like this.

print_message "Sending messages to rabbitmq"
for i in {1..10} ; do
   echo "Publishing WithdrawalCreated event with amount $i"
   docker-compose exec rabbitmq rabbitmqadmin publish routing_key="message-consumer-queue" payload="
     {
     \"type\": \"WithdrawalCreated\",
     \"data\": {
      \"withdrawal_id\": \"e728f3a7-b92f-46fe-b080-524442065cb3\",
      \"amount\": $i,
      \"source_account\": \"source account details\",
      \"destination_account\": \"destination account details\"
     }
    }
   "
done

This is the output that you should see if you run the bash script.

2023/03/31 17:00:58 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:1 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:00:59 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:2 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:00 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:3 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:00 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:4 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:01 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:5 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:02 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:6 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:02 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:7 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:03 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:8 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:04 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:9 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:04 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:10 SourceAccount:source account details DestinationAccount:destination account details}

Summary

Now our MessageProcessor can process Payments events of type WithdrawalCreated. We added to the processMessage method the ability to deserialize an event using the PaymentsEventsDeserializer and process it using the PaymentsEventsHandler. While this implementation works, it has some issues that will make it hard to maintain. In the next section, we will discuss what those issues are, why they may hurt the maintainability of our application, and which design principles we can follow to improve our implementation.

Problems with our current implementation

Let's put the brakes on the implementation and think a bit about whether we are following good software design practices. The topic of software design includes several concepts like object-oriented design, design patterns, domain-driven design, test-driven design, etc. In this case, however, we are going to follow a set of principles known by the acronym SOLID. These principles will help us identify the weaknesses of our current implementation and give us some ideas on how to improve it.

SOLID design

From Wikipedia:

In software engineering, SOLID is a mnemonic acronym for five design principles intended to make object-oriented designs more understandable, flexible, and maintainable. The principles are a subset of many principles promoted by American software engineer and instructor Robert C. Martin, first introduced in his 2000 paper Design Principles and Design Patterns discussing software rot.

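The SOLID principles are:

  • Single-responsibility principle: there should never be more than one reason for a class to change.

  • Open-closed principle: software entities should be open for extension but closed for modification.

  • Liskov substitution principle: code that uses a base type must be able to use objects of its derived types without knowing it.

  • Interface segregation principle: clients should not be forced to depend on interfaces that they do not use.

  • Dependency inversion principle: depend on abstractions, not on concretions.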

The identified issues

The MessageProcessor depends on concrete implementations. These are the RabbitMQMessageConsumer, the PaymentsEventsDeserializer, and the PaymentsEventsHandler. This violates the fifth SOLID principle, the Dependency Inversion Principle, which states that

  • High-level modules should not import anything from low-level modules. Both should depend on abstractions (e.g., interfaces).

  • Abstractions should not depend on details. Details (concrete implementations) should depend on abstractions.

Violating this design principle has consequences: we will end up with code that is hard to maintain, hard to test, and hard to reuse, which leads to code duplication. Let’s see why.

Maintainability issues

Changes in the low-level modules will impact the MessageProcessor. For example, if we decide to use a different message queue (Kafka instead of RabbitMQ, say) to communicate Payments events, we will have to change the MessageProcessor implementation to use a different message consumer. The same goes for the serialization format: if we decide to use a format other than JSON, like MessagePack or Internet Object, we will also have to modify the MessageProcessor. This means that we will find ourselves changing the MessageProcessor not because the processing logic has changed but because something changed in one of the low-level modules on which it depends.

Testability issues

There is a set of principles, known by the acronym FIRST (yet another acronym), that defines what is considered an effective unit test. According to these principles, a unit test should be Fast, Isolated, Repeatable, Self-Verifying, and Timely. Let's focus on the first two principles (for a detailed explanation of these principles, you can read this article). Fast means that our tests should run as fast as possible. The faster your tests run, the more often you’ll run them, and we want to be able to run our tests all the time, ideally after each modification to our MessageProcessor. Remember that the earlier we find bugs, the cheaper they are to fix. Isolated means that each unit test should have a single reason to fail.

Because the MessageProcessor depends on a concrete implementation of a message consumer, the RabbitMQMessageConsumer, we will only be able to test it if we have a running instance of RabbitMQ. The same will occur with the concrete implementations of the handlers. If a handler interacts with an external service via HTTP, for example (at some point, the handler of the WithdrawalCreated event will need to interact with the DinoPay API to create a payment), we will need at least a mock server of the DinoPay API running to execute our tests. Having to run external services every time we want to run our tests will make them slow, which is exactly the opposite of the first principle described above. Moreover, besides executing the MessageProcessor code, any test we write will also run the code of the concrete implementations on which it depends, so an error in any of those components will make our test fail. This means that the test is not isolated, which violates the second principle.

Our current implementation will not allow us to implement proper unit tests, at least unit tests that comply with the FIRST principles.

Reusability issues

We will not be able to reuse our MessageProcessor to process messages from other sources, like DinoPay, because the events deserializer is specific to Payments events. This implies that we will have to implement a different processor for each event source, duplicating the core logic of the processor.

Summary

In this section, we introduced a set of design principles called SOLID that, if followed, allow us to write more readable, flexible, and maintainable code. We saw that our current implementation of the MessageProcessor violates some of these principles, and we discussed the issues we may encounter if we don’t follow them. In the next section, we will see how to refactor our code to align it with the SOLID principles.

Iteration 3: Refactoring

💡
Find the complete code for this section here

In this iteration, we are going to address the issues mentioned in the previous section. We will make our MessageProcessor independent from the concrete implementations of the low-level modules through the introduction of interfaces.

These changes will make the MessageProcessor much more testable, so we will also see how to write a unit test using mocked implementations of the new interfaces.

Substituting the concrete implementations with interfaces

Let’s define one interface for each of the concrete components on which the MessageProcessor depends.

type MessageConsumer interface {
   Consume() (<-chan Message, error)
   Close() error
}

The MessageConsumer interface represents a component that consumes messages from some source and communicates those messages via a channel. Note that this interface is abstract enough to cover very different sources. It can be implemented to consume messages from a RabbitMQ server (the RabbitMQMessageConsumer that we already implemented) or by a component that receives HTTP requests (an HTTP server). In the latter case, each incoming request will be transformed into a Message and sent over the messages channel.
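To make the point concrete, here is a minimal sketch of a second, hypothetical MessageConsumer implementation that replays an in-memory list of payloads (an HTTP-based consumer would follow the same contract but needs a running server). The InMemoryMessageConsumer type and the drain helper are illustrative names, not part of the article's code.

```go
package main

import "fmt"

type Message struct {
	Payload []byte
}

type MessageConsumer interface {
	Consume() (<-chan Message, error)
	Close() error
}

// InMemoryMessageConsumer is a hypothetical MessageConsumer that replays a
// fixed list of payloads, which is handy for local runs without RabbitMQ.
type InMemoryMessageConsumer struct {
	payloads [][]byte
}

// Compile-time check that the type satisfies the interface.
var _ MessageConsumer = (*InMemoryMessageConsumer)(nil)

func NewInMemoryMessageConsumer(payloads ...string) *InMemoryMessageConsumer {
	c := &InMemoryMessageConsumer{}
	for _, p := range payloads {
		c.payloads = append(c.payloads, []byte(p))
	}
	return c
}

// Consume sends every payload over the returned channel and then closes it,
// just as the RabbitMQ consumer closes its channel when deliveries stop.
func (c *InMemoryMessageConsumer) Consume() (<-chan Message, error) {
	ch := make(chan Message)
	go func() {
		defer close(ch)
		for _, p := range c.payloads {
			ch <- Message{Payload: p}
		}
	}()
	return ch, nil
}

// Close has nothing to release for an in-memory source.
func (c *InMemoryMessageConsumer) Close() error { return nil }

// drain reads all messages from a consumer; it depends only on the interface.
func drain(consumer MessageConsumer) ([]string, error) {
	ch, err := consumer.Consume()
	if err != nil {
		return nil, err
	}
	var payloads []string
	for msg := range ch {
		payloads = append(payloads, string(msg.Payload))
	}
	return payloads, consumer.Close()
}

func main() {
	payloads, err := drain(NewInMemoryMessageConsumer("Message #1", "Message #2"))
	fmt.Println(payloads, err)
}
```

Because drain only depends on the MessageConsumer interface, it accepts any implementation, which is exactly the property the refactored MessageProcessor relies on.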

type EventsDeserializer interface {
   Deserialize(message Message) (any, error)
}

The EventsDeserializer interface represents any component that knows how to deserialize certain types of events. We will have one implementation for each group of events. We already implemented the PaymentsEventsDeserializer. For DinoPay events, we will implement the DinoPayEventsDeserializer and for the internal events, we will write an InternalEventsDeserializer.

type EventsHandler interface {
   HandleEvent(event any) error
}

Finally, we define the EventsHandler interface. Similar to the EventsDeserializer, this interface represents components that know how to handle events.

Now that we have our abstractions let’s look at the new version of our MessageProcessor.

type MessageProcessor struct {
   messageConsumer    MessageConsumer
   eventsDeserializer EventsDeserializer
   eventsHandler      EventsHandler
}

func NewMessageProcessor(
   messageConsumer MessageConsumer,
   eventsDeserializer EventsDeserializer,
   eventsHandler EventsHandler,
) *MessageProcessor {
   return &MessageProcessor{
       messageConsumer:    messageConsumer,
       eventsDeserializer: eventsDeserializer,
       eventsHandler:      eventsHandler,
   }
}

// …

func (p *MessageProcessor) processMessage(message Message) {
   event, err := p.eventsDeserializer.Deserialize(message)
   if err != nil {
       p.printErrorLog(message, err)
   }
   if event != nil {
       err = p.eventsHandler.HandleEvent(event)
       if err != nil {
           p.printErrorLog(message, err)
       }
   }
}

func (p *MessageProcessor) printErrorLog(message Message, err error) {
   log.Printf("error processing message with payload: %s: %s", message.Payload, err.Error())
}

Now the MessageProcessor depends on abstractions. The same code can be used to process messages coming from different sources (the internal Payments service or an external PSP like DinoPay), using different transport technologies (AMQP to communicate with RabbitMQ or HTTP to receive webhooks) and different serialization formats (JSON or MessagePack).

Another benefit of this refactor is that our MessageProcessor can now be tested in isolation, which is one of the desired characteristics of a unit test. Let’s see how to write a unit test for this component.

Writing a unit test for the Message Processor

One of the desired properties of a good unit test is isolation. The test should only execute the code of the component that is being tested. Ideally, no other code should be executed. Writing the test this way has several benefits

  • Repeatable: no matter how many times we execute the test, it will always produce the same result

  • Maintainable: as long as the interfaces remain the same, refactoring any of the dependencies of the unit we are testing will not affect our test

  • Fast: we are not accessing any external system (like the message bus or a database)

To achieve the desired level of isolation, we will not use the real implementations of the interfaces on which our MessageProcessor depends; instead, we will use fake implementations called mocks. If you are not familiar with the concept of mocks, here you can find a good article explaining how to test using mocks in Go.

To test our MessageProcessor we need three mocks, one for each of its dependencies. We could write the mocks ourselves, and that would be totally fine. However, there is a great tool called mockery that can generate them for us, so we will use it to save some time.
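For comparison, a hand-written mock for the EventsHandler interface could look like the sketch below. fakeEventsHandler, errBoom and calledOnceWith are hypothetical names, and the type is a much simpler stand-in for what mockery generates (the generated mocks build on testify's mock.Mock and support richer expectations).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type EventsHandler interface {
	HandleEvent(event any) error
}

// errBoom is a hypothetical error used to configure the fake.
var errBoom = errors.New("boom")

// fakeEventsHandler is a hand-written mock: it records every call and
// returns a preconfigured error.
type fakeEventsHandler struct {
	mu        sync.Mutex // the MessageProcessor may call HandleEvent from several goroutines
	returnErr error
	calls     []any
}

func (f *fakeEventsHandler) HandleEvent(event any) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.calls = append(f.calls, event)
	return f.returnErr
}

// calledOnceWith plays the role of an expectation check. The == comparison
// panics if the recorded event holds an uncomparable type such as a slice.
func (f *fakeEventsHandler) calledOnceWith(want any) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.calls) == 1 && f.calls[0] == want
}

// Compile-time proof that the fake satisfies the interface.
var _ EventsHandler = (*fakeEventsHandler)(nil)

func main() {
	fake := &fakeEventsHandler{returnErr: errBoom}
	var handler EventsHandler = fake // the code under test only sees the interface
	err := handler.HandleEvent("WithdrawalCreated")
	fmt.Println(err, fake.calledOnceWith("WithdrawalCreated")) // boom true
}
```

Writing one of these per interface is mechanical, which is exactly the boilerplate mockery removes.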

To generate the mocks, we just run the following command in the same directory where we have our code.

mockery

After running the mockery command, we see three new files in our project:

  • mock_message_consumer.go

  • mock_events_deserializer.go

  • mock_events_handler.go

Each file contains a mock implementation of one of the three interfaces on which the MessageProcessor depends.

Now that we have our mocks, let’s see how to write a unit test. The behavior that we want to test is the following:

Given a Message with a valid payload
When the MessageProcessor receives the message from the MessageConsumer
Then the MessageProcessor must call the EventsDeserializer.Deserialize method with the Message as a parameter
And the MessageProcessor must call the EventsHandler.HandleEvent method with the event returned by the Deserialize method

Note that what we just wrote is a so-called acceptance criterion. The test that we will write must verify that the MessageProcessor meets it.

We will start with the mocks initialization. The first mock that we are going to initialize is the MessageConsumer mock.

messagesCh := make(chan Message)
messageConsumerMock := &MockMessageConsumer{}
messageConsumerMock.On("Consume").Return((<-chan Message)(messagesCh), nil)

Note that besides initializing the mock object, we are also defining an expectation. An expectation tells the mock that it must expect a call to the Consume method without any arguments. When the mock receives that call, it must return messagesCh (created in the first line) as the first return value and nil as the second. The messagesCh channel will allow us to trigger the test case. This expectation will be verified at the end of the test by calling the AssertExpectations(t) method on the mock.

Let’s continue with the next mock, the EventsDeserializer mock.

message := Message{
   Payload: []byte{},
}

event := WithdrawalCreated{
   WithdrawalId:       "19cf3c4c-4a0d-417e-b0cc-83385b9487de",
   Amount:             100,
   SourceAccount:      "source account details",
   DestinationAccount: "destination account details",
}

eventsDeserializerMock := &MockEventsDeserializer{}
eventsDeserializerMock.On("Deserialize", message).Return(event, nil)

In this case, we tell the mock that it must expect a call to the Deserialize method with the indicated message as a parameter. When that call is received, it must return the indicated event as the first return value and nil as the second.

The third mock that we need to configure is the EventsHandler mock. This mock must expect a call to the HandleEvent method with the event that we defined before (the one returned by the eventsDeserializerMock) as a parameter, and it returns nil. We also configure the mock to run a callback when it receives this call; the callback calls the Done() method on a WaitGroup instance that we define beforehand. A sync.WaitGroup is the idiomatic Go tool for waiting until a group of goroutines has finished its work. Since the MessageProcessor executes the handler’s HandleEvent method asynchronously inside a goroutine, we need a way to make the main goroutine (the goroutine in which the test runs) wait for the other goroutines. Otherwise, the test can reach its end without waiting for the spawned goroutines to finish, and it may fail because the HandleEvent method has not been called yet.
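The synchronization pattern can be seen in isolation in the following stdlib-only sketch. asyncDispatch is a hypothetical stand-in for the MessageProcessor: it runs the handler in a new goroutine, and the WaitGroup blocks the caller until the handler signals completion with Done(), just like the mock's Run callback.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncDispatch (hypothetical) calls handle in a new goroutine and
// returns immediately, like the MessageProcessor does.
func asyncDispatch(event string, handle func(string)) {
	go handle(event)
}

// runAndWait dispatches one event and blocks until the handler has run,
// returning the event the handler saw.
func runAndWait(event string) string {
	var wg sync.WaitGroup
	wg.Add(1) // we expect exactly one handler call
	var received string
	asyncDispatch(event, func(e string) {
		received = e
		wg.Done() // signal completion, like the mock's Run callback
	})
	wg.Wait() // without this, we could return before the goroutine runs
	return received
}

func main() {
	fmt.Println(runAndWait("WithdrawalCreated")) // WithdrawalCreated
}
```

Because Done() happens before Wait() returns, reading received afterwards is race-free.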

wg := sync.WaitGroup{}
wg.Add(1)
eventsHandlerMock := &MockEventsHandler{}
eventsHandlerMock.On("HandleEvent", event).Return(nil).Run(func(args mock.Arguments) {
   wg.Done()
})

Now that we have all the mocks initialized and configured we can initialize and start the MessageProcessor.

messageProcessor := NewMessageProcessor(messageConsumerMock, eventsDeserializerMock, eventsHandlerMock)

messageProcessorStartError := messageProcessor.Start()
require.NoError(t, messageProcessorStartError)

After starting the messageProcessor instance we verify that it starts without errors. If any error occurs we fail the test immediately. This behavior of failing without executing the rest of the test is achieved by using the methods from the github.com/stretchr/testify/require package.

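The next step is triggering the processing of one message. We do so by sending the message that we initialized at the beginning of the test over the messagesCh channel, which we configured to be returned by the messageConsumerMock. Because the message is processed in a separate goroutine, we then block on the WaitGroup until the eventsHandlerMock has received the HandleEvent call.

messagesCh <- message
wg.Wait()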

Finally, we check that all the method calls that we expected were actually made by using the AssertExpectations method on each mock.

messageConsumerMock.AssertExpectations(t)
eventsDeserializerMock.AssertExpectations(t)
eventsHandlerMock.AssertExpectations(t)

Summary

In this section, we refactored the code written in the previous section. Following the Dependency Inversion principle, we replaced the MessageProcessor’s dependencies on concrete implementations with dependencies on interfaces. This change allowed us to implement a unit test that complies with the FIRST principles mentioned in the previous section.
In the next and last section of this blog post, we are going to iterate one more time over our code. This time we will make it more strongly typed and, therefore, less error-prone and easier to maintain.

Iteration 4: Introducing generics

💡
Find the complete code for this section here

Let’s take one more look at the code we implemented in the second iteration to handle Payments events.

func (h *PaymentsEventsHandler) HandleEvent(event any) error {
   switch event.(type) {
   case WithdrawalCreated:
       log.Printf("handling WithdrawalCreated event: %+v", event)
   default:
       return fmt.Errorf("unexpected event type %T", event)
   }
   return nil
}

The HandleEvent(event any) method uses a type switch to determine, at runtime, the type of the event that needs to be handled. If the type is recognized, the event is routed to the specific handler; if the type is not in the list of expected types, an error is returned at runtime.

Now let’s think about what happens if we want to process a new Payments event. We must add the new event to both the PaymentsEventsDeserializer and the PaymentsEventsHandler. What happens if we forget to add it to the PaymentsEventsHandler? The code still compiles, and we get an error at runtime. If we don’t have enough tests to catch this bug during development or testing, we can end up with an “unexpected event type …” error in production. If, instead, we make the entire processing pipeline strongly typed, the compiler will catch this error during development, reducing the risk of a runtime error in production. Even if we have enough tests to keep this error from reaching production, it is better, in terms of development speed, to detect this kind of error as early as possible, that is, at compile time.
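The failure mode is easy to reproduce. In this sketch, WithdrawalFailed is a hypothetical new Payments event that the handler's type switch was never updated for: the program compiles without complaint, and the problem only surfaces as an error when such an event is actually handled.

```go
package main

import (
	"fmt"
	"log"
)

type WithdrawalCreated struct{ WithdrawalId string }

// WithdrawalFailed is a hypothetical new event that the deserializer now
// produces but that the handler below does not know about yet.
type WithdrawalFailed struct{ WithdrawalId string }

type PaymentsEventsHandler struct{}

func (h *PaymentsEventsHandler) HandleEvent(event any) error {
	switch event.(type) {
	case WithdrawalCreated:
		log.Printf("handling WithdrawalCreated event: %+v", event)
	default:
		// %T prints the dynamic type of the event.
		return fmt.Errorf("unexpected event type %T", event)
	}
	return nil
}

func main() {
	h := &PaymentsEventsHandler{}
	// Compiles fine; the mistake only shows up when the event arrives.
	err := h.HandleEvent(WithdrawalFailed{WithdrawalId: "w-1"})
	fmt.Println(err) // unexpected event type main.WithdrawalFailed
}
```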

Generics to the rescue

So, how can we remove the usage of the any type and the type assertions while keeping our MessageProcessor reusable (independent of any specific events group or source)? Before Go 1.18 there was no easy way, but the introduction of generics changed the story. So let’s see how we can use this language feature to make our processing pipeline more strongly typed.

Let’s start by defining a common interface for all the events that will be processed by the MessageProcessor.

type Event[Visitor any] interface {
   Accept(visitor Visitor) error
}

The Event interface has a type parameter (a feature introduced in Go 1.18) called Visitor, which can be of any type. It declares a method called Accept that receives an object of type Visitor and returns an error. This may not make much sense yet, but bear with me.

Then we are going to write a Visitor interface for each of the event groups that our service has to process. One group is the Payments events. Another group will be the DinoPay events. For the Payments events group, the interface will be the following.

type PaymentsEventsVisitor interface {
   VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

The interface will have one method for each event. In the implementation of this interface, we will perform the actual handling of each event (by delegating to specific handlers). For now, let’s write a dummy implementation that will log the event the same way we did with the PaymentsEventsHandler before.

type PaymentsEventsVisitorImpl struct {
}

func NewPaymentsEventsVisitorImpl() *PaymentsEventsVisitorImpl {
    return &PaymentsEventsVisitorImpl{}
}

func (p PaymentsEventsVisitorImpl) VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error {
    log.Printf("handling WithdrawalCreated event: %+v", withdrawalCreated)
    return nil
}

Now let’s modify the implementation of the WithdrawalCreated event to make it implement the Event interface.

type WithdrawalCreated struct {
   WithdrawalId       string  `json:"withdrawal_id"`
   Amount             float64 `json:"amount"`
   SourceAccount      string  `json:"source_account"`
   DestinationAccount string  `json:"destination_account"`
}

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
   return visitor.VisitWithdrawalCreated(w)
}

Now the Event interface starts making a bit more sense, doesn’t it? The idea is that every event, to be processed by the MessageProcessor, must implement the Event interface, which means implementing the Accept method. The implementation of Accept is straightforward: it calls the corresponding method on the visitor object it receives as a parameter. This mechanism is called double dispatch, and we took it from the Visitor design pattern (for more information on this pattern, you can check out this great blog post by Alexander Shvets). With this technique, instead of calling the visitor method directly, which would require a type switch on the event type, the MessageProcessor just calls the Accept method on the event, passing it the visitor. The event knows which method it has to call on the visitor. This way we keep our MessageProcessor agnostic of the specific event it is processing, and at the same time we avoid runtime type assertions which, as we already mentioned, are harder to maintain.
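Putting the pieces together, the following self-contained sketch shows double dispatch at work. recordingVisitor and the generic dispatch helper are hypothetical; dispatch plays the MessageProcessor's role and never inspects the concrete event type.

```go
package main

import "fmt"

type Event[Visitor any] interface {
	Accept(visitor Visitor) error
}

type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

type WithdrawalCreated struct {
	WithdrawalId string
	Amount       float64
}

// The event picks the visitor method: this is the second dispatch.
func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

// recordingVisitor (hypothetical) remembers what it visited.
type recordingVisitor struct{ visited []string }

func (r *recordingVisitor) VisitWithdrawalCreated(w WithdrawalCreated) error {
	r.visited = append(r.visited, "WithdrawalCreated:"+w.WithdrawalId)
	return nil
}

// dispatch (hypothetical) knows nothing about concrete event types.
func dispatch[V any](event Event[V], visitor V) error {
	return event.Accept(visitor)
}

func main() {
	v := &recordingVisitor{}
	event := WithdrawalCreated{WithdrawalId: "w-1", Amount: 100}
	if err := dispatch[PaymentsEventsVisitor](event, v); err != nil {
		fmt.Println(err)
	}
	fmt.Println(v.visited) // [WithdrawalCreated:w-1]
}
```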

The EventsDeserializer interface will declare the same type parameter as the Event interface.

type EventsDeserializer[Visitor any] interface {
   Deserialize(message Message) (Event[Visitor], error)
}

Note that the first return value of the Deserialize method is no longer of type any. Now it returns a properly typed object of type Event[Visitor]. The implementation of the PaymentsEventsDeserializer (formerly PaymentsEventsDeserializerImpl) will look the same as before; the only difference is that the return type is now Event[PaymentsEventsVisitor].
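The body of Deserialize is not shown here. One possible shape for it, purely as an assumption, is sketched below: the payload is taken to be a JSON envelope with a type discriminator and a data field (a made-up wire format, not the one used by Walletera). The deserializer still has to map the wire format to a Go type, but whatever it returns must satisfy Event[PaymentsEventsVisitor], so the compiler rejects any event that lacks an Accept method for this visitor.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Message struct{ Payload []byte }

type Event[Visitor any] interface{ Accept(visitor Visitor) error }

type EventsDeserializer[Visitor any] interface {
	Deserialize(message Message) (Event[Visitor], error)
}

type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

type WithdrawalCreated struct {
	WithdrawalId string  `json:"withdrawal_id"`
	Amount       float64 `json:"amount"`
}

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

// envelope is a HYPOTHETICAL wire format: a type discriminator plus data.
type envelope struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

type PaymentsEventsDeserializer struct{}

func (r *PaymentsEventsDeserializer) Deserialize(message Message) (Event[PaymentsEventsVisitor], error) {
	var env envelope
	if err := json.Unmarshal(message.Payload, &env); err != nil {
		return nil, err
	}
	switch env.Type {
	case "WithdrawalCreated":
		var event WithdrawalCreated
		if err := json.Unmarshal(env.Data, &event); err != nil {
			return nil, err
		}
		return event, nil // compiles only because WithdrawalCreated has Accept
	default:
		return nil, fmt.Errorf("unknown event type %q", env.Type)
	}
}

// Compile-time check: this deserializer only produces Payments events.
var _ EventsDeserializer[PaymentsEventsVisitor] = (*PaymentsEventsDeserializer)(nil)

func main() {
	d := &PaymentsEventsDeserializer{}
	event, err := d.Deserialize(Message{Payload: []byte(`{"type":"WithdrawalCreated","data":{"withdrawal_id":"w-1","amount":100}}`)})
	fmt.Println(event, err) // {w-1 100} <nil>
}
```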

func (r *PaymentsEventsDeserializer) Deserialize(message Message) (Event[PaymentsEventsVisitor], error) {
 // …
}

Finally, let’s see how the MessageProcessor looks now.

type MessageProcessor[Visitor any] struct {
   messageConsumer    MessageConsumer
   eventsDeserializer EventsDeserializer[Visitor]
   eventsVisitor      Visitor
}

func NewMessageProcessor[Visitor any](
   messageConsumer MessageConsumer,
   eventsDeserializer EventsDeserializer[Visitor],
   eventsVisitor Visitor,
) *MessageProcessor[Visitor] {

   return &MessageProcessor[Visitor]{
       messageConsumer:    messageConsumer,
       eventsDeserializer: eventsDeserializer,
       eventsVisitor:      eventsVisitor,
   }
}

// …

func (p *MessageProcessor[Visitor]) processMsg(message Message) {
    event, err := p.eventsDeserializer.Deserialize(message)
    if err != nil {
        p.printErrorLog(message, err)
        return
    }
    if event != nil {
        err = event.Accept(p.eventsVisitor)
        if err != nil {
            p.printErrorLog(message, err)
        }
    }
}

func (p *MessageProcessor[Visitor]) printErrorLog(message Message, err error) {
   log.Printf("error processing message with payload: %s: %s", message.Payload, err.Error())
}

The MessageProcessor struct is also parameterized with the type of its events’ visitor. Choosing a specific visitor when instantiating the MessageProcessor defines, at compile time, the set (or group) of events that the instance accepts.
Note that the EventsHandler interface (along with its implementations, the PaymentsEventsHandler and the MockEventsHandler) was replaced by the PaymentsEventsVisitor interface (and its implementations).

To make this new concept of generics clear, let’s see how the main function of our application looks now.

func main() {

   messageConsumer := NewRabbitMQMessageConsumer()
   paymentsEventsDeserializer := NewPaymentsEventsDeserializer()
   paymentsEventsVisitor := NewPaymentsEventsVisitorImpl()

   processor := NewMessageProcessor[PaymentsEventsVisitor](
       messageConsumer,
       paymentsEventsDeserializer,
       paymentsEventsVisitor,
   )

   err := processor.Start()
   if err != nil {
       log.Fatalf("failed to start message processor: %s", err.Error())
   }

   blockForeverCh := make(chan any)
   <-blockForeverCh
}

The NewMessageProcessor function is called with the type parameter set to the PaymentsEventsVisitor interface. This makes the compiler verify that the second argument of the NewMessageProcessor constructor implements EventsDeserializer[PaymentsEventsVisitor] and that the third argument implements PaymentsEventsVisitor. This way the whole processing pipeline is strongly typed.
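A trimmed-down, runnable version of this wiring is sketched below. To keep it short, the MessageConsumer and Start are omitted, processMsg returns its error instead of logging it, and withdrawalOnlyDeserializer and countingVisitor are hypothetical implementations. The explicit NewMessageProcessor[PaymentsEventsVisitor] instantiation is what ties the deserializer and the visitor to the same event group.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Message struct{ Payload []byte }

type Event[Visitor any] interface{ Accept(visitor Visitor) error }

type EventsDeserializer[Visitor any] interface {
	Deserialize(message Message) (Event[Visitor], error)
}

type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

type WithdrawalCreated struct {
	WithdrawalId string  `json:"withdrawal_id"`
	Amount       float64 `json:"amount"`
}

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

// withdrawalOnlyDeserializer (hypothetical) treats every payload as a JSON WithdrawalCreated.
type withdrawalOnlyDeserializer struct{}

func (withdrawalOnlyDeserializer) Deserialize(message Message) (Event[PaymentsEventsVisitor], error) {
	var event WithdrawalCreated
	if err := json.Unmarshal(message.Payload, &event); err != nil {
		return nil, err
	}
	return event, nil
}

// countingVisitor (hypothetical) counts handled withdrawals.
type countingVisitor struct{ handled int }

func (c *countingVisitor) VisitWithdrawalCreated(WithdrawalCreated) error {
	c.handled++
	return nil
}

// Consumer-less MessageProcessor, kept to the parts that carry the types.
type MessageProcessor[Visitor any] struct {
	eventsDeserializer EventsDeserializer[Visitor]
	eventsVisitor      Visitor
}

func NewMessageProcessor[Visitor any](d EventsDeserializer[Visitor], v Visitor) *MessageProcessor[Visitor] {
	return &MessageProcessor[Visitor]{eventsDeserializer: d, eventsVisitor: v}
}

func (p *MessageProcessor[Visitor]) processMsg(message Message) error {
	event, err := p.eventsDeserializer.Deserialize(message)
	if err != nil {
		return err
	}
	return event.Accept(p.eventsVisitor)
}

func main() {
	visitor := &countingVisitor{}
	// A deserializer built for a different visitor type would not compile here.
	p := NewMessageProcessor[PaymentsEventsVisitor](withdrawalOnlyDeserializer{}, visitor)
	err := p.processMsg(Message{Payload: []byte(`{"withdrawal_id":"w-1","amount":100}`)})
	fmt.Println(err, visitor.handled) // <nil> 1
}
```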

If we add a new event, say one called AnotherPaymentEvent, then to include it in the processing pipeline we first have to make it implement the Event interface. The implementation will look like this.

type AnotherPaymentEvent struct {
}

func (w AnotherPaymentEvent) Accept(visitor PaymentsEventsVisitor) error {
   return visitor.VisitAnotherPaymentEvent(w)
}

This code will not compile until we add the VisitAnotherPaymentEvent method to the PaymentsEventsVisitor interface, and the application will not compile until the visitor implementation provides that method as well. Only then will we be able to implement the deserialization logic in the PaymentsEventsDeserializer component. In contrast to the solution implemented in the previous iteration, with this approach it is not possible to add a new event and forget to implement its handling logic, because the compiler will not allow it.
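The end state after adding AnotherPaymentEvent might look like the following sketch. Here the visitor implementation uses a pointer receiver and records what it visited, an assumption made so the example has observable output. Deleting VisitAnotherPaymentEvent from it breaks the compile-time assertion, which is the safety net described above.

```go
package main

import "fmt"

type Event[Visitor any] interface{ Accept(visitor Visitor) error }

// The visitor interface after adding the new event: one method per event.
type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
	VisitAnotherPaymentEvent(anotherPaymentEvent AnotherPaymentEvent) error
}

type WithdrawalCreated struct{ WithdrawalId string }

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

type AnotherPaymentEvent struct{}

func (a AnotherPaymentEvent) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitAnotherPaymentEvent(a)
}

type PaymentsEventsVisitorImpl struct{ visited []string }

func (p *PaymentsEventsVisitorImpl) VisitWithdrawalCreated(w WithdrawalCreated) error {
	p.visited = append(p.visited, "WithdrawalCreated")
	return nil
}

// Removing this method makes the assertion below fail to compile.
func (p *PaymentsEventsVisitorImpl) VisitAnotherPaymentEvent(a AnotherPaymentEvent) error {
	p.visited = append(p.visited, "AnotherPaymentEvent")
	return nil
}

var _ PaymentsEventsVisitor = (*PaymentsEventsVisitorImpl)(nil)

func main() {
	visitor := &PaymentsEventsVisitorImpl{}
	events := []Event[PaymentsEventsVisitor]{WithdrawalCreated{WithdrawalId: "w-1"}, AnotherPaymentEvent{}}
	for _, e := range events {
		if err := e.Accept(visitor); err != nil {
			fmt.Println(err)
		}
	}
	fmt.Println(visitor.visited) // [WithdrawalCreated AnotherPaymentEvent]
}
```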

Summary

In this last iteration, we improved our code by removing the runtime type assertions which were a potential source of runtime errors. To do so we used generics to be able to define a common interface for all the events that can be processed by our MessageProcessor and the Visitor design pattern to delegate to each event the knowledge of which specific method should be called on the corresponding Visitor.

Closing notes

We built a reusable and strongly typed MessageProcessor for the DinoPay Gateway. We showed how to do it in an iterative and incremental way. Along the way, we briefly discussed the SOLID principles, unit tests, generics, and the Visitor design pattern.

In the following articles of this Walletera series, we will improve the MessageProcessor by adding the features needed for a production-grade component, and we will also continue with the rest of the components needed to complete the implementation of the DinoPay Gateway.