AMQP Package

Introduction

The amqp package is a wrapper and drop-in replacement for streadway/amqp. It is designed to add automatic redialing to any code using the streadway package, with no changes beyond the import path.

To begin using rogerRabbit/amqp, simply change your imports from:

import "github.com/streadway/amqp"

To:

import "github.com/peake100/rogerRabbit-go/pkg/amqp"

Under the hood, this library is a wrapper around streadway/amqp and would not be possible without the amazing development team behind it.

Roger, Rabbit seeks to expand on the work done by streadway/amqp, not replace it.

Basic Usage

As this package is a drop-in replacement for streadway/amqp, basic usage will not be covered by these docs.

The official RabbitMQ documentation has an excellent set of tutorials using the streadway library. If you replace the import statements in the official examples with "github.com/peake100/rogerRabbit-go/pkg/amqp", you can follow along as normal using this package.

Note

If you find any examples that do NOT work after such a replacement, please open a PR!

For instance, the hello world receiver example would be simply changed from this:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  q, err := ch.QueueDeclare(
    "hello", // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf("Received a message: %s", d.Body)
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

To this:

package main

import (
  "log"

  // ONLY THIS CHANGES
  "github.com/peake100/rogerRabbit-go/pkg/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  q, err := ch.QueueDeclare(
    "hello", // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf("Received a message: %s", d.Body)
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

This documentation will focus on the differences from, and expansions upon, the streadway API, rather than retreading a primer on how to work with the basic API.

Users who are new to AMQP and have not used streadway/amqp should start with the basic RabbitMQ tutorials before continuing with this documentation.

When To Use This Library

Roger, Rabbit is designed to remove the mental overhead of managing unexpected broker disconnects. It features systems to automatically recreate client/server topologies on reconnect, keep Delivery and Publish tags consistent across reconnection events, and more.

However, these features depend on some assumptions. In general, this library should be used when:

  • Basic Client/Server Topology: All calls on a channel to QueueDeclare, QueueBind, ExchangeDeclare and ExchangeBind will be re-made every time a channel drops its connection and has to reconnect. If you are using a complex topology where Queues and Exchanges are routinely shifted, deleted, and altered, this library's behavior may result in the re-declaration of unwanted Queues and Exchanges.

  • Orphaned Publications Are Nacks: Failed in-flight Publications on Channels in confirmation mode will be exposed to the end user as a Nacked publication. There is an additional extension of the streadway API to flag Orphaned publications, but such handling will require code tweaks and not be a drop-in replacement.

  • Delivery Acknowledgements do not Mix Per-Message and Multiple: Roger, Rabbit will detect orphaned acknowledgements and return an error when orphans occur (the broker is disconnected before a delivery is acknowledged), but mixing Ack with multiple=true and multiple=false may confuse the library, and is currently not supported by Roger, Rabbit.

Unsupported Features

Roger, Rabbit strives to be a complete, drop-in replacement for streadway/amqp, but is still under construction. The following features have yet to be implemented:

  • Transactions: Calling Channel.Tx(), Channel.TxCommit() and Channel.TxRollback() will result in a panic. Transactions are an interesting problem to solve with robust channels, and draft PRs for how to handle them are welcome!

Robust Features

In this section, we will examine features unique to Roger, Rabbit.

Connection Recovery

Both the Connection and Channel types are robust transport mirrors of the streadway types by the same names, and will automatically re-connect when a connection is lost:

// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
  panic(err)
}

// Get a new channel from our robust connection.
channel, err := connection.Channel()
if err != nil {
  panic(err)
}

// We can use the test method to return a testing object with some additional
// methods. ForceReconnect force-closes the underlying transport, causing the
// robust connection to reconnect.
//
// We'll use a dummy *testing.T object here. These methods are designed for tests
// only and should not be used in production code.
channel.Test(new(testing.T)).ForceReconnect(context.Background())

// We can see here our channel is still open.
fmt.Println("IS CLOSED:", channel.IsClosed())

// We can even declare a queue on it
queue, err := channel.QueueDeclare(
  "example_channel_reconnect", // name
  false, // durable
  true, // autoDelete
  false, // exclusive
  false, // noWait
  nil, // args
)
if err != nil {
  panic(err)
}

// Here is the result
fmt.Printf("QUEUE    : %+v\n", queue)

// Explicitly close the connection. This will also close all child channels.
err = connection.Close()
if err != nil {
  panic(err)
}

// Now that we have explicitly closed the connection, the channel will be closed.
fmt.Println("IS CLOSED:", channel.IsClosed())

// Output:
// IS CLOSED: false
// QUEUE    : {Name:example_channel_reconnect Messages:0 Consumers:0}
// IS CLOSED: true

Topology Recreation

Roger, Rabbit's Channel type remembers all calls to Channel.QueueDeclare(), Channel.QueueBind(), Channel.ExchangeDeclare() and Channel.ExchangeBind(), and replays those calls on a reconnection event.

Note

Calling Channel.QueueDelete(), Channel.QueueUnbind(), Channel.ExchangeDelete(), and Channel.ExchangeUnbind() removes the relevant queues, exchanges, and bindings from the internally tracked lists. Anything removed this way will NOT be recreated on a reconnection event.
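The record-and-replay idea can be pictured with a small, self-contained model. This is not the library's code. The `topologyRecorder` and `queueArgs` types are hypothetical, and only queue declarations and deletions are modeled. The `declare` callback stands in for calling QueueDeclare on a freshly connected channel.

```go
package main

import "fmt"

// queueArgs is a hypothetical, trimmed-down record of one QueueDeclare call.
type queueArgs struct {
	Name       string
	Durable    bool
	AutoDelete bool
}

// topologyRecorder is a hypothetical sketch of remembering declared queues so
// they can be replayed after a reconnect. The zero value is ready to use.
type topologyRecorder struct {
	queues []queueArgs // kept in declaration order
}

// RecordDeclare remembers a successful declaration. Re-declaring a queue
// replaces the earlier record instead of duplicating it.
func (r *topologyRecorder) RecordDeclare(args queueArgs) {
	r.RecordDelete(args.Name)
	r.queues = append(r.queues, args)
}

// RecordDelete forgets a queue, so it will NOT be re-declared on reconnect.
func (r *topologyRecorder) RecordDelete(name string) {
	kept := r.queues[:0] // filter in place
	for _, q := range r.queues {
		if q.Name != name {
			kept = append(kept, q)
		}
	}
	r.queues = kept
}

// Replay re-issues every remembered declaration, in order, against a freshly
// connected channel, represented here by the declare callback.
func (r *topologyRecorder) Replay(declare func(queueArgs) error) error {
	for _, q := range r.queues {
		if err := declare(q); err != nil {
			return fmt.Errorf("re-declaring queue %q: %w", q.Name, err)
		}
	}
	return nil
}

func main() {
	var recorder topologyRecorder
	recorder.RecordDeclare(queueArgs{Name: "orders", Durable: true})
	recorder.RecordDeclare(queueArgs{Name: "scratch", AutoDelete: true})
	recorder.RecordDelete("scratch")

	// Simulate a reconnection event: only "orders" is re-declared.
	err := recorder.Replay(func(args queueArgs) error {
		fmt.Println("RE-DECLARED:", args.Name)
		return nil
	})
	if err != nil {
		fmt.Println("REPLAY FAILED:", err)
	}
}
```

The sketch preserves declaration order because a fuller version that also tracked bindings could only replay a binding after the queue and exchange it references had been declared.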

Delivery Tag Continuity

Delivery tags remain continuous, even across unexpected disconnects. Roger, Rabbit takes care of all the internal logic of lining up the caller-facing delivery tag with the actual delivery tag relative to the current underlying channel:

// Get a new connection to our test broker.
connection, err := amqp.DialCtx(context.Background(), amqpTest.TestDialAddress)
if err != nil {
  panic(err)
}

// Get a new channel from our robust connection for consuming.
consumeChannel, err := connection.Channel()
if err != nil {
  panic(err)
}

// Get a new channel from our robust connection for publishing.
publishChannel, err := connection.Channel()
if err != nil {
  panic(err)
}

queueName := "example_delivery_tag_continuity"

// Declare the queue we are going to use.
queue, err := consumeChannel.QueueDeclare(
  queueName, // name
  false, // durable
  false, // autoDelete
  false, // exclusive
  false, // noWait
  nil, // args
)
if err != nil {
  panic(err)
}

// Clean up the queue on exit,
defer consumeChannel.QueueDelete(
  queue.Name, false, false, false,
)

// Set the prefetch count to 1. That way, we are less likely to lose a message
// that is in flight from the broker in this example.
err = consumeChannel.Qos(1, 0, false)
if err != nil {
  panic(err)
}

// Start consuming the channel
consume, err := consumeChannel.Consume(
  queue.Name,
  "example consumer", // consumer name
  false,              // autoAck
  false,              // exclusive
  false,              // no local
  false,              // no wait
  nil,                // args
)
if err != nil {
  panic(err)
}

// We'll close this channel when the consumer is exhausted
consumeComplete := new(sync.WaitGroup)
consumerClosed := make(chan struct{})

// Launch a consumer
go func() {
  // Close consumerClosed to signal exit.
  defer close(consumerClosed)

  fmt.Println("STARTING CONSUMER")

  // Range over the consume channel
  for delivery := range consume {
    // Ack the delivery. We use a local err so this goroutine does not race
    // with the publishing loop, which assigns the outer err.
    if err := delivery.Ack(false); err != nil {
      panic(err)
    }

    // Force-reconnect the channel after each delivery.
    consumeChannel.Test(new(testing.T)).ForceReconnect(context.Background())

    // Tick down the consumeComplete waitgroup
    consumeComplete.Done()

    // Print the delivery. Even though we are forcing a new underlying channel
    // to be connected each time, the delivery tags will still be continuous.
    fmt.Printf(
      "DELIVERY %v: %v\n", delivery.DeliveryTag, string(delivery.Body),
    )
  }

  fmt.Println("DELIVERIES EXHAUSTED")
}()

// We'll publish 10 test messages.
for i := 0; i < 10; i++ {
  // Add one to the consumeComplete WaitGroup.
  consumeComplete.Add(1)

  // Publish a message. Even though the consumer may be force-reconnecting its
  // channel each time, we can keep using the publishing channel.
  //
  // NOTE: it is possible that we will drop a message here during a reconnection
  // event. If we want to be sure all messages reach the broker, we'll need to
  // publish messages with the Channel in confirmation mode, which we will
  // show in another example.
  err = publishChannel.Publish(
    "",
    queue.Name,
    false,
    false,
    amqp.Publishing{
      Body: []byte(fmt.Sprintf("message %v", i)),
    },
  )
  if err != nil {
    panic(err)
  }
}

// Wait for all messages to be received
consumeComplete.Wait()

// Close the connection
err = connection.Close()
if err != nil {
  panic(err)
}

// Wait for the consumer to exit
<-consumerClosed

// exit

// Output:
// STARTING CONSUMER
// DELIVERY 1: message 0
// DELIVERY 2: message 1
// DELIVERY 3: message 2
// DELIVERY 4: message 3
// DELIVERY 5: message 4
// DELIVERY 6: message 5
// DELIVERY 7: message 6
// DELIVERY 8: message 7
// DELIVERY 9: message 8
// DELIVERY 10: message 9
// DELIVERIES EXHAUSTED

Warning

In the above example, it is possible that a message publication will be dropped during a reconnection event. If we want to be sure all messages reach the broker, we'll need to publish messages with the Channel in confirmation mode, which we will show in a later example.
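The bookkeeping behind continuous delivery tags can be pictured as an offset. The broker restarts delivery tags at 1 on every new underlying channel, so the caller-facing tag is the current offset plus the broker's tag. On each reconnect, the offset advances to the last caller-facing tag handed out. The `tagMapper` type below is a hypothetical sketch of that idea. It is not the code used by DeliveryTagsMiddleware.

```go
package main

import "fmt"

// tagMapper is a hypothetical sketch of mapping per-channel broker delivery
// tags onto one continuous, caller-facing sequence.
type tagMapper struct {
	offset  uint64 // caller tags issued before the current underlying channel
	lastTag uint64 // most recent caller-facing tag handed out
}

// OnDelivery converts a broker tag from the current underlying channel into a
// caller-facing tag.
func (m *tagMapper) OnDelivery(brokerTag uint64) uint64 {
	m.lastTag = m.offset + brokerTag
	return m.lastTag
}

// OnReconnect is called when a new underlying channel is opened. The broker
// restarts its tags at 1, so the offset jumps to the last issued caller tag.
func (m *tagMapper) OnReconnect() {
	m.offset = m.lastTag
}

func main() {
	m := &tagMapper{}
	// As in the example above, we reconnect after every delivery, so each new
	// underlying channel delivers broker tag 1.
	for i := 0; i < 3; i++ {
		fmt.Println("DELIVERY", m.OnDelivery(1))
		m.OnReconnect()
	}
}
```

The program prints DELIVERY 1, DELIVERY 2, and DELIVERY 3, even though the broker sent tag 1 each time.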

Delivery Tag Orphans

When manually acking Deliveries, the underlying channel may disconnect between the time we get a Delivery and the time we ack it. If that happens, the delivery is no longer acknowledgeable. In such cases, an error will be returned indicating this delivery has been orphaned:

// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
  panic(err)
}

// Get a new channel from our robust connection for consuming.
channel, err := connection.Channel()
if err != nil {
  panic(err)
}

queueName := "example_delivery_ack_orphan"

// Declare the queue we are going to use.
_, err = channel.QueueDeclare(
  queueName, // name
  false, // durable
  true, // autoDelete
  false, // exclusive
  false,  // noWait
  nil, // args
)
if err != nil {
  panic(err)
}

// Cleanup channel on exit.
defer channel.QueueDelete(queueName, false, false, false)

// Start consuming the channel
consume, err := channel.Consume(
  queueName,
  "example consumer", // consumer name
  // Auto-ack is set to false
  false, // autoAck
  false, // exclusive
  false, // no local
  false, // no wait
  nil, // args
)
if err != nil {
  panic(err)
}

// publish a message
err = channel.Publish(
  "", // exchange
  queueName,
  false,
  false,
  amqp.Publishing{
    Body: []byte("test message"),
  },
)
if err != nil {
  panic(err)
}

// get the delivery of our published message
delivery := <-consume
fmt.Println("DELIVERY:", string(delivery.Body))

// Force a reconnect, which closes the original underlying channel.
channel.Test(new(testing.T)).ForceReconnect(context.Background())

// Now that the original underlying channel is closed, it is impossible to ack
// the delivery. We will get an error when we try.
err = delivery.Ack(false)
fmt.Println("ACK ERROR:", err)

// This error is an orphan error
var orphanErr amqp.ErrCantAcknowledgeOrphans
if !errors.As(err, &orphanErr) {
  panic("error not orphan error")
}

fmt.Println("FIRST ORPHAN TAG:", orphanErr.OrphanTagFirst)
fmt.Println("LAST ORPHAN TAG :", orphanErr.OrphanTagLast)

// Output:
// DELIVERY: test message
// ACK ERROR: 1 tags orphaned (1 - 1), 0 tags successfully acknowledged
// FIRST ORPHAN TAG: 1
// LAST ORPHAN TAG : 1
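A simplified model of the orphan check is to remember the first caller-facing tag issued on the current underlying channel. Any tag below that belongs to a channel that no longer exists. In the hypothetical sketch below, `ackTracker` and `orphanError` are illustrative stand-ins. The library's real error type is amqp.ErrCantAcknowledgeOrphans, shown above, and its fields and message differ from this sketch. Only single-tag acks are modeled, not multiple=true.

```go
package main

import (
	"errors"
	"fmt"
)

// orphanError is a hypothetical stand-in for an orphaned-acknowledgement error.
type orphanError struct {
	Tag uint64 // caller-facing tag that can no longer be acknowledged
}

func (e orphanError) Error() string {
	return fmt.Sprintf("delivery tag %d orphaned by a disconnect", e.Tag)
}

// ackTracker is a hypothetical sketch of orphan detection on Ack.
type ackTracker struct {
	channelStart uint64 // first caller-facing tag issued on the current channel
}

// OnReconnect records that the tag after lastIssued will be the first one
// delivered on the new underlying channel.
func (a *ackTracker) OnReconnect(lastIssued uint64) {
	a.channelStart = lastIssued + 1
}

// Ack translates a caller-facing tag into the current channel's tag, or
// returns an orphanError if the tag was delivered on an earlier channel.
func (a *ackTracker) Ack(callerTag uint64) (uint64, error) {
	if callerTag < a.channelStart {
		return 0, orphanError{Tag: callerTag}
	}
	return callerTag - a.channelStart + 1, nil
}

func main() {
	tracker := &ackTracker{channelStart: 1}

	// Tag 1 is delivered on the first channel, which then disconnects before
	// the caller gets around to acknowledging it.
	tracker.OnReconnect(1)

	_, err := tracker.Ack(1)
	var orphan orphanError
	if errors.As(err, &orphan) {
		fmt.Println("ORPHANED TAG:", orphan.Tag)
	}
}
```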

Publishing Tag Continuity

Just like with Delivery Tags, publishing tag continuity is maintained, even across disconnection events.

// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
  panic(err)
}

// Get a new channel from our robust connection for publishing.
publishChannel, err := connection.Channel()
if err != nil {
  panic(err)
}

// Put the channel into confirmation mode
err = publishChannel.Confirm(false)
if err != nil {
  panic(err)
}

confirmationsReceived := new(sync.WaitGroup)
confirmationsComplete := make(chan struct{})

// Create a channel to consume publication confirmations.
publishEvents := publishChannel.NotifyPublish(make(chan amqp.Confirmation))
go func() {
  // Close to signal exit.
  defer close(confirmationsComplete)

  // Range over the confirmation channel.
  for confirmation := range publishEvents {
    // Mark 1 confirmation as done.
    confirmationsReceived.Done()

    // Print confirmation.
    fmt.Printf(
      "CONFIRMATION TAG %02d: ACK: %v ORPHAN: %v\n",
      confirmation.DeliveryTag,
      confirmation.Ack,
      // If the confirmation was never received because the channel was
      // disconnected, then confirmation.Ack will be false, and
      // confirmation.DisconnectOrphan will be true.
      confirmation.DisconnectOrphan,
    )
  }
}()

// Declare the message queue
queueName := "example_delivery_tag_continuity"
_, err = publishChannel.QueueDeclare(
  queueName,
  false, // durable
  true, // autoDelete
  false, // exclusive
  false, // noWait
  nil, // args
)
if err != nil {
  panic(err)
}

// We'll publish 10 test messages.
for i := 0; i < 10; i++ {
  // We want to wait here to make sure we got the confirmation from the last
  // publication before force-closing the connection to show we can handle it.
  confirmationsReceived.Wait()

  // Force a reconnection of the underlying channel.
  publishChannel.Test(new(testing.T)).ForceReconnect(context.Background())

  // Increment the confirmation WaitGroup
  confirmationsReceived.Add(1)

  // Publish a message. Even though we force-reconnect the underlying channel
  // before each publication, we can keep using the same robust channel.
  err = publishChannel.Publish(
    "",
    queueName,
    false,
    false,
    amqp.Publishing{
      Body: []byte(fmt.Sprintf("message %v", i)),
    },
  )
  if err != nil {
    panic(err)
  }
}

// Wait for all confirmations to be received.
confirmationsReceived.Wait()

// Close the connection.
err = connection.Close()
if err != nil {
  panic(err)
}

// Wait for the confirmation routine to exit.
<-confirmationsComplete

// Exit.

// Output:
// CONFIRMATION TAG 01: ACK: true ORPHAN: false
// CONFIRMATION TAG 02: ACK: true ORPHAN: false
// CONFIRMATION TAG 03: ACK: true ORPHAN: false
// CONFIRMATION TAG 04: ACK: true ORPHAN: false
// CONFIRMATION TAG 05: ACK: true ORPHAN: false
// CONFIRMATION TAG 06: ACK: true ORPHAN: false
// CONFIRMATION TAG 07: ACK: true ORPHAN: false
// CONFIRMATION TAG 08: ACK: true ORPHAN: false
// CONFIRMATION TAG 09: ACK: true ORPHAN: false
// CONFIRMATION TAG 10: ACK: true ORPHAN: false

Note

The Confirmation.DisconnectOrphan is a new field for the Confirmation type and is unique to Roger, Rabbit.

When DisconnectOrphan is true, the Nack did not come from a broker response. It was generated because no confirmation, positive or negative, was received from the broker before the connection was disrupted. Orphaned messages may have reached the broker; we have no way of knowing.
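The publishing side can be pictured with a hypothetical sketch. Caller-facing publish tags keep counting across reconnects. When the underlying channel drops, every publication still awaiting a confirmation is reported as a Nack with DisconnectOrphan set. The `publishTracker` and `confirmation` types are illustrative only. `confirmation` mirrors the DeliveryTag, Ack, and DisconnectOrphan fields discussed above, but it is not the library's Confirmation type.

```go
package main

import (
	"fmt"
	"sort"
)

// confirmation mirrors the fields discussed above; it is not the library type.
type confirmation struct {
	DeliveryTag      uint64
	Ack              bool
	DisconnectOrphan bool
}

// publishTracker is a hypothetical sketch of publish-tag continuity.
type publishTracker struct {
	lastTag uint64              // last caller-facing tag issued
	pending map[uint64]struct{} // tags still awaiting a broker confirmation
}

func newPublishTracker() *publishTracker {
	return &publishTracker{pending: make(map[uint64]struct{})}
}

// Publish assigns the next caller-facing tag to an outgoing message.
func (p *publishTracker) Publish() uint64 {
	p.lastTag++
	p.pending[p.lastTag] = struct{}{}
	return p.lastTag
}

// Confirm records a broker ack or nack for a caller-facing tag.
func (p *publishTracker) Confirm(tag uint64, ack bool) confirmation {
	delete(p.pending, tag)
	return confirmation{DeliveryTag: tag, Ack: ack}
}

// OnDisconnect turns every unconfirmed publication into an orphaned Nack,
// sorted by tag. The broker may or may not have received these messages.
func (p *publishTracker) OnDisconnect() []confirmation {
	orphans := make([]confirmation, 0, len(p.pending))
	for tag := range p.pending {
		orphans = append(orphans, confirmation{DeliveryTag: tag, DisconnectOrphan: true})
	}
	sort.Slice(orphans, func(i, j int) bool {
		return orphans[i].DeliveryTag < orphans[j].DeliveryTag
	})
	p.pending = make(map[uint64]struct{})
	return orphans
}

func main() {
	p := newPublishTracker()
	// Tag 1 is confirmed by the broker.
	fmt.Printf("%+v\n", p.Confirm(p.Publish(), true))
	// Tag 2 is still in flight when the channel drops.
	p.Publish()
	for _, c := range p.OnDisconnect() {
		fmt.Printf("%+v\n", c)
	}
	// After the reconnect, tags continue from 3 instead of restarting at 1.
	fmt.Println("NEXT TAG:", p.Publish())
}
```

The sketch does not translate the broker's per-channel confirmation tags back into caller-facing tags. That step would need the same kind of offset bookkeeping described for delivery tags.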

Channel Middleware

Roger, Rabbit allows the registration of middleware on all Channel methods. In fact, most of the robust Channel features are implemented through middleware defined in the amqp/defaultmiddlewares package! Middleware is a powerful tool and one of the biggest API expansions over streadway/amqp.

Middleware signatures are defined in the amqp/amqpmiddleware package.

Registering Middleware

// define our new middleware
queueDeclareMiddleware := func(
    next amqpmiddleware.HandlerQueueDeclare,
) amqpmiddleware.HandlerQueueDeclare {
    return func(args amqpmiddleware.ArgsQueueDeclare) (streadway.Queue, error) {
        fmt.Println("MIDDLEWARE INVOKED FOR QUEUE")
        fmt.Println("QUEUE NAME :", args.Name)
        fmt.Println("AUTO-DELETE:", args.AutoDelete)
        return next(args)
    }
}

// Create a config and add our middleware to it.
config := amqp.DefaultConfig()
config.ChannelMiddleware.AddQueueDeclare(queueDeclareMiddleware)

// Get a new connection to our test broker.
connection, err := amqp.DialConfigCtx(
    context.Background(), amqptest.TestDialAddress, config,
)
if err != nil {
    panic(err)
}
defer connection.Close()

// Get a new channel from our robust connection. The channel is created with
// the middleware registered on our config.
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

// Declare our queue, our middleware will be invoked and print a message.
_, err = channel.QueueDeclare(
    "example_middleware",
    false,
    true,
    false,
    false,
    nil,
)
if err != nil {
    panic(err)
}

// MIDDLEWARE INVOKED FOR QUEUE
// QUEUE NAME : example_middleware
// AUTO-DELETE: true

Middleware Providers

For more complex middleware, you can implement middleware providers, which expose methods that implement middleware. You can then register a factory method that will generate a new provider value whenever a new Connection or Channel is created, and the middleware methods will be automatically registered for you.

This can help build complex middlewares. Let’s define a custom middleware provider and factory method:

// CustomMiddlewareProvider exposes methods for middlewares that need to coordinate.
type CustomMiddlewareProvider struct {
    InvocationCount int
}

// TypeID implements amqpmiddleware.ProvidesMiddleware and returns a unique type ID
// that can be used to fetch middleware values when testing.
func (middleware *CustomMiddlewareProvider) TypeID() amqpmiddleware.ProviderTypeID {
    return "CustomMiddleware"
}

// QueueDeclare implements amqpmiddleware.ProvidesQueueDeclare.
func (middleware *CustomMiddlewareProvider) QueueDeclare(
    next amqpmiddleware.HandlerQueueDeclare,
) amqpmiddleware.HandlerQueueDeclare {
    return func(args amqpmiddleware.ArgsQueueDeclare) (streadway.Queue, error) {
        middleware.InvocationCount++
        fmt.Printf(
            "DECLARED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
        )
        return next(args)
    }
}

// QueueDelete implements amqpmiddleware.ProvidesQueueDelete. QueueDelete
// returns the number of purged messages, not a Queue.
func (middleware *CustomMiddlewareProvider) QueueDelete(
    next amqpmiddleware.HandlerQueueDelete,
) amqpmiddleware.HandlerQueueDelete {
    return func(args amqpmiddleware.ArgsQueueDelete) (int, error) {
        middleware.InvocationCount++
        fmt.Printf(
            "DELETED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
        )
        return next(args)
    }
}

// NewCustomMiddlewareProvider creates a new CustomMiddlewareProvider.
func NewCustomMiddlewareProvider() amqpmiddleware.ProvidesMiddleware {
    return new(CustomMiddlewareProvider)
}

We can register it on our Config so that every channel created from a connection gets a fresh instance of our provider:

// Create a config and add our middleware provider factory to it.
config := amqp.DefaultConfig()
config.ChannelMiddleware.AddProviderFactory(NewCustomMiddlewareProvider)

// Get a new connection to our test broker.
connection, err := amqp.DialConfigCtx(
    context.Background(), amqptest.TestDialAddress, config,
)
if err != nil {
    panic(err)
}
defer connection.Close()

// Get a new channel from our robust connection. The channel is created with
// a fresh instance of our middleware provider.
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

// Declare our queue, our middleware will be invoked and print a message.
_, err = channel.QueueDeclare(
    "example_middleware",
    false, // durable
    true, // autoDelete
    false, // exclusive
    false, // noWait
    nil, // args
)
if err != nil {
    panic(err)
}

// Delete our queue, our middleware will be invoked and print a message.
_, err = channel.QueueDelete(
    "example_middleware",
    false, // ifUnused
    false, // ifEmpty
    false, // noWait
)
if err != nil {
    panic(err)
}

// DECLARED: example_middleware, TOTAL: 1
// DELETED: example_middleware, TOTAL: 2

If, for some reason, we wanted every Channel to share the same instance of the provider, we could make the following adjustment:

// Create an instance of our provider:
provider := NewCustomMiddlewareProvider()

config := amqp.DefaultConfig()
config.ChannelMiddleware.AddProviderMethods(provider)

This will add the methods once and re-use them for all Channels rather than making a fresh provider every time a channel is generated.
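The choice between a provider factory and shared provider methods determines whether each channel gets its own state. The hypothetical sketch below leaves out the library types. It models a provider as a counter whose method is registered on a stand-in channel. `counterProvider`, `channelSim`, `fromFactory`, and `fromShared` are illustrative names, not rogerRabbit-go APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// counterProvider is a hypothetical provider: its middleware methods share
// state through the provider value. The mutex matters when one provider is
// shared by channels used from different goroutines.
type counterProvider struct {
	mu    sync.Mutex
	count int
}

// Declare stands in for a middleware method such as QueueDeclare.
func (p *counterProvider) Declare() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.count++
	return p.count
}

// channelSim stands in for a Channel with one registered middleware method.
type channelSim struct{ declare func() int }

// fromFactory mimics a provider factory: each channel gets a fresh provider.
func fromFactory(factory func() *counterProvider) channelSim {
	return channelSim{declare: factory().Declare}
}

// fromShared mimics registering one provider's methods for every channel.
func fromShared(shared *counterProvider) channelSim {
	return channelSim{declare: shared.Declare}
}

func main() {
	factory := func() *counterProvider { return new(counterProvider) }
	a, b := fromFactory(factory), fromFactory(factory)
	a.declare()
	fmt.Println("FACTORY, SECOND CHANNEL:", b.declare()) // separate state

	shared := new(counterProvider)
	c, d := fromShared(shared), fromShared(shared)
	c.declare()
	fmt.Println("SHARED, SECOND CHANNEL:", d.declare()) // same state
}
```

With the factory, the second channel counts from 1. With the shared provider, it sees the first channel's call and returns 2. That shared state is also why a provider used this way needs its own synchronization.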

Note

Many of rogerRabbit’s more complex features are implemented through middleware providers. Check the defaultmiddlewares package to see practical examples of middleware providers used in this library.

Testing

Testing is a first-class citizen of the Roger, Rabbit package. Types expose a wide range of testing methods, and the amqptest package offers a number of additional testing utilities.

Note

Testing methods and utilities are heavily integrated with testify. Testify is somewhat divisive within the Go community, but the maintainers of this repository rely on it heavily, and so do Roger, Rabbit's testing utilities.

Test() Methods

Both Connection and Channel expose a .Test() method, which returns a testing harness type with additional methods for running tests on its parent value.

Most test methods do not return an error, instead opting to report the error and immediately fail the test.

Example:

// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
  panic(err)
}
defer connection.Close()

// Get a new channel from our robust connection for publishing. The channel is
// created with our default middleware.
channel, err := connection.Channel()
if err != nil {
  panic(err)
}

// Get a channel testing harness. In a real test function, you would pass the
// test's *testing.T value. Here, we will just pass a dummy one.
testHarness := channel.Test(new(testing.T))

// We can use the test harness to force the channel to reconnect. If a reconnection
// does not occur before the passed context expires, the test will be failed.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
testHarness.ForceReconnect(ctx)

// We can check how many times a reconnection has occurred. The first time we
// connect to the broker is counted, so we should get '2':
fmt.Println("RECONNECTION COUNT:", testHarness.ReconnectionCount())

// exit.

// Output:
// RECONNECTION COUNT: 2

Testify Suite

The amqptest package provides an AmqpSuite type that is an extension of testify/suite.Suite.

AmqpSuite adds a number of quality-of-life methods for quickly setting up and tearing down integration tests with a test broker.

See the godoc documentation for more details.

Architecture

Overview


Schematic of the amqp transport (Channel and Connection) architecture.

Let's dig into some of the specifics.

Note

When we refer to an amqp.[Type], we are referring to the rogerRabbit-go/pkg/amqp implementation of that type. If we need to refer to underlying types provided by the streadway/amqp package, we will refer to them as streadway.[Type].

Warning

The file organization of the amqp package is still in flux, though the API and underlying logical structure is likely to remain stable. This architecture guide will refrain from referencing specific files or paths for the time being, until the types and helpers are done playing musical chairs.

Streadway Transport

The Streadway Transport is our underlying streadway.Connection or streadway.Channel and handles all of our actual communication with the AMQP Broker.

It is managed by a Robust Transport.

Robust Transport

A Robust Transport is an amqp.Channel or amqp.Connection. These types manage their streadway.Channel or streadway.Connection counterparts, and expose Exported Methods to the caller that re-implement the methods supplied by the Streadway Transport they manage.

Each Robust Transport implements the reconnects interface, which in turn is referenced by the Transport Manager and used to redial the underlying broker connection.

So why are we talking about the Transport Manager here if it manages the Robust Transport? Well, because the Transport Manager is then embedded into the Robust Transport it is managing, and provides some of that transport’s methods.

Close(), NotifyDisconnect(), and all other methods shared between amqp.Channel and amqp.Connection are actually provided by an embedded transportManager value. Like an ouroboros, that value also contains a reference to its parent transport, which it uses to manage the lifecycle of the parent's underlying streadway transport through the shared reconnects interface.
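The embedding described above can be shown in a few lines. `reconnects`, `transportManager`, and `tryReconnect` follow the names used in this guide. The one-method interface, the `Channel` stand-in, and the `Reconnect` helper are simplified assumptions, not the package's real definitions.

```go
package main

import "fmt"

// reconnects is implemented by each robust transport. This one-method version
// is a simplification for the sketch.
type reconnects interface {
	tryReconnect(attempt int) error
}

// transportManager holds logic shared by Channel and Connection, plus a
// reference back to the transport that embeds it.
type transportManager struct {
	transport reconnects
	attempts  int
}

// Reconnect is a hypothetical shared method. It drives the parent transport
// through the reconnects interface without knowing its concrete type.
func (m *transportManager) Reconnect() error {
	m.attempts++
	return m.transport.tryReconnect(m.attempts)
}

// Channel embeds transportManager, so Reconnect is promoted onto Channel.
type Channel struct {
	transportManager
	name string
}

// tryReconnect would dial a new underlying channel in real code.
func (c *Channel) tryReconnect(attempt int) error {
	fmt.Printf("%s: reconnect attempt %d\n", c.name, attempt)
	return nil
}

// Compile-time check that *Channel satisfies reconnects.
var _ reconnects = (*Channel)(nil)

// newChannel closes the loop: the embedded manager points at its own parent.
func newChannel(name string) *Channel {
	c := &Channel{name: name}
	c.transport = c
	return c
}

func main() {
	ch := newChannel("channel-1")
	// Reconnect is promoted from the embedded transportManager.
	if err := ch.Reconnect(); err != nil {
		fmt.Println("RECONNECT FAILED:", err)
	}
}
```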

Method Handlers

Each Robust Transport (and the Transport Manager) contains a handlers field which holds all the method handlers for that transport. These handlers are comprised of the base streadway/amqp type methods wrapped in user-supplied Middleware.
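Building a method handler from middleware is function composition. The sketch below uses hypothetical `handler` and `middleware` types, which are simplified versions of the `HandlerQueueDeclare`-style signatures shown earlier. It assumes the first middleware in the list becomes the outermost wrapper. This guide does not specify the library's actual ordering.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a simplified, hypothetical stand-in for a method handler such as
// amqpmiddleware.HandlerQueueDeclare.
type handler func(queue string) (string, error)

// middleware wraps a handler and returns a new handler.
type middleware func(next handler) handler

// buildHandler wraps base in every middleware. Iterating in reverse makes the
// first middleware in the slice the outermost layer.
func buildHandler(base handler, mws []middleware) handler {
	h := base
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// recordCall returns a middleware that appends its name to calls when invoked.
func recordCall(name string, calls *[]string) middleware {
	return func(next handler) handler {
		return func(queue string) (string, error) {
			*calls = append(*calls, name)
			return next(queue)
		}
	}
}

func main() {
	var calls []string
	// base stands in for the wrapped streadway method.
	base := func(queue string) (string, error) {
		calls = append(calls, "base")
		return queue, nil
	}
	h := buildHandler(base, []middleware{
		recordCall("outer", &calls),
		recordCall("inner", &calls),
	})
	name, err := h("example_middleware")
	fmt.Println(name, err, strings.Join(calls, " -> "))
}
```

Running it prints the queue name, a nil error, and the call order "outer -> inner -> base". Each layer decides whether and how to call `next`, which is what lets a middleware record or alter a call before it reaches streadway.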

Middleware

Middleware is a core component of the Roger, Rabbit amqp package. Most of Roger, Rabbit's extended functionality is supplied by middleware, such as continuous Delivery Tags over disconnects and auto-redeclaration of broker topology on reconnections.

In the early life of this package, robust features were implemented directly on the Channel, resulting in features which were hard to isolate and maintain. Topology re-declaration, for instance, involves 9 methods manipulating 6 data resources over ~700 lines of code. Debugging and tracking errors with these features was incredibly cumbersome when they were spread out over the main Channel method calls.

By moving these features into middleware, all of the logic that supports a given feature can be housed together, greatly reducing complexity and increasing maintainability.

The current default middleware that ships with Roger, Rabbit is:

  • ConfirmsMiddleware: Tracks whether Channel.Confirm has been called, and puts any freshly reconnected streadway Channels into the correct state.

  • DeliveryTagsMiddleware: Ensures that Delivery tags are continuous for the caller, even over disconnects.

  • FlowMiddleware: Tracks the Flow state of the channel and sets the correct state on Channel reconnections.

  • LoggingMiddlewareConnection: Facilitates logging for all Connection Operations.

  • LoggingMiddlewareChannel: Facilitates logging for all Channel Operations.

  • PublishTagsMiddleware: Ensures that Publish Confirmation tags are continuous for the caller, even over disconnects.

  • QoSMiddleware: Tracks QoS settings and configures Channels on reconnect to match.

  • RouteDeclarationMiddleware: Handles topology recreation on Channel reconnects.

Together these middlewares implement the bulk of our robust feature set.

Transport Lock

The TransportLock is a *sync.RWMutex contained on the transportManager. It is used to ensure there are no race conditions when a disconnection event occurs.

Any process that wishes to make use of the underlying streadway connection must acquire the transport lock for read.

When the transport manager redials the broker, the transport lock is acquired for write, and not released until a successful connection occurs. This blocks all other operations (like channel and connection method calls) from interacting with the underlying streadway transport until we are reconnected.
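This locking discipline can be shown with a plain sync.RWMutex. The `transport` type below is hypothetical, and its `conn` string stands in for a streadway connection. The sketch shows method calls sharing the read lock while a redial holds the lock exclusively.

```go
package main

import (
	"fmt"
	"sync"
)

// transport is a hypothetical holder for an underlying connection.
type transport struct {
	lock sync.RWMutex
	conn string // stands in for *streadway.Connection
	gen  int    // how many times we have connected
}

// use runs op against the current connection under a read lock. Many method
// calls can proceed at once, but never while a redial is in progress.
func (t *transport) use(op func(conn string)) {
	t.lock.RLock()
	defer t.lock.RUnlock()
	op(t.conn)
}

// redial swaps the connection under the write lock. It waits for in-flight
// readers to finish, and every new use call blocks until the swap is done.
func (t *transport) redial() {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.gen++
	t.conn = fmt.Sprintf("connection-%d", t.gen)
}

func main() {
	t := &transport{}
	t.redial()

	// Several concurrent method calls share the read lock.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.use(func(conn string) { _ = conn })
		}()
	}
	t.redial()
	wg.Wait()

	t.use(func(conn string) { fmt.Println("CURRENT:", conn) })
}
```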

Transport Manager

The Transport Manager implements all common functionality between Channel and Connection. See the above section for more details.

The Transport Manager listens for a disconnection event, then grabs exclusive Write access to the Transport Lock. Only then does it continuously attempt to redial the broker using the methods exposed on our Robust Transport through the reconnects interface.

The transport manager also exposes a Retry On Disconnected method which all of our exported Channel and Connection Exported Methods invoke to complete requests.

Redial Routine

The Redial Routine is launched by the Transport Manager. The redial routine is responsible for reconnecting to the underlying Streadway Transport on a disconnection event.

  1. Registers a listener with the Streadway Transport's NotifyClose method.

  2. Blocks until the listener signals that the Streadway Transport has closed (disconnection event).

  3. Exits if our Robust Transport has been closed by the caller.

  4. Acquires the Transport Lock for write, blocking Exported Method calls until a successful reconnection occurs.

  5. Calls the tryReconnect method on the Robust transport until we get a successful result.

  6. Registers a listener with the new Streadway Transport's NotifyClose method.

  7. Releases the Transport Lock.

  8. Restarts at step 2.
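The numbered steps correspond to a single goroutine loop. In the hypothetical sketch below, a closed `notify` chan plays the role of streadway's NotifyClose listener. The `dial` field stands in for tryReconnect, and a `closed` flag means the caller has closed the robust transport. One deliberate difference: the lock is taken before the closed check so that check is race-free. Backoff and error reporting are also omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errUnreachable is a hypothetical dial error.
var errUnreachable = errors.New("broker unreachable")

// robustTransport is a hypothetical stand-in for an amqp.Connection or Channel.
type robustTransport struct {
	lock     sync.RWMutex   // the Transport Lock
	closed   bool           // set (under lock) when the caller closes us
	notify   chan struct{}  // closed when the underlying transport dies
	dial     func() error   // stands in for tryReconnect
	redialed sync.WaitGroup // lets callers wait for a reconnect
}

// redialRoutine loosely follows the numbered steps above.
func (t *robustTransport) redialRoutine() {
	for {
		// Step 2: block until a disconnection event.
		<-t.notify

		// Step 4, taken before step 3 so reading closed is race-free.
		t.lock.Lock()
		// Step 3: exit if the caller closed the robust transport.
		if t.closed {
			t.lock.Unlock()
			return
		}
		// Step 5: retry until a dial succeeds (real code would back off).
		for t.dial() != nil {
		}
		// Step 6: listen for the next disconnect on the new transport.
		t.notify = make(chan struct{})
		// Step 7: let exported methods through again.
		t.lock.Unlock()
		t.redialed.Done()
		// Step 8: loop back to waiting.
	}
}

func main() {
	failures := 2
	t := &robustTransport{
		notify: make(chan struct{}),
		dial: func() error {
			if failures > 0 {
				failures--
				fmt.Println("DIAL FAILED")
				return errUnreachable
			}
			fmt.Println("DIAL OK")
			return nil
		},
	}
	t.redialed.Add(1)
	go t.redialRoutine()

	close(t.notify) // simulate the broker dropping the connection
	t.redialed.Wait()
}
```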

Retry On Disconnected

Retry On Disconnected is a method exposed by Transport Manager that enables Robust Transports (Channel and Connection) to automatically retry an operation if it failed because we were disconnected from the AMQP broker.

When invoked, Retry On Disconnected does the following:

  1. Acquires the Transport Lock for read (so multiple methods can be called simultaneously).

  2. Runs the operation handed to it by the Exported Method, usually a Method Handler.

  3. Releases the Transport Lock.

  4. If an error occurred because the broker was not reachable, goes back to step 1.

  5. Passes the result back up to the caller.
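A hypothetical sketch of the Retry On Disconnected loop follows. The operation runs under the read lock, and only an error classified as a disconnect triggers another attempt. `errDisconnected`, `manager`, and `retryOnDisconnected` are illustrative names. The sketch does not show how the real package classifies errors.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDisconnected is a hypothetical sentinel meaning "the broker was unreachable".
var errDisconnected = errors.New("disconnected from broker")

// manager is a hypothetical stand-in for the Transport Manager.
type manager struct {
	lock sync.RWMutex // the Transport Lock
}

// retryOnDisconnected runs op until it succeeds or fails for a reason other
// than a disconnect, following the numbered steps above.
func (m *manager) retryOnDisconnected(op func() error) error {
	for {
		m.lock.RLock()   // step 1: shared access, so many calls can run at once
		err := op()      // step 2: usually a Method Handler
		m.lock.RUnlock() // step 3

		// Step 4: retry disconnect errors only. If a redial holds the write
		// lock, the next RLock blocks until the new transport is ready.
		if errors.Is(err, errDisconnected) {
			continue
		}
		return err // step 5: success or a non-disconnect error
	}
}

func main() {
	m := &manager{}
	attempts := 0
	err := m.retryOnDisconnected(func() error {
		attempts++
		if attempts < 3 {
			return errDisconnected
		}
		return nil
	})
	fmt.Println("ATTEMPTS:", attempts, "ERR:", err)
}
```

If a disconnect error arrives before the redial routine has taken the write lock, this simple loop retries immediately. Real code would want to wait or back off in that window.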

Event Relays

Many Channel methods like NotifyPublish or Consume involve sending events along a provided or returned Go chan [Event] value to the caller.

When a streadway.Channel disconnects, it closes all event chan [Event] values it is feeding. Because we want our event chan [Event] values to survive a disconnect, we cannot pass the caller’s chan [Event] directly to a streadway.Channel. Instead, we need to create our own temporary chan [Event], pass that to the Streadway Transport, then relay the events from our temporary chan [Event] to the caller’s chan [Event].

To do this, we launch an Event Relay.

When a reconnection event occurs, the relay creates a new temporary chan [Event], passes that to the new Streadway Transport, and then continues to relay messages to the caller as if nothing has happened.

Since Channel is the only transport that needs event relays, it manages their lifecycle. Event Relays must be kept in careful sync with reconnection events to ensure that data or logic races do not occur. The Event Relay lifecycle is as follows:

  1. A relay is started by the Channel.[Method] handler using the transportManager.retryOperationOnClosed function so it can acquire the Transport Lock for read when starting up (ensuring the streadway.Channel is not swapped out from under it).

  2. The relay runs its first leg until the underlying streadway.Channel disconnects.

  3. On streadway.Channel disconnect, the temporary chan [Event] feeding the relay dries up. The relay waits for a new *streadway.Channel to be sent by its Channel on a successful reconnection.

  4. The transportManager acquires the Transport Lock for write and calls Channel.tryReconnect over and over until a new streadway.Channel is successfully connected.

  5. When a new streadway.Channel is successfully established, Channel.tryReconnect does not return until all the remaining steps are completed.

  6. Channel.tryReconnect waits for all Event Relays to signal that their last leg has been successfully completed.

  7. Channel.tryReconnect sends the new *streadway.Channel value to each relay for it to run any setup and start relaying events again.

  8. Channel.tryReconnect waits for all relays to signal that their setup on *streadway.Channel is complete. If we were to return and release the Transport Lock before this, users might start taking actions that SHOULD generate events before our streadway.Channel had been set up to send them, resulting in dropped events.

  9. Channel.tryReconnect returns with the new, connected *streadway.Channel value releasing the Transport Lock so that callers can begin calling Channel methods again.
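At its core, an Event Relay is a goroutine that forwards events from a temporary source chan to the caller's chan. When the source dries up, it waits for a replacement instead of closing the caller's chan. The hypothetical sketch below keeps only that part. `relay`, `runRelay`, `ready`, and `legDone` are illustrative names, and the Transport Lock coordination is reduced to a pair of handshake chans that correspond to steps 6 to 8.

```go
package main

import "fmt"

// relay forwards events from a series of temporary source chans (one per
// underlying channel) into a single caller-facing chan. The caller's chan is
// only closed when sources is closed, i.e. the robust Channel is closed.
func relay(sources <-chan chan int, caller chan<- int, ready, legDone chan<- struct{}) {
	defer close(caller)
	for source := range sources {
		// Any setup on the new underlying channel would happen here.
		ready <- struct{}{}
		for event := range source {
			caller <- event
		}
		// The source dried up: this leg is over.
		legDone <- struct{}{}
	}
}

// runRelay simulates one underlying channel per entry in legs, feeds that
// leg's events through the relay, and returns what the caller received.
func runRelay(legs [][]int) []int {
	sources := make(chan chan int)
	caller := make(chan int)
	ready := make(chan struct{})
	legDone := make(chan struct{})
	go relay(sources, caller, ready, legDone)

	// Collect everything the caller sees.
	received := make(chan []int)
	go func() {
		var events []int
		for e := range caller {
			events = append(events, e)
		}
		received <- events
	}()

	for _, events := range legs {
		source := make(chan int)
		sources <- source // step 7: hand the relay the new feed
		<-ready           // step 8: wait until the relay is set up
		for _, e := range events {
			source <- e
		}
		close(source) // the underlying channel disconnects
		<-legDone     // step 6: wait for the relay to finish its leg
	}
	close(sources) // the robust Channel is closed for good
	return <-received
}

func main() {
	fmt.Println("CALLER RECEIVED:", runRelay([][]int{{1, 2}, {11, 12}}))
}
```

The caller's chan stays open across both simulated disconnects and receives 1, 2, 11, and 12 in order. It is closed only when the robust Channel itself shuts down.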