# Building a SOLID, Strongly Typed, and Generic Messages Processor

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">This article belongs to the <a target="_blank" rel="noopener noreferrer nofollow" href="https://fedevmoya.hashnode.dev/series/walletera" style="pointer-events: none">Walletera</a> series.</div>
</div>

In this article, we will see how to build a Messages Processor for the [DinoPay Gateway](https://walletera.dev/designing-a-digital-wallet#heading-a-fictitious-payment-service-provider-dinopay). Along the way, we will see how to work in an iterative and incremental way. We will also learn about the SOLID principles and how to apply them. Finally, we will explore the usage of generics to make our solution more strongly typed.

# Introduction

Recalling from [the previous article](https://fedevmoya.hashnode.dev/designing-a-digital-wallet), where we described the architecture of our digital wallet, the DinoPay Gateway is the service that handles the communication between the internal Payments service and the external DinoPay payments service provider. Let’s see the design diagram for the DinoPay Gateway to refresh our memories.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1689276769682/e215dd6d-27f6-412d-99ff-5692ac650dbc.png align="center")

From the diagram above, we see that the gateway will be processing messages from three different sources: the Payments Service, the DinoPay Payments Provider, and the Internal Messages Repository. For each of these sources, we will have an instance of a **Message Processor** that will process the incoming messages. The responsibilities of this component are:

* Consumption of messages from external sources. Examples of external sources may be message queues, databases, or HTTP requests.
    
* Deserialization of the messages payloads (the events) into the corresponding internal representation.
    
* Execution of specific event handlers.
    

Besides these basic tasks, there are other tasks that are required in order to build a production-grade message processor.

* Acknowledgment of the messages based on the response from the event handlers.
    
* Supporting a graceful shutdown. The message processor will be a stateful long-running process. This means that during shutdown there are a couple of tasks that need to be done. These tasks include:
    
    * Closing any underlying connection that may be held (connection to the message queue or the database).
        
    * Waiting for any inflight message to be processed.
        
* Collecting and sending metrics, such as the number of messages successfully processed, the number of processing errors, and the time that it takes to process each message.
    

In this article, we will cover how to implement the basic tasks in a SOLID (we will see later what this acronym means), strongly typed, and generic way. The other three requirements will be addressed in subsequent articles.

The approach that we will follow for building this message processor will be iterative and incremental. In the first iteration, we will focus on the basic setup needed to consume messages from a particular message queue, RabbitMQ. In the second iteration, we will add the logic needed to process Payments events. Then we will identify the weak points of our implementation and what options we have to improve it. In the subsequent iteration, we will refactor the code by applying the SOLID principles. Finally, in the last iteration, we will see how we can improve our solution even more by using generics and the visitor design pattern to make the code strongly typed.

For the message queue, we will use RabbitMQ. We picked it for no reason other than its popularity; we could have used any other solution, such as NATS or Amazon SQS/SNS. The implementation of the message processor will be agnostic of the message queue.

# Iteration 1: The basic stuff

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">Find the complete code for this section <a target="_blank" rel="noopener noreferrer nofollow" href="https://github.com/walletera/message-processor/tree/3597a43338d388e69a4284564a70e61691c481ff" style="pointer-events: none">here</a></div>
</div>

Let's start with the basic stuff needed to consume messages from RabbitMQ. The first component that we are going to write is a `RabbitMQMessageConsumer`. This component will implement the logic needed to connect to and consume messages from an AMQP server (AMQP is the core messaging protocol used by RabbitMQ). Let’s see what this component looks like.

```go
type RabbitMQMessageConsumer struct {
    conn        *amqp.Connection
    connChannel *amqp.Channel
}

func NewRabbitMQMessageConsumer() *RabbitMQMessageConsumer {
    return &RabbitMQMessageConsumer{}
}

func (r *RabbitMQMessageConsumer) Consume() (<-chan Message, error) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    r.conn = conn

    ch, err := conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("failed to open a channel: %w", err)
    }

    r.connChannel = ch

    q, err := ch.QueueDeclare(
        "message-consumer-queue", // name
        false,                    // durable
        false,                    // delete when unused
        false,                    // exclusive
        false,                    // no-wait
        nil,                      // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("failed to declare a queue: %w", err)
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        return nil, fmt.Errorf("failed to register a consumer: %w", err)
    }

    messagesCh := make(chan Message)
    go func() {
        defer close(messagesCh)
        for msg := range msgs {
            messagesCh <- Message{
                Payload: msg.Body,
            }
        }
    }()

    return messagesCh, nil
}

func (r *RabbitMQMessageConsumer) Close() error {
    err := r.connChannel.Close()
    if err != nil {
        return fmt.Errorf("failed to close rabbitmq connection channel: %w", err)
    }
    err = r.conn.Close()
    if err != nil {
        return fmt.Errorf("failed to close rabbitmq connection: %w", err)
    }
    return nil
}
```

We are not going to analyze this implementation in detail. I copied most of the code from the [RabbitMQ Go tutorial](https://www.rabbitmq.com/tutorials/tutorial-one-go.html), so you can read the details there if you are interested. The important part for us is the interface, which is composed of two methods.

The first method is `Consume()`. It receives no parameters and returns two values: a `<-chan Message` and an `error`. `Consume()` initiates the connection with the RabbitMQ server and then forwards every incoming message to the returned channel. If any error occurs while trying to connect to RabbitMQ, the returned channel will be nil and the second return value will contain the underlying error wrapped into a higher-level error.

The other method is `Close()`. It closes the AMQP channel and the underlying connection to the RabbitMQ server. If the attempt to close either of them fails, the method returns an error.

The `Message` type is a struct with a field of type `[]byte` called `Payload` that contains the raw event, i.e. the event in its transmission format, which in our case will be JSON.

```go
type Message struct {
   Payload []byte
}
```
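As a short aside on the `%w` verb used in `Consume()` and `Close()`: wrapping keeps the low-level error reachable, so a caller can still test for it with `errors.Is`. The sketch below is only an illustration. The `errConnectionRefused` value and the `consume` and `isConnectionRefused` functions are hypothetical stand-ins and are not part of the article's code.

```go
package main

import (
    "errors"
    "fmt"
)

// errConnectionRefused is a stand-in for whatever error amqp.Dial would return.
var errConnectionRefused = errors.New("connection refused")

// consume mimics the error path of Consume(): it wraps the low-level error with %w.
func consume() error {
    return fmt.Errorf("failed to connect to RabbitMQ: %w", errConnectionRefused)
}

// isConnectionRefused reports whether err wraps errConnectionRefused.
func isConnectionRefused(err error) bool {
    return errors.Is(err, errConnectionRefused)
}

func main() {
    err := consume()
    fmt.Println(err)                      // failed to connect to RabbitMQ: connection refused
    fmt.Println(isConnectionRefused(err)) // true
}
```

Had the error been formatted with `%s` or `%v` instead of `%w`, the message would look the same but `errors.Is` would report `false`.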

## The Message Processor skeleton

Now that we have a way of consuming messages from RabbitMQ, let’s write the first version of the `MessageProcessor`.

```go
type MessageProcessor struct {
   messageConsumer *RabbitMQMessageConsumer
}

func NewMessageProcessor() *MessageProcessor {
   return &MessageProcessor{
       messageConsumer: NewRabbitMQMessageConsumer(),
   }
}

func (p *MessageProcessor) Start() error {
   msgCh, err := p.messageConsumer.Consume()
   if err != nil {
       return fmt.Errorf("failed consuming from message consumer: %w", err)
   }
   go p.processMessages(msgCh)
   return nil
}

func (p *MessageProcessor) processMessages(ch <-chan Message) {
   for msg := range ch {
       go p.processMessage(msg)
   }
}

func (p *MessageProcessor) processMessage(msg Message) {
   log.Printf("processing message with payload %s", msg.Payload)
}
```

Our `MessageProcessor` is constructed using the `NewMessageProcessor()` function, which returns a pointer to a new instance of the `MessageProcessor` struct. The struct has a field called `messageConsumer`, which is a pointer to a `RabbitMQMessageConsumer`. This field is initialized using the `NewRabbitMQMessageConsumer()` function that we defined at the beginning of this iteration.

The `MessageProcessor` is a long-running process. It is initiated by the `Start()` method. In the `Start()` method, we first call the `Consume()` method of the `messageConsumer` to get the messages channel. If no error is returned, we then call the `processMessages` method, passing the messages channel as a parameter. Note that we are calling this method with the `go` keyword in front. This executes the method in a new goroutine, which allows the `Start()` method to return immediately and simplifies the implementation of the caller.

The `processMessages(ch <-chan Message)` method uses a `for` loop with a `range` clause to consume the messages from the `ch` channel. Ranging over the channel is a good practice because [we don’t need to check explicitly if the channel is still open before reading a value](https://go.dev/tour/concurrency/4): once the channel is closed and drained, the loop exits. For each incoming message, we call the `processMessage` method, passing the message as a parameter. To increase the throughput of our application, we process each message concurrently. To achieve this, we use the `go` keyword again to execute the `processMessage` method in a new goroutine.

Finally, the `processMessage` method implements the processing of a particular message. For now, the processing consists of printing the message. Throughout the article, we will see different implementations of this method.
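The loop-and-spawn pattern used by `processMessages` can be tried in isolation, without RabbitMQ. The following is a minimal sketch, not the article's implementation. The `processAll` function, the `sync.WaitGroup`, and the counter are assumptions added so that the demo can wait for the goroutines and report how many messages were handled.

```go
package main

import (
    "fmt"
    "sync"
)

// Message mirrors the struct defined earlier in the article.
type Message struct {
    Payload []byte
}

// processAll ranges over ch and handles each message in its own goroutine.
// It returns the number of handled messages once ch is closed and all
// goroutines have finished.
func processAll(ch <-chan Message) int {
    var (
        wg      sync.WaitGroup
        mu      sync.Mutex
        handled int
    )
    for msg := range ch { // the loop ends once ch is closed and drained
        wg.Add(1)
        go func(m Message) {
            defer wg.Done()
            fmt.Printf("processing message with payload %s\n", m.Payload)
            mu.Lock()
            handled++
            mu.Unlock()
        }(msg)
    }
    wg.Wait()
    return handled
}

func main() {
    ch := make(chan Message)
    go func() {
        defer close(ch) // closing the channel is what stops the range loop
        for i := 1; i <= 3; i++ {
            ch <- Message{Payload: []byte(fmt.Sprintf("Message #%d", i))}
        }
    }()
    fmt.Println("handled:", processAll(ch))
}
```

Because every message runs in its own goroutine, the "processing message" lines may appear in any order. Only the final count is deterministic.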

## The main function

Let's write a very simple main function to run our `MessageProcessor`. In this implementation, we create an instance of the `MessageProcessor` using the constructor function `NewMessageProcessor()` and then start it using the `Start()` method. At the bottom of the function, we block the main goroutine to keep our application alive.

```go
func main() {

   processor := NewMessageProcessor()

   err := processor.Start()
   if err != nil {
       log.Fatalf("failed to start message processor: %s", err.Error())
   }

   blockForeverCh := make(chan any)
   <-blockForeverCh
}
```

## Exercising our application

Let’s see if what we did so far works. To do this we will start a RabbitMQ server and run our application. The application has hardcoded parameters (you can see those parameters in the RabbitMQMessageConsumer component) that will allow it to connect to a local RabbitMQ instance run with the default configuration. Then we will send some messages to RabbitMQ. Finally, we will inspect the output printed by the application to see if the messages were handled as expected.

Let’s start by launching the RabbitMQ instance. The easiest way to do this is using Docker, so we don’t need to install RabbitMQ on our machine. We just write a very simple docker-compose.yml file and run it; Docker will download the RabbitMQ image and start it. Let’s see how we can do that.

First, we write the docker-compose.yml file.

```yaml
version: "3"

services:
 rabbitmq:
   image: rabbitmq:3.8.0-management
   container_name: rabbitmq
   ports:
     - "5672:5672"
```

With this simple YAML file, we are telling Docker which RabbitMQ image we want to use and that we want to publish port 5672 of the server running inside the container on the host. This way our app can connect to RabbitMQ using the localhost IP, 127.0.0.1.  
Now we can start our RabbitMQ instance by executing the following command in the same directory where we placed the docker-compose.yml file.

```bash
docker-compose up -d
```

The next step will be compiling and running our application.

```bash
go build -o message_processor
./message_processor &> message_processor_output.txt &
MESSAGE_PROCESSOR_PID=$!
```

Execute this in the same directory where you placed the Go code. The first line compiles the application into a binary named `message_processor`. The second line runs the binary in the background and redirects its output (both stdout and stderr) to the file `message_processor_output.txt`. The third line saves the process ID of our app in the `MESSAGE_PROCESSOR_PID` shell variable, which will allow us to kill the app when we finish the test. Executing the app this way will make it easier to put all the commands together in a bash script later.

Now we will send some messages to RabbitMQ.

```bash
for i in {1..10} ; do
   message="Message #$i"
   echo "Publishing message $message"
   docker-compose exec rabbitmq rabbitmqadmin publish routing_key="message-consumer-queue" payload="$message"
done
```

Here we are using `docker-compose exec` to execute the `rabbitmqadmin` command inside the container. This command allows us to publish messages from the command line. We execute it 10 times to send 10 messages.  
Let’s see what was printed to the `message_processor_output.txt` file.

```text
2023/03/08 18:37:21 processing message with payload Message #1
2023/03/08 18:37:22 processing message with payload Message #2
2023/03/08 18:37:23 processing message with payload Message #3
2023/03/08 18:37:24 processing message with payload Message #4
2023/03/08 18:37:24 processing message with payload Message #5
2023/03/08 18:37:25 processing message with payload Message #6
2023/03/08 18:37:26 processing message with payload Message #7
2023/03/08 18:37:27 processing message with payload Message #8
2023/03/08 18:37:27 processing message with payload Message #9
2023/03/08 18:37:28 processing message with payload Message #10
```

The final step is killing our application and shutting down the RabbitMQ.

```bash
kill -n 9 $MESSAGE_PROCESSOR_PID
docker-compose down
```

## Summary

In this first iteration, we wrote the skeleton of a `MessageProcessor` that uses a `RabbitMQMessageConsumer` instance to connect to and consume messages from the message queue. We also exercised the application with some messages and saw that it was able to handle them as expected.

In the next iteration, we will modify our `MessageProcessor` to handle one of the events emitted by the Payments service, the `WithdrawalCreated` event.

# Iteration 2: Handling Payments events

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">Find the complete code for this section <a target="_blank" rel="noopener noreferrer nofollow" href="https://github.com/walletera/message-processor/tree/a094858d688d17585e3bb114404e40aceef59bad" style="pointer-events: none">here</a></div>
</div>

In this iteration, we will modify our `MessageProcessor` to handle Payments events. We will write a component that will be in charge of the deserialization process. We will also write a handler that will handle the deserialized events.

For the moment we will only handle `WithdrawalCreated` events. Let’s look at an example of this event.

```json
{
   "type": "WithdrawalCreated",
   "data": {
      "withdrawal_id": "e728f3a7-b92f-46fe-b080-524442065cb3",
      "amount": 100,
      "source_account": "source account details",
      "destination_account": "destination account details"
   }
}
```

## Deserialization of events

The first component that we need to implement is a deserializer, i.e. the component that takes the raw event payload in JSON format and outputs a Go struct representing that event. The deserializer relies on two structs: an `EventEnvelope`, which wraps every event, and a `WithdrawalCreated`, which represents the data of a `WithdrawalCreated` event.
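The `Deserialize` method relies on two types, `EventEnvelope` and `WithdrawalCreated`. A minimal sketch of them follows, inferred from the JSON example above and from the field names that appear in the application's log output later in this iteration. The JSON tags, the `float64` type chosen for `Amount`, and the `decodeWithdrawalCreated` helper are assumptions made for illustration; the definitions in the repository may differ.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// EventEnvelope is the outer object: a type discriminator plus the
// still-encoded event data.
type EventEnvelope struct {
    Type string          `json:"type"`
    Data json.RawMessage `json:"data"`
}

// WithdrawalCreated holds the decoded "data" object of the event.
type WithdrawalCreated struct {
    WithdrawalId       string  `json:"withdrawal_id"`
    Amount             float64 `json:"amount"` // assumed numeric type
    SourceAccount      string  `json:"source_account"`
    DestinationAccount string  `json:"destination_account"`
}

// decodeWithdrawalCreated is a hypothetical helper that decodes in two steps:
// first the envelope, then the data it carries.
func decodeWithdrawalCreated(raw []byte) (WithdrawalCreated, error) {
    var envelope EventEnvelope
    if err := json.Unmarshal(raw, &envelope); err != nil {
        return WithdrawalCreated{}, fmt.Errorf("decoding envelope: %w", err)
    }
    if envelope.Type != "WithdrawalCreated" {
        return WithdrawalCreated{}, fmt.Errorf("unexpected event type: %s", envelope.Type)
    }
    var event WithdrawalCreated
    if err := json.Unmarshal(envelope.Data, &event); err != nil {
        return WithdrawalCreated{}, fmt.Errorf("decoding data: %w", err)
    }
    return event, nil
}

func main() {
    raw := []byte(`{"type":"WithdrawalCreated","data":{"withdrawal_id":"e728f3a7-b92f-46fe-b080-524442065cb3","amount":100,"source_account":"source account details","destination_account":"destination account details"}}`)
    event, err := decodeWithdrawalCreated(raw)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", event)
}
```

With these types in place, the deserializer looks as follows.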

```go
type PaymentsEventsDeserializer struct {
}

func NewPaymentsEventsDeserializer() *PaymentsEventsDeserializer {
   return &PaymentsEventsDeserializer{}
}

func (d *PaymentsEventsDeserializer) Deserialize(message Message) (any, error) {
   var event EventEnvelope
   err := json.Unmarshal(message.Payload, &event)
   if err != nil {
       return nil, fmt.Errorf("error deserializing message with payload %s: %w", message.Payload, err)
   }
   switch event.Type {
   case "WithdrawalCreated":
       var withdrawalCreated WithdrawalCreated
       err := json.Unmarshal(event.Data, &withdrawalCreated)
       if err != nil {
           // Propagate the failure instead of returning a half-populated event.
           return nil, fmt.Errorf("error deserializing WithdrawalCreated event data %s: %w", event.Data, err)
       }
       return withdrawalCreated, nil
   default:
       // Report unknown event types to the caller as errors.
       return nil, fmt.Errorf("unexpected event type: %s", event.Type)
   }
}
```

The deserialization logic is implemented in the `Deserialize(message Message)` method. It receives a message as a parameter and returns two values: the first is the deserialized event and the second contains any error that may occur during the deserialization process. The `Deserialize` method first unmarshals the message payload into an instance of `EventEnvelope`. Note that the `Data` field of the `EventEnvelope` struct is of type `json.RawMessage`. Using this type tells the `json.Unmarshal` function that we don’t want to deserialize that field (yet), so it leaves it as a slice of bytes (`json.RawMessage` is a defined type whose underlying type is `[]byte`).

Once we have the event envelope deserialized, we use the `Type` field to decide how we are going to deserialize the data object. For now, we are only processing events of type `WithdrawalCreated`, so our switch only contains one case for this type of event.

Note that the type of the first return value is `any`. This is an alias for the type `interface{}` that was introduced in Go 1.18. Similar to the `Object` type in Java or the void pointer in C, it denotes the fact that the `Deserialize` method will return values of different types. We need to return this type because each payment event will have a different set of attributes, so there is no common interface (type) between them.

## Handling the events

Once we have the event represented in a Go struct, we can process it. This processing will occur in specific handlers, so we need a way to call the correct handler for each event. This “routing” logic will be implemented in a component that we are going to call `PaymentsEventsHandler`.

```go
type PaymentsEventsHandler struct {
}

func NewPaymentsEventsHandler() *PaymentsEventsHandler {
   return &PaymentsEventsHandler{}
}

func (h *PaymentsEventsHandler) HandleEvent(event any) error {
   switch event.(type) {
   case WithdrawalCreated:
       log.Printf("handling WithdrawalCreated event: %+v", event)
   default:
       return fmt.Errorf("unexpected event type %T", event)
   }
   return nil
}
```

To route the event to the specific handler, we are using a type switch, a construct that permits several type assertions in series. For simplicity, we are not calling any real handler; we are just printing a message containing the event struct.
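To illustrate how the type switch could dispatch to a real handler, here is a minimal sketch. It binds the asserted value to a variable (`e`) so that the handler receives a `WithdrawalCreated` rather than an `any`. The `handleWithdrawalCreated` and `routeEvent` functions and the trimmed-down `WithdrawalCreated` struct are hypothetical and only serve the example.

```go
package main

import "fmt"

// WithdrawalCreated is a trimmed-down version of the event, for the demo only.
type WithdrawalCreated struct {
    WithdrawalId string
    Amount       float64
}

// handleWithdrawalCreated is a hypothetical event-specific handler.
func handleWithdrawalCreated(e WithdrawalCreated) error {
    fmt.Printf("creating payment for withdrawal %s\n", e.WithdrawalId)
    return nil
}

// routeEvent picks the handler that matches the dynamic type of event.
func routeEvent(event any) error {
    switch e := event.(type) {
    case WithdrawalCreated:
        // Inside this case, e has the concrete type WithdrawalCreated.
        return handleWithdrawalCreated(e)
    default:
        // %T prints the dynamic type of the value, e.g. "string".
        return fmt.Errorf("unexpected event type %T", event)
    }
}

func main() {
    fmt.Println(routeEvent(WithdrawalCreated{WithdrawalId: "w-1", Amount: 100}))
    fmt.Println(routeEvent("not an event"))
}
```

Note that every new event type requires a new `case` in this switch, and nothing stops a caller from passing a value that no case covers; the error only shows up at runtime.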

## The new implementation of the Message Processor

Let’s see what the new implementation of the MessageProcessor looks like.

```go
type MessageProcessor struct {
   messageConsumer            *RabbitMQMessageConsumer
   paymentsEventsDeserializer *PaymentsEventsDeserializer
   paymentsEventsHandler      *PaymentsEventsHandler
}

func NewMessageProcessor() *MessageProcessor {
   return &MessageProcessor{
       messageConsumer:            NewRabbitMQMessageConsumer(),
       paymentsEventsDeserializer: NewPaymentsEventsDeserializer(),
       paymentsEventsHandler:      NewPaymentsEventsHandler(),
   }
}

// ...

func (p *MessageProcessor) processMessage(message Message) {
   // Turn the raw payload into a typed event.
   event, err := p.paymentsEventsDeserializer.Deserialize(message)
   if err != nil {
       p.printErrorLog(message, err)
       return
   }
   // Route the event to its handler.
   err = p.paymentsEventsHandler.HandleEvent(event)
   if err != nil {
       p.printErrorLog(message, err)
   }
}
```

The `MessageProcessor` struct now contains two additional fields: a `paymentsEventsDeserializer` of type `*PaymentsEventsDeserializer` and a `paymentsEventsHandler` of type `*PaymentsEventsHandler`. These fields are initialized in the constructor function the same way we initialized the `messageConsumer` field.

The `processMessage` method first uses the instance of the `PaymentsEventsDeserializer` to deserialize the message into an event. Then it passes the event to the `PaymentsEventsHandler` instance. This simple implementation will allow us to process events coming from the Payments service.

## Exercising the application with the new changes

Before exercising our app with our bash script, we need to change the payload we are sending to RabbitMQ. The code that sends the messages to RabbitMQ now looks like this.

```bash
print_message "Sending messages to rabbitmq"
for i in {1..10} ; do
   echo "Publishing WithdrawalCreated event with amount $i"
   docker-compose exec rabbitmq rabbitmqadmin publish routing_key="message-consumer-queue" payload="
     {
     \"type\": \"WithdrawalCreated\",
     \"data\": {
      \"withdrawal_id\": \"e728f3a7-b92f-46fe-b080-524442065cb3\",
      \"amount\": $i,
      \"source_account\": \"source account details\",
      \"destination_account\": \"destination account details\"
     }
    }
   "
done
```

This is the output that you should see if you run the bash script.

```text
2023/03/31 17:00:58 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:1 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:00:59 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:2 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:00 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:3 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:00 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:4 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:01 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:5 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:02 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:6 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:02 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:7 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:03 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:8 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:04 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:9 SourceAccount:source account details DestinationAccount:destination account details}
2023/03/31 17:01:04 handling WithdrawalCreated event: {WithdrawalId:e728f3a7-b92f-46fe-b080-524442065cb3 Amount:10 SourceAccount:source account details DestinationAccount:destination account details}
```

## Summary

Now our `MessageProcessor` can process Payments events of type `WithdrawalCreated`. We added to the `processMessage` method the ability to deserialize an event using the `PaymentsEventsDeserializer` and process it using the `PaymentsEventsHandler`. While this implementation works, it has some issues that will make it hard to maintain. In the next section, we will discuss what those issues are, why they may hurt the maintainability of our application, and what design principles we can follow to improve our implementation.

# Problems with our current implementation

Let's pause the implementation and think a bit about whether we are following good software design practices. The topic of software design includes several concepts, like object-oriented design, design patterns, domain-driven design, test-driven development, etc. In this case, we are going to follow a set of principles known by the acronym SOLID. These principles will help us identify the weaknesses of our current implementation and will give us some ideas on how to improve it.

## SOLID design

From [Wikipedia](https://en.wikipedia.org/wiki/SOLID):

> *In* [*software engineering*](https://en.wikipedia.org/wiki/Software_engineering)*, SOLID is a* [*mnemonic*](https://en.wikipedia.org/wiki/Mnemonic) [*acronym*](https://en.wikipedia.org/wiki/Acronym) *for five design principles intended to make* [*object-oriented*](https://en.wikipedia.org/wiki/Object-oriented) *designs more understandable, flexible, and* [*maintainable*](https://en.wikipedia.org/wiki/Software_maintenance)*. The principles are a subset of many principles promoted by American software engineer and instructor* [*Robert C. Martin*](https://en.wikipedia.org/wiki/Robert_C._Martin) *first introduced in his 2000 paper Design Principles and Design Patterns discussing* [*software rot*](https://en.wikipedia.org/wiki/Software_rot)*.*
> 
> *The SOLID ideas are*
> 
> * *The* [*Single-responsibility principle*](https://en.wikipedia.org/wiki/Single-responsibility_principle)*: "There should never be more than one reason for a* [*class*](https://en.wikipedia.org/wiki/Class_(computer_programming)) *to change." In other words, every class should have only one responsibility.*
>     
> * *The* [*Open–closed principle*](https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle)*: "Software entities should be open for extension, but closed for modification."*
>     
> * *The* [*Liskov substitution principle*](https://en.wikipedia.org/wiki/Liskov_substitution_principle)*: "Functions that use pointers or references to base classes must be able to use objects of derived classes without knowing it." See also* [*design by contract*](https://en.wikipedia.org/wiki/Design_by_contract)
>     
> * *The* [*Interface segregation principle*](https://en.wikipedia.org/wiki/Interface_segregation_principle)*: "Clients should not be forced to depend upon interfaces that they do not use."*
>     
> * *The* [*Dependency inversion principle*](https://en.wikipedia.org/wiki/Dependency_inversion_principle)*: "Depend upon abstractions, \[not\] concretions."*
>     

## The identified issues

The `MessageProcessor` depends on concrete implementations. These are the `RabbitMQMessageConsumer`, the `PaymentsEventsDeserializer`, and the `PaymentsEventsHandler`. This violates the fifth SOLID principle, the [**D**ependency Inversion Principle](https://stackify.com/dependency-inversion-principle/), which states that

* *High-level modules should not import anything from low-level modules. Both should depend on abstractions (e.g., interfaces).*
    
* *Abstractions should not depend on details. Details (concrete implementations) should depend on abstractions.*
    

Violating this software design principle has some consequences. We will end up with code that is hard to maintain, hard to test, and prone to duplication. Let’s see why.

### Maintainability issues

Changes in the low-level modules will impact the `MessageProcessor`. For example, if we decide to use a different message queue (Kafka instead of RabbitMQ, say) to communicate Payments events, we will have to change the `MessageProcessor` implementation to use a different message consumer. The same goes for the serialization format: if we decide to use a format other than JSON, like [MessagePack](https://msgpack.org/index.html) or [Internet Object](https://docs.internetobject.org/), we will also have to modify the `MessageProcessor`. This means that we will find ourselves changing the `MessageProcessor` not because the processing logic has changed but because there was a change in some of the low-level modules on which it depends.

### Testability issues

There is a set of principles, known by the acronym FIRST (yet another acronym), that defines what is considered an effective unit test. According to these principles, a unit test should be Fast, Isolated, Repeatable, Self-Verifying, and Timely. Let's focus on the first two principles (for a detailed explanation of these principles you can read [this article](https://medium.com/pragmatic-programmers/unit-tests-are-first-fast-isolated-repeatable-self-verifying-and-timely-a83e8070698e)). Fast means that our tests should be as fast as possible. The faster your tests run, the more often you’ll run them, and we want to be able to run our tests all the time, ideally after each modification to our `MessageProcessor`. Remember that the earlier we find bugs, the cheaper it is to fix them. Isolated means that each unit test should have a single reason to fail.

Because the `MessageProcessor` depends on a concrete implementation of a message consumer, the `RabbitMQMessageConsumer`, we will only be able to test it if we have a running instance of RabbitMQ. The same will occur with the concrete implementation of the handlers. If the handlers interact with an external service via HTTP, for example (the handler of the `WithdrawalCreated` event will, at some point, need to interact with the DinoPay API to create a payment), we will need to have at least a mocked server of the DinoPay API running to be able to execute our tests. Having to run external services every time we want to run our tests will make the tests slow, which is exactly the opposite of the first principle described above. Moreover, any test that we write will execute not only the `MessageProcessor` code but also the code of the concrete implementations on which it depends. Any error in those other components will make our test fail. This means that the test is not isolated, which violates the second principle.

Our current implementation will not allow us to implement proper unit tests, at least unit tests that comply with the FIRST principles.

### Reusability issues

We will not be able to reuse our `MessageProcessor` to process messages from other sources, like DinoPay, because the events deserializer is specific to Payments events. This implies that we will have to implement a different processor for each event source, duplicating the core logic of the processor.

## Summary

In this section, we introduced a set of design principles called SOLID that, if followed, allow us to write more readable, flexible, and maintainable code. We saw that our current implementation of the `MessageProcessor` violates some of these principles. We also discussed the different issues that we may encounter if we don’t follow those principles. In the next section, we will see how we can refactor our code to align it with the SOLID principles.

# Iteration 3: Refactoring

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">Find the complete code for this section <a target="_blank" rel="noopener noreferrer nofollow" href="https://github.com/walletera/message-processor/tree/a971aa395f0e2d9bce7f11cfe8f99beb5ac69cd3" style="pointer-events: none">here</a></div>
</div>

In this iteration, we are going to address the issues mentioned in the previous section. We will make our MessageProcessor independent from the concrete implementations of the low-level modules through the introduction of interfaces.

These changes will make the MessageProcessor much more testable so we will see how we can write a unit test using mocked implementations of the new interfaces.

## Substituting the concrete implementations with interfaces

Let’s define one interface for each of the concrete components on which the `MessageProcessor` depends.

```go
type MessageConsumer interface {
   Consume() (<-chan Message, error)
   Close() error
}
```

The `MessageConsumer` interface represents a component that consumes messages from some source and communicates those messages via a channel. Note that this interface is abstract enough. It can be implemented to consume messages from a RabbitMQ server (the `RabbitMQMessageConsumer` that we already implemented) or it can be implemented by a component that receives HTTP requests (an HTTP server). In the latter case, each incoming request will be transformed into a `Message` and will be sent over the Messages channel.

```go
type EventsDeserializer interface {
   Deserialize(message Message) (any, error)
}
```

The `EventsDeserializer` interface represents any component that knows how to deserialize certain types of events. We will have one implementation for each group of events. We already implemented the `PaymentsEventsDeserializer`. For DinoPay events, we will implement the `DinoPayEventsDeserializer` and for the internal events, we will write an `InternalEventsDeserializer`.

```go
type EventsHandler interface {
   HandleEvent(event any) error
}
```

Finally, we defined the `EventsHandler` interface. Similar to the `EventsDeserializer`, this interface represents components that know how to handle events.

Now that we have our abstractions let’s look at the new version of our `MessageProcessor`.

```go
type MessageProcessor struct {
   messageConsumer    MessageConsumer
   eventsDeserializer EventsDeserializer
   eventsHandler      EventsHandler
}

func NewMessageProcessor(
   messageConsumer MessageConsumer,
   eventsDeserializer EventsDeserializer,
   eventsHandler EventsHandler,
) *MessageProcessor {
   return &MessageProcessor{
       messageConsumer:    messageConsumer,
       eventsDeserializer: eventsDeserializer,
       eventsHandler:      eventsHandler,
   }
}

// …

func (p *MessageProcessor) processMessage(message Message) {
   event, err := p.eventsDeserializer.Deserialize(message)
   if err != nil {
       p.printErrorLog(message, err)
       // Nothing to handle if the message could not be deserialized.
       return
   }
   if event != nil {
       err = p.eventsHandler.HandleEvent(event)
       if err != nil {
           p.printErrorLog(message, err)
       }
   }
}

func (p *MessageProcessor) printErrorLog(message Message, err error) {
   log.Printf("error processing message with payload: %s: %s", message.Payload, err.Error())
}
```

Now the `MessageProcessor` depends on abstractions. The same code can be used to process messages coming from different sources (the internal Payments service or an external PSP like DinoPay), using different transport technologies (amqp to communicate with RabbitMQ or HTTP to receive webhooks) and different serialization formats (JSON or MessagePack).

Another benefit of this refactor is that now our `MessageProcessor` can be tested in isolation, which is one of the desired characteristics of a unit test. Let’s see how to write a unit test for this component.

## Writing a unit test for the Message Processor

One of the desired properties of a good unit test is isolation. The test should only execute the code of the component that is being tested. Ideally, no other code should be executed. Writing the test this way has several benefits:

* The test will be repeatable, which means that no matter how many times we execute it, it will always produce the same result
    
* Maintainable, which means that any refactor done on any of the dependencies of the unit we are testing, as long as the interfaces remain the same, will not affect our test
    
* Fast, because we are not accessing any external system (like the message bus or a database)
    

To achieve the desired level of isolation we will not use the real implementations of the interfaces on which our `MessageProcessor` depends. Instead, we will use fake implementations called mocks. If you are not familiar with the concept of a mock, [here](https://dev.to/salesforceeng/mocks-in-go-tests-with-testify-mock-6pd) you can find a good article explaining how to test using mocks in Go.

For testing our MessageProcessor we need three mocks, one for each of its dependencies. We can write the mocks ourselves and that would be totally fine. But, there is a great tool called [mockery](https://github.com/vektra/mockery) that can generate the mocks for us, so we will use it to save some time.

To generate the mocks we just run the following command in the same directory where we have our code.

```bash
mockery
```

After running the mockery command, we see three new files in our project

* mock\_message\_consumer.go
    
* mock\_events\_deserializer.go
    
* mock\_events\_handler.go
    

Each file contains a mocked implementation of one of the three interfaces on which the `MessageProcessor` depends.

Now that we have our mocks let’s see how to write a unit test. The behavior that we want to test is the following:

**Given** a `Message` with a valid payload  
**When** the `MessageProcessor` receives the message from the `MessageConsumer`  
**Then** the `MessageProcessor` must call the `EventsDeserializer.Deserialize` method with the `Message` as a parameter  
**And** the `MessageProcessor` must call the `EventsHandler.HandleEvent` method with the event returned by the `Deserialize` method

Note that what we just wrote is a so-called acceptance criterion. The test that we will write must validate that the acceptance criteria are met by the MessageProcessor.

We will start with the mocks initialization. The first mock that we are going to initialize is the MessageConsumer mock.

```go
messagesCh := make(chan Message)
messageConsumerMock := &MockMessageConsumer{}
messageConsumerMock.On("Consume").Return((<-chan Message)(messagesCh), nil)
```

Note that besides initializing the mock object we are also defining an expectation. An expectation tells the mock that it must expect a call on the `Consume` method, in this case without any parameters. When the mock receives that call it must return `messagesCh` (initialized in the first line) as the first return value and `nil` as the second return value. The `messagesCh` channel will allow us to trigger the test case. This expectation will be verified at the end of the test by calling the `AssertExpectations(t)` method on the mock.  
Let’s continue with the next mock, the `EventsDeserializer` mock.

```go
message := Message{
   Payload: []byte{},
}

event := WithdrawalCreated{
   WithdrawalId:       "19cf3c4c-4a0d-417e-b0cc-83385b9487de",
   Amount:             100,
   SourceAccount:      "source account details",
   DestinationAccount: "destination account details",
}

eventsDeserializerMock := &MockEventsDeserializer{}
eventsDeserializerMock.On("Deserialize", message).Return(event, nil)
```

In this case, we tell the mock that it must expect a call on the `Deserialize` method with the indicated `message` as a parameter. If that call is received it must return the `event` indicated as the first return value and `nil` as the second return value.  
The third mock that we need to configure is the `EventsHandler` mock. This mock must expect a call on the `HandleEvent` method with the `event` that we defined before, the one returned by the `eventsDeserializerMock`, as a parameter. This call will return `nil`. We also configure the mock to execute some code when it receives this call: it calls the `Done()` method on a `WaitGroup` instance that we previously defined. A `WaitGroup` is the idiomatic way in Go to wait for goroutines to finish their work. Given that the `MessageProcessor` executes the handler’s `HandleEvent` method asynchronously inside a goroutine, we need a way to make the main goroutine (the one in which the test runs) wait for the other goroutines. Otherwise, the test could reach its end before the spawned goroutines finish, and it would fail because the `HandleEvent` method had not been called yet.

```go
wg := sync.WaitGroup{}
wg.Add(1)
eventsHandlerMock := &MockEventsHandler{}
eventsHandlerMock.On("HandleEvent", event).Return(nil).Run(func(args mock.Arguments) {
   wg.Done()
})
```

Now that we have all the mocks initialized and configured we can initialize and start the MessageProcessor.

```go
messageProcessor := NewMessageProcessor(messageConsumerMock, eventsDeserializerMock, eventsHandlerMock)

messageProcessorStartError := messageProcessor.Start()
require.NoError(t, messageProcessorStartError)
```

After starting the `messageProcessor` instance we verify that it starts without errors. If any error occurs we fail the test immediately. This behavior of failing without executing the rest of the test is achieved by using the methods from the [github.com/stretchr/testify/require](http://github.com/stretchr/testify/require) package.

The next step is triggering the processing of one message. We do so by sending the message that we initialized at the beginning of the test over the `messagesCh` channel, which we configured to be returned by the `messageConsumerMock`.

```go
messagesCh <- message
```

Finally, we wait on the `WaitGroup` until the handler has been called, and then we check that all the expected method calls were actually made by using the `AssertExpectations` method on each mock.

```go
// Block until the mocked HandleEvent has called wg.Done().
wg.Wait()

messageConsumerMock.AssertExpectations(t)
eventsDeserializerMock.AssertExpectations(t)
eventsHandlerMock.AssertExpectations(t)
```

## Summary

In this section, we refactored the code written in the previous section. Following the Dependency Inversion principle we substituted the dependencies on concrete implementations used in the `MessageProcessor` with dependencies on interfaces. This change allowed us to implement a unit test that complies with the FIRST principles mentioned in the previous section.  
In the next and last section of this blog post, we are going to iterate one more time over our code. This time we will change the code to make it more strongly typed and this way less error-prone and easier to maintain.

# Iteration 4: Introducing generics

<div data-node-type="callout">
<div data-node-type="callout-emoji">💡</div>
<div data-node-type="callout-text">Find the complete code for this section <a target="_blank" rel="noopener noreferrer nofollow" href="https://github.com/walletera/message-processor/tree/4a5eab182edf3d5c22cfe284736a4dc56655403d" style="pointer-events: none">here</a></div>
</div>

Let’s take a look one more time at the code implemented in the 2nd iteration to handle Payment events.

```go
func (h *PaymentsEventsHandler) HandleEvent(event any) error {
   switch event.(type) {
   case WithdrawalCreated:
       log.Printf("handling WithdrawalCreated event: %+v", event)
   default:
       // %T prints the dynamic type of the event (%t is the boolean verb).
       return fmt.Errorf("unexpected event type %T", event)
   }
   return nil
}
```

The `HandleEvent(event any)` method uses a type switch to determine, at runtime, the type of the event that needs to be handled. If the type is identified, the event is routed to the specific handler, but if the type is not in the list of expected types, an error is returned at runtime. Now let’s think about what happens if we want to process a new Payments event. We must add this new event to the `PaymentsEventsDeserializer` component and also to the `PaymentsEventsHandler`. What happens if we forget to add the event to the `PaymentsEventsHandler`? We will get a runtime error. If we don’t have enough tests to catch this bug in the development or testing stage, we can end up with an “unexpected event type …” error in production. If, instead, we build the entire processing pipeline strongly typed, this error will be caught in the development stage by the compiler, reducing the risk of a runtime error occurring in production. Even if we have enough tests to prevent this error from reaching production, it is better, in terms of development speed, to detect this kind of error as early as possible, and that is at compile time.
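The following sketch reproduces the problem in isolation. `DepositCreated` is a hypothetical new event invented for this example, and `handleEvent` is a simplified stand-in for the `PaymentsEventsHandler`. The program compiles without complaints even though nobody handles the new event.

```go
package main

import "fmt"

type WithdrawalCreated struct {
	WithdrawalId string
}

// DepositCreated is a hypothetical new event that was added to the
// deserializer but forgotten in the handler.
type DepositCreated struct {
	DepositId string
}

// handleEvent mimics the type switch used by the PaymentsEventsHandler.
func handleEvent(event any) error {
	switch event.(type) {
	case WithdrawalCreated:
		return nil
	default:
		return fmt.Errorf("unexpected event type %T", event)
	}
}

func main() {
	fmt.Println(handleEvent(WithdrawalCreated{WithdrawalId: "w-1"})) // prints: <nil>

	// This compiles fine; the mistake only shows up when the code runs.
	fmt.Println(handleEvent(DepositCreated{DepositId: "d-1"})) // prints: unexpected event type main.DepositCreated
}
```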

## Generics to the rescue

So, how can we remove the usage of the `any` type and the type assertions while still keeping our `MessageProcessor` reusable (independent of any specific events group or source)? Before Go 1.18 there was no easy way, but that changed with the introduction of generics. So let’s see how we can use this feature of the language to make our processing pipeline more strongly typed.

Let’s start by defining a common interface for all the events that will be processed by the `MessageProcessor`.

```go
type Event[Visitor any] interface {
   Accept(visitor Visitor) error
}
```

The `Event` interface has a type parameter (a feature introduced in Go 1.18) called `Visitor` that can be of any type. It declares a method called `Accept` that receives an object of type `Visitor` and returns an error. This may not make too much sense now, but please bear with me.

Then we are going to write a Visitor interface for each of the event groups that our service has to process. One group is the Payments events. Another group will be the DinoPay events. For the Payments events group, the interface will be the following.

```go
type PaymentsEventsVisitor interface {
   VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}
```

The interface will have one method for each event. In the implementation of this interface, we will perform the actual handling of each event (by delegating to specific handlers). For now, let’s write a dummy implementation that will log the event the same way we did with the `PaymentsEventsHandler` before.

```go
type PaymentsEventsVisitorImpl struct {
}

func NewPaymentsEventsVisitorImpl() *PaymentsEventsVisitorImpl {
    return &PaymentsEventsVisitorImpl{}
}

func (p PaymentsEventsVisitorImpl) VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error {
    log.Printf("handling WithdrawalCreated event: %+v", withdrawalCreated)
    return nil
}
```

Now let’s modify the implementation of the `WithdrawalCreated` event to make it implement the `Event` interface.

```go
type WithdrawalCreated struct {
   WithdrawalId       string  `json:"withdrawal_id"`
   Amount             float64 `json:"amount"`
   SourceAccount      string  `json:"source_account"`
   DestinationAccount string  `json:"destination_account"`
}

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
   return visitor.VisitWithdrawalCreated(w)
}
```

Now the `Event` interface starts making a bit more sense, doesn’t it? The idea is that every event, in order to be processed by the `MessageProcessor`, must implement the `Event` interface, which means implementing the `Accept` method. The implementation of `Accept` is pretty straightforward: it has to call the corresponding method on the visitor object received as a parameter. This mechanism is called double dispatch and we took it from a design pattern called Visitor (for more information on this pattern, you can check out this great [blog post](https://refactoring.guru/design-patterns/visitor) by Alexander Shvets). With this technique the `MessageProcessor`, instead of calling the visitor method directly, which would imply performing a type switch on the event type, just calls the `Accept` method on the event, passing the visitor to it. The event knows which method it has to call on the visitor. This way we keep our `MessageProcessor` agnostic of the specific event it is processing and, at the same time, we avoid type assertions at runtime which, as we already mentioned, are harder to maintain.
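To see the double dispatch in isolation, here is a small self-contained sketch. The `recordingVisitor` type and the `dispatch` function are hypothetical: the first is a toy visitor that only records what it visited, and the second plays the role of the `MessageProcessor` by calling `Accept` without knowing the concrete event type.

```go
package main

import "fmt"

type Event[Visitor any] interface {
	Accept(visitor Visitor) error
}

type WithdrawalCreated struct {
	WithdrawalId string
}

type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

// Accept is the second dispatch: the event picks the visitor method.
func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

// recordingVisitor is a hypothetical visitor used only for this example.
type recordingVisitor struct {
	visited []string
}

func (r *recordingVisitor) VisitWithdrawalCreated(e WithdrawalCreated) error {
	r.visited = append(r.visited, "WithdrawalCreated:"+e.WithdrawalId)
	return nil
}

// dispatch stands in for the MessageProcessor. It is the first dispatch:
// it calls Accept without any knowledge of the concrete event type.
func dispatch[Visitor any](event Event[Visitor], visitor Visitor) error {
	return event.Accept(visitor)
}

func main() {
	visitor := &recordingVisitor{}
	var event Event[PaymentsEventsVisitor] = WithdrawalCreated{WithdrawalId: "w-1"}

	err := dispatch[PaymentsEventsVisitor](event, visitor)
	fmt.Println(visitor.visited, err) // prints: [WithdrawalCreated:w-1] <nil>
}
```

Note that `dispatch` contains no type switch and no type assertion; the routing to `VisitWithdrawalCreated` is resolved through ordinary method calls that the compiler can check.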

The `EventsDeserializer` interface will declare the same type parameter as the `Event` interface.

```go
type EventsDeserializer[Visitor any] interface {
   Deserialize(message Message) (Event[Visitor], error)
}
```

Note that the first return value of the `Deserialize` method is not of type `any` anymore. Now it returns a properly typed object of type `Event[Visitor]`. The implementation of the `PaymentsEventsDeserializer` (formerly `PaymentsEventsDeserializerImpl`) will still look the same as before; the only difference is that the return type is now `Event[PaymentsEventsVisitor]`.

```go
func (r *PaymentsEventsDeserializer) Deserialize(message Message) (Event[PaymentsEventsVisitor], error) {
 // …
}
```

Finally, let’s see how the `MessageProcessor` looks now.

```go
type MessageProcessor[Visitor any] struct {
   messageConsumer    MessageConsumer
   eventsDeserializer EventsDeserializer[Visitor]
   eventsVisitor      Visitor
}

func NewMessageProcessor[Visitor any](
   messageConsumer MessageConsumer,
   eventsDeserializer EventsDeserializer[Visitor],
   eventsVisitor Visitor,
) *MessageProcessor[Visitor] {

   return &MessageProcessor[Visitor]{
       messageConsumer:    messageConsumer,
       eventsDeserializer: eventsDeserializer,
       eventsVisitor:      eventsVisitor,
   }
}

// …

func (p *MessageProcessor[Visitor]) processMsg(message Message) {
    // Deserialize takes the whole Message, as declared in EventsDeserializer.
    event, err := p.eventsDeserializer.Deserialize(message)
    if err != nil {
        p.printErrorLog(message, err)
        return
    }
    if event != nil {
        err = event.Accept(p.eventsVisitor)
        if err != nil {
            p.printErrorLog(message, err)
        }
    }
}

func (p *MessageProcessor[Visitor]) printErrorLog(message Message, err error) {
   log.Printf("error processing message with payload: %s: %s", message.Payload, err.Error())
}
```

The `MessageProcessor` struct is also parameterized with the type of an events visitor. Choosing a specific visitor when instantiating the `MessageProcessor` defines, at compile time, the set (or group) of events that the instance will accept.  
Note that the `EventsHandler` interface (along with its implementations, the `PaymentsEventsHandler` and the `MockEventsHandler`) was substituted by the `PaymentsEventsVisitor` interface (and its implementations).

To make this new concept of generics clear let’s see how the main function of our application looks now.

```go
func main() {

   messageConsumer := NewRabbitMQMessageConsumer()
   paymentsEventsDeserializer := NewPaymentsEventsDeserializer()
   paymentsEventsVisitor := NewPaymentsEventsVisitorImpl()

   processor := NewMessageProcessor[PaymentsEventsVisitor](
       messageConsumer,
       paymentsEventsDeserializer,
       paymentsEventsVisitor,
   )

   err := processor.Start()
   if err != nil {
       log.Fatalf("failed to start message processor: %s", err.Error())
   }

   blockForeverCh := make(chan any)
   <-blockForeverCh
}
```

The `NewMessageProcessor` function is called with the type parameter set to the interface `PaymentsEventsVisitor`. This makes the compiler verify that the second argument of the `NewMessageProcessor` constructor is of type `EventsDeserializer[PaymentsEventsVisitor]` and that the third argument’s type is `PaymentsEventsVisitor`. This way the whole processing pipeline is strongly typed.
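The same verification can be made explicit, independently of `main`, with compile-time assertions: blank variable declarations that only compile if a type satisfies an interface. This is a common Go idiom rather than something taken from the Walletera repository. In the sketch below the types are reduced to the minimum, and the body of `Deserialize` is a stub that does not perform any real decoding.

```go
package main

import "fmt"

type Message struct {
	Payload []byte
}

type Event[Visitor any] interface {
	Accept(visitor Visitor) error
}

type EventsDeserializer[Visitor any] interface {
	Deserialize(message Message) (Event[Visitor], error)
}

type WithdrawalCreated struct {
	WithdrawalId string
}

type PaymentsEventsVisitor interface {
	VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error
}

func (w WithdrawalCreated) Accept(visitor PaymentsEventsVisitor) error {
	return visitor.VisitWithdrawalCreated(w)
}

type PaymentsEventsDeserializer struct{}

// Deserialize is a stub for this example: it treats the payload as the
// withdrawal id instead of decoding a real serialization format.
func (d *PaymentsEventsDeserializer) Deserialize(message Message) (Event[PaymentsEventsVisitor], error) {
	return WithdrawalCreated{WithdrawalId: string(message.Payload)}, nil
}

type PaymentsEventsVisitorImpl struct{}

func (p PaymentsEventsVisitorImpl) VisitWithdrawalCreated(withdrawalCreated WithdrawalCreated) error {
	return nil
}

// Compile-time assertions: the build fails if any of these types stops
// satisfying the interface on the left-hand side.
var (
	_ Event[PaymentsEventsVisitor]              = WithdrawalCreated{}
	_ EventsDeserializer[PaymentsEventsVisitor] = (*PaymentsEventsDeserializer)(nil)
	_ PaymentsEventsVisitor                     = PaymentsEventsVisitorImpl{}
)

func main() {
	var deserializer EventsDeserializer[PaymentsEventsVisitor] = &PaymentsEventsDeserializer{}
	event, err := deserializer.Deserialize(Message{Payload: []byte("w-1")})
	if err != nil {
		fmt.Println("deserialization failed:", err)
		return
	}
	fmt.Println(event.Accept(PaymentsEventsVisitorImpl{}))
}
```

Trying to assign a deserializer of a different events group to `EventsDeserializer[PaymentsEventsVisitor]` would be rejected by the compiler, which is the guarantee the paragraph above describes.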

If we add a new event, let’s say an event called `AnotherPaymentEvent`, in order to include it in the processing pipeline we first have to make that event implement the `Event` interface. The implementation will look like this.

```go
type AnotherPaymentEvent struct {
}

func (e AnotherPaymentEvent) Accept(visitor PaymentsEventsVisitor) error {
   // Delegate to the visitor method that corresponds to this event.
   return visitor.VisitAnotherPaymentEvent(e)
}
```

Now, this code will not compile until we add the `VisitAnotherPaymentEvent` method to the `PaymentsEventsVisitor` interface and to its corresponding implementation as well. Only then will we be able to implement the deserialization logic in the `PaymentsEventsDeserializer` component. With this approach, unlike the solution implemented in the previous iteration, it is not possible to add a new event and forget to implement its handling logic, because the compiler will simply not allow it.

## Summary

In this last iteration, we improved our code by removing the runtime type assertions which were a potential source of runtime errors. To do so we used generics to be able to define a common interface for all the events that can be processed by our `MessageProcessor` and the Visitor design pattern to delegate to each event the knowledge of which specific method should be called on the corresponding Visitor.

# Closing notes

We built a reusable and strongly typed `MessageProcessor` for the DinoPay Gateway. We showed how to do it in an iterative and incremental way. Along the way, we discussed (briefly) the SOLID principles, unit tests, generics, and the visitor design pattern.

In the following articles of this Walletera series, we will improve the `MessageProcessor` by adding the features needed for a production-grade component, and we will continue with the rest of the components needed to complete the implementation of the DinoPay Gateway.
