AMQP Package¶
Introduction¶
The amqp package is a wrapper around, and drop-in replacement for, streadway/amqp. It is designed to add automatic redialing to any code that uses the streadway package, with no other changes.
To begin using rogerRabbit/amqp, simply change your imports from:
import "github.com/streadway/amqp"
To:
import "github.com/peake100/rogerRabbit-go/pkg/amqp"
Under the hood, this library is a wrapper around streadway/amqp and would not be possible without the amazing development team behind it.
Roger, Rabbit seeks to expand on the work done by streadway/amqp, not replace it.
Basic Usage¶
As this package is a drop-in replacement for streadway/amqp, these docs do not cover basic usage.
The official RabbitMQ documentation has an excellent set of tutorials using the streadway library. Simply replacing the import statements of the official examples with "github.com/peake100/rogerRabbit-go/pkg/amqp" will allow you to follow along as normal using this package.
Note
If you find any examples that do NOT work after such a replacement, please open a PR!
For instance, the hello world receiver example would simply change from this:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
To this:
package main
import (
"log"
// ONLY THIS CHANGES
"github.com/peake100/rogerRabbit-go/pkg/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
This documentation will focus on the differences from – and expansion upon – the streadway API, rather than retreading a primer on how to work with the basic API.
It is suggested that users new to amqp who have not used streadway/amqp start with the basic RabbitMQ tutorials before continuing this documentation.
When To Use This Library¶
Roger, Rabbit is designed to remove the mental overhead involved in managing unexpected broker disconnects. It features systems to automatically recreate client/server topologies on reconnect, ensure consistent Delivery and Publish tags over reconnection events, and more.
But such features require some assumptions. In general, this library should be used when:
Basic Client/Server Topology: All calls on a channel to QueueDeclare, QueueBind, ExchangeDeclare and ExchangeBind will be re-made every time a channel drops its connection and has to reconnect. If you are using a complex topology where queues and exchanges are routinely shifted, deleted, and altered, this library’s behavior may result in the re-declaration of unwanted queues and exchanges.
Orphaned Publications Are Nacks: In-flight publications on channels in confirmation mode that fail because of a disconnect will be exposed to the caller as Nacked publications. An additional extension of the streadway API flags such orphaned publications, but handling it requires code changes and is not a drop-in replacement.
Delivery Acknowledgements Do Not Mix Per-Message and Multiple: Roger, Rabbit will detect orphaned acknowledgements (the broker was disconnected before a delivery was acknowledged) and return an error when they occur, but mixing Ack with multiple=true and multiple=false may confuse the library, and is currently not supported.
Unsupported Features¶
Roger, Rabbit strives to be a complete, drop-in replacement for streadway/amqp, but is still under construction. The following features have yet to be implemented:
Transactions: Calling Channel.Tx(), Channel.TxCommit() and Channel.TxRollback() will result in a panic. Transactions are an interesting problem to solve with robust channels, and draft PRs for how to handle them are welcome!
Robust Features¶
In this section, we will examine features unique to Roger, Rabbit.
Connection Recovery¶
Both the Connection and Channel types are robust transport mirrors of the streadway types of the same names, and will automatically reconnect when a connection is lost:
// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
panic(err)
}
// Get a new channel from our robust connection.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// We can use the Test method to return a testing object with some additional
// methods. ForceReconnect force-closes the underlying transport, causing the
// robust channel to reconnect.
//
// We'll use a dummy *testing.T object here. These methods are designed for tests
// only and should not be used in production code.
channel.Test(new(testing.T)).ForceReconnect(context.Background())
// We can see here our channel is still open.
fmt.Println("IS CLOSED:", channel.IsClosed())
// We can even declare a queue on it
queue, err := channel.QueueDeclare(
"example_channel_reconnect", // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
panic(err)
}
// Here is the result
fmt.Printf("QUEUE : %+v\n", queue)
// Explicitly close the connection. This will also close all child channels.
err = connection.Close()
if err != nil {
panic(err)
}
// Now that we have explicitly closed the connection, the channel will be closed.
fmt.Println("IS CLOSED:", channel.IsClosed())
// Output:
// IS CLOSED: false
// QUEUE : {Name:example_channel_reconnect Messages:0 Consumers:0}
// IS CLOSED: true
Topology Recreation¶
Roger, Rabbit’s Channel type remembers all calls to Channel.QueueDeclare(), Channel.QueueBind(), Channel.ExchangeDeclare() and Channel.ExchangeBind(), and replays those calls on a reconnection event.
Note
Calling Channel.QueueDelete(), Channel.QueueUnbind(), Channel.ExchangeDelete(), and Channel.ExchangeUnbind() will remove the relevant queues, exchanges, and bindings from the internally tracked lists. Anything removed through these methods will NOT be recreated on a reconnection event.
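To make the record-and-replay idea concrete, here is a minimal, stdlib-only sketch. It is a hypothetical model, not Roger, Rabbit’s RouteDeclarationMiddleware: the topologyRecorder type, its methods, and the replay callback are illustrative names, and only queues are modelled (exchanges and bindings would follow the same pattern).

```go
package main

import "fmt"

// queueArgs is a hypothetical, trimmed-down record of a QueueDeclare call.
type queueArgs struct {
	Name       string
	Durable    bool
	AutoDelete bool
}

// topologyRecorder remembers declared queues so they can be replayed after a
// reconnect. It is an illustrative model, not the library's implementation.
type topologyRecorder struct {
	queues map[string]queueArgs
	order  []string // declaration order, so replays are deterministic
}

func newTopologyRecorder() *topologyRecorder {
	return &topologyRecorder{queues: make(map[string]queueArgs)}
}

// RecordDeclare stores the arguments of a successful QueueDeclare. Declaring
// the same queue again overwrites the stored arguments.
func (r *topologyRecorder) RecordDeclare(args queueArgs) {
	if _, seen := r.queues[args.Name]; !seen {
		r.order = append(r.order, args.Name)
	}
	r.queues[args.Name] = args
}

// RecordDelete forgets a queue so it is NOT re-declared on reconnect.
func (r *topologyRecorder) RecordDelete(name string) {
	delete(r.queues, name)
	kept := r.order[:0]
	for _, n := range r.order {
		if n != name {
			kept = append(kept, n)
		}
	}
	r.order = kept
}

// Replay re-issues every remembered declaration, in order, through declare.
func (r *topologyRecorder) Replay(declare func(queueArgs) error) error {
	for _, name := range r.order {
		if err := declare(r.queues[name]); err != nil {
			return fmt.Errorf("re-declaring queue %q: %w", name, err)
		}
	}
	return nil
}

func main() {
	rec := newTopologyRecorder()
	rec.RecordDeclare(queueArgs{Name: "orders", Durable: true})
	rec.RecordDeclare(queueArgs{Name: "scratch", AutoDelete: true})
	rec.RecordDelete("scratch")

	// Simulate a reconnection event by replaying against a fake broker.
	var redeclared []string
	err := rec.Replay(func(q queueArgs) error {
		redeclared = append(redeclared, q.Name)
		return nil
	})
	fmt.Println(redeclared, err) // [orders] <nil>
}
```

Keeping a separate order slice means replays happen in the order the caller originally declared things, which matters once bindings that depend on earlier declarations are added to the model.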
Delivery Tag Continuity¶
Delivery tags remain continuous, even across unexpected disconnects. Roger, Rabbit takes care of all the internal logic of lining up the caller-facing delivery tag with the actual delivery tag of the current underlying channel:
// Get a new connection to our test broker.
connection, err := amqp.DialCtx(context.Background(), amqpTest.TestDialAddress)
if err != nil {
panic(err)
}
// Get a new channel from our robust connection for consuming.
consumeChannel, err := connection.Channel()
if err != nil {
panic(err)
}
// Get a new channel from our robust connection for publishing.
publishChannel, err := connection.Channel()
if err != nil {
panic(err)
}
queueName := "example_delivery_tag_continuity"
// Declare the queue we are going to use.
queue, err := consumeChannel.QueueDeclare(
queueName, // name
false, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
panic(err)
}
// Clean up the queue on exit.
defer consumeChannel.QueueDelete(
queue.Name, false, false, false,
)
// Set the prefetch count to 1 so that we are less likely to lose a message
// that is in-flight from the broker in this example.
err = consumeChannel.Qos(1, 0, false)
if err != nil {
panic(err)
}
// Start consuming the channel
consume, err := consumeChannel.Consume(
queue.Name,
"example consumer", // consumer name
false, // autoAck
false, // exclusive
false, // no local
false, // no wait
nil, // args
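)
if err != nil {
panic(err)
}
// consumeComplete tracks how many published messages are still awaiting delivery.
consumeComplete := new(sync.WaitGroup)
// consumerClosed will be closed when the consumer goroutine exits.
consumerClosed := make(chan struct{})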
// Launch a consumer
go func() {
// Close consumerClosed to signal exit.
defer close(consumerClosed)
fmt.Println("STARTING CONSUMER")
// Range over the consume channel
for delivery := range consume {
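// Ack the delivery. A locally scoped err avoids a data race with the
// publishing loop, which also assigns to the outer err.
if err := delivery.Ack(false); err != nil {
panic(err)
}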
// Force-reconnect the channel after each delivery.
consumeChannel.Test(new(testing.T)).ForceReconnect(context.Background())
// Tick down the consumeComplete waitgroup
consumeComplete.Done()
// Print the delivery. Even though we are forcing a new underlying channel
// to be connected each time, the delivery tags will still be continuous.
fmt.Printf(
"DELIVERY %v: %v\n", delivery.DeliveryTag, string(delivery.Body),
)
}
fmt.Println("DELIVERIES EXHAUSTED")
}()
// We'll publish 10 test messages.
for i := 0; i < 10; i++ {
// Add one to the consumeComplete WaitGroup.
consumeComplete.Add(1)
// Publish a message. Even though the consumer may be force re-connecting the
// connection each time, we can keep using the channel.
//
// NOTE: it is possible that we will drop a message here during a reconnection
// event. If we want to be sure all messages reach the broker, we'll need to
// publish messages with the Channel in confirmation mode, which we will
// show in another example.
err = publishChannel.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
Body: []byte(fmt.Sprintf("message %v", i)),
},
)
if err != nil {
panic(err)
}
}
// Wait for all messages to be received
consumeComplete.Wait()
// Close the connection
err = connection.Close()
if err != nil {
panic(err)
}
// Wait for the consumer to exit
<-consumerClosed
// exit
// Output:
// STARTING CONSUMER
// DELIVERY 1: message 0
// DELIVERY 2: message 1
// DELIVERY 3: message 2
// DELIVERY 4: message 3
// DELIVERY 5: message 4
// DELIVERY 6: message 5
// DELIVERY 7: message 6
// DELIVERY 8: message 7
// DELIVERY 9: message 8
// DELIVERY 10: message 9
// DELIVERIES EXHAUSTED
Warning
In the above example, it is possible that a message will be dropped while publishing during a reconnection event. If we want to be sure all messages reach the broker, we’ll need to publish messages with the Channel in confirmation mode, as shown in the Publishing Tag Continuity example below.
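One way to picture delivery tag continuity is an offset: every new underlying channel numbers its deliveries from 1 again, so the robust channel adds the last caller-facing tag it issued before the reconnect. The sketch below assumes that offset scheme; tagMapper and its methods are hypothetical names, and Roger, Rabbit’s DeliveryTagsMiddleware may keep its bookkeeping differently.

```go
package main

import "fmt"

// tagMapper translates per-channel delivery tags into continuous caller tags.
// Hypothetical model for illustration only.
type tagMapper struct {
	offset  uint64 // caller tags issued before the current underlying channel
	lastTag uint64 // most recent caller-facing tag
}

// Deliver converts a tag from the current underlying channel (which restarts
// at 1 after every reconnect) into a continuous caller-facing tag.
func (m *tagMapper) Deliver(underlyingTag uint64) uint64 {
	m.lastTag = m.offset + underlyingTag
	return m.lastTag
}

// Reconnected is called once a new underlying channel is established.
func (m *tagMapper) Reconnected() {
	m.offset = m.lastTag
}

func main() {
	m := new(tagMapper)
	// Mirror the example above: one delivery, then a forced reconnect.
	for i := 0; i < 3; i++ {
		callerTag := m.Deliver(1) // every new underlying channel starts at tag 1
		fmt.Println("DELIVERY", callerTag)
		m.Reconnected()
	}
}
```

Running it prints DELIVERY 1, DELIVERY 2 and DELIVERY 3, even though the simulated underlying channel reports tag 1 every time.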
Delivery Tag Orphans¶
When manually acking Deliveries, it is possible that between the time we get a Delivery and the time we ack it, the underlying channel has disconnected and the delivery can no longer be acknowledged. In such cases, an error will be returned indicating this delivery has been orphaned:
// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
panic(err)
}
// Get a new channel from our robust connection for consuming.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
queueName := "example_delivery_ack_orphan"
// Declare the queue we are going to use.
_, err = channel.QueueDeclare(
queueName, // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
panic(err)
}
// Clean up the queue on exit.
defer channel.QueueDelete(queueName, false, false, false)
// Start consuming the channel
consume, err := channel.Consume(
queueName,
"example consumer", // consumer name
// Auto-ack is set to false
false, // autoAck
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
panic(err)
}
// Publish a message.
err = channel.Publish(
"", // exchange
queueName,
false,
false,
amqp.Publishing{
Body: []byte("test message"),
},
)
if err != nil {
panic(err)
}
// get the delivery of our published message
delivery := <-consume
fmt.Println("DELIVERY:", string(delivery.Body))
// Force the underlying channel to close and reconnect.
channel.Test(new(testing.T)).ForceReconnect(context.Background())
// Now that the original underlying channel is closed, it is impossible to ack
// the delivery. We will get an error when we try.
err = delivery.Ack(false)
fmt.Println("ACK ERROR:", err)
// This error is an orphan error
var orphanErr amqp.ErrCantAcknowledgeOrphans
if !errors.As(err, &orphanErr) {
panic("error not orphan error")
}
fmt.Println("FIRST ORPHAN TAG:", orphanErr.OrphanTagFirst)
fmt.Println("LAST ORPHAN TAG :", orphanErr.OrphanTagLast)
// Output:
// DELIVERY: test message
// ACK ERROR: 1 tags orphaned (1 - 1), 0 tags successfully acknowledged
// FIRST ORPHAN TAG: 1
// LAST ORPHAN TAG : 1
Publishing Tag Continuity¶
Just like with Delivery Tags, publishing tag continuity is maintained, even across disconnection events.
// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
panic(err)
}
// Get a new channel from our robust connection for publishing.
publishChannel, err := connection.Channel()
if err != nil {
panic(err)
}
// Put the channel into confirmation mode
err = publishChannel.Confirm(false)
if err != nil {
panic(err)
}
confirmationsReceived := new(sync.WaitGroup)
confirmationsComplete := make(chan struct{})
// Create a channel to consume publication confirmations.
publishEvents := publishChannel.NotifyPublish(make(chan amqp.Confirmation))
go func() {
// Close to signal exit.
defer close(confirmationsComplete)
// Range over the confirmation channel.
for confirmation := range publishEvents {
// Mark 1 confirmation as done.
confirmationsReceived.Done()
// Print confirmation.
fmt.Printf(
"CONFIRMATION TAG %02d: ACK: %v ORPHAN: %v\n",
confirmation.DeliveryTag,
confirmation.Ack,
// If the confirmation was never received because the channel was
// disconnected, then confirmation.Ack will be false, and
// confirmation.DisconnectOrphan will be true.
confirmation.DisconnectOrphan,
)
}
}()
// Declare the message queue
queueName := "example_publish_tag_continuity"
_, err = publishChannel.QueueDeclare(
queueName,
false,
true,
false,
false,
nil,
)
if err != nil {
panic(err)
}
// We'll publish 10 test messages.
for i := 0; i < 10; i++ {
// We want to wait here to make sure we got the confirmation from the last
// publication before force-closing the connection to show we can handle it.
confirmationsReceived.Wait()
// Force a reconnection of the underlying channel.
publishChannel.Test(new(testing.T)).ForceReconnect(context.Background())
// Increment the confirmation WaitGroup
confirmationsReceived.Add(1)
// Publish a message. Even though the channel is force-reconnected before
// every publication, we can keep using it.
err = publishChannel.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
Body: []byte(fmt.Sprintf("message %v", i)),
},
)
if err != nil {
panic(err)
}
}
// Wait for all confirmations to be received.
confirmationsReceived.Wait()
// Close the connection.
err = connection.Close()
if err != nil {
panic(err)
}
// Wait for the confirmation routine to exit.
<-confirmationsComplete
// Exit.
// Output:
// CONFIRMATION TAG 01: ACK: true ORPHAN: false
// CONFIRMATION TAG 02: ACK: true ORPHAN: false
// CONFIRMATION TAG 03: ACK: true ORPHAN: false
// CONFIRMATION TAG 04: ACK: true ORPHAN: false
// CONFIRMATION TAG 05: ACK: true ORPHAN: false
// CONFIRMATION TAG 06: ACK: true ORPHAN: false
// CONFIRMATION TAG 07: ACK: true ORPHAN: false
// CONFIRMATION TAG 08: ACK: true ORPHAN: false
// CONFIRMATION TAG 09: ACK: true ORPHAN: false
// CONFIRMATION TAG 10: ACK: true ORPHAN: false
Note
The Confirmation.DisconnectOrphan field is new to the Confirmation type and is unique to Roger, Rabbit.
When DisconnectOrphan is true, it means that a Nack occurred not because of a broker response, but because no confirmation, positive or negative, was received from the broker before the connection was disrupted. Orphaned messages may have reached the broker; we have no way of knowing.
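The orphaning rule in the note can be modelled by tracking which caller-facing publish tags are still awaiting a broker confirmation. When the underlying channel drops, each outstanding tag is reported as a Nack with DisconnectOrphan set, and tag numbering simply carries on. The confirmation struct mirrors the fields of amqp.Confirmation used above; publishTracker and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// confirmation mirrors the amqp.Confirmation fields used in the example above.
type confirmation struct {
	DeliveryTag      uint64
	Ack              bool
	DisconnectOrphan bool
}

// publishTracker hands out continuous publish tags and remembers which ones
// are still awaiting a confirmation. Hypothetical model for illustration only.
type publishTracker struct {
	nextTag     uint64
	outstanding map[uint64]bool
}

func newPublishTracker() *publishTracker {
	return &publishTracker{outstanding: make(map[uint64]bool)}
}

// Published records a new publication and returns its caller-facing tag.
func (p *publishTracker) Published() uint64 {
	p.nextTag++
	p.outstanding[p.nextTag] = true
	return p.nextTag
}

// Confirmed handles a broker ack or nack for a caller-facing tag.
func (p *publishTracker) Confirmed(tag uint64, ack bool) confirmation {
	delete(p.outstanding, tag)
	return confirmation{DeliveryTag: tag, Ack: ack}
}

// Disconnected turns every unconfirmed publication into an orphaned Nack, in
// tag order. The broker may or may not have received these messages. nextTag
// is left alone, so tags stay continuous on the next underlying channel.
func (p *publishTracker) Disconnected() []confirmation {
	tags := make([]uint64, 0, len(p.outstanding))
	for tag := range p.outstanding {
		tags = append(tags, tag)
	}
	sort.Slice(tags, func(i, j int) bool { return tags[i] < tags[j] })

	orphans := make([]confirmation, 0, len(tags))
	for _, tag := range tags {
		delete(p.outstanding, tag)
		orphans = append(orphans, confirmation{DeliveryTag: tag, DisconnectOrphan: true})
	}
	return orphans
}

func main() {
	p := newPublishTracker()
	first := p.Published()
	p.Published()
	p.Published()
	fmt.Printf("%+v\n", p.Confirmed(first, true))
	for _, c := range p.Disconnected() {
		fmt.Printf("%+v\n", c)
	}
}
```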
Channel Middleware¶
Roger, Rabbit allows the registration of middleware on all Channel methods. In fact, most of the robust Channel features are implemented through middleware defined in the amqp/defaultmiddlewares package! It is a powerful tool and one of the biggest API expansions over streadway/amqp.
Middleware signatures are defined in the amqp/amqpmiddleware package.
Registering Middleware¶
// define our new middleware
queueDeclareMiddleware := func(
next amqpmiddleware.HandlerQueueDeclare,
) amqpmiddleware.HandlerQueueDeclare {
return func(args amqpmiddleware.ArgsQueueDeclare) (streadway.Queue, error) {
fmt.Println("MIDDLEWARE INVOKED FOR QUEUE")
fmt.Println("QUEUE NAME :", args.Name)
fmt.Println("AUTO-DELETE:", args.AutoDelete)
return next(args)
}
}
// Create a config and add our middleware to it.
config := amqp.DefaultConfig()
config.ChannelMiddleware.AddQueueDeclare(queueDeclareMiddleware)
// Get a new connection to our test broker.
connection, err := amqp.DialConfigCtx(
context.Background(), amqptest.TestDialAddress, config,
)
if err != nil {
panic(err)
}
defer connection.Close()
// Get a new channel from our robust connection. The channel is created with
// the middleware registered on our config.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// Declare our queue. Our middleware will be invoked and print a message.
_, err = channel.QueueDeclare(
"example_middleware",
false,
true,
false,
false,
nil,
)
if err != nil {
panic(err)
}
// MIDDLEWARE INVOKED FOR QUEUE
// QUEUE NAME : example_middleware
// AUTO-DELETE: true
Middleware Providers¶
For more complex middleware, you can implement middleware providers: types that expose methods implementing middleware. You can then register a factory method that generates a new provider value whenever a new Connection or Channel is created, and the provider’s middleware methods will be registered for you automatically.
Providers are useful when several middleware methods need to coordinate through shared state. Let’s define a custom middleware provider and factory method:
// CustomMiddlewareProvider exposes methods for middlewares that need to coordinate.
type CustomMiddlewareProvider struct {
InvocationCount int
}
// TypeID implements amqpmiddleware.ProvidesMiddleware and returns a unique type ID
// that can be used to fetch middleware values when testing.
func (middleware *CustomMiddlewareProvider) TypeID() amqpmiddleware.ProviderTypeID {
return "CustomMiddleware"
}
// QueueDeclare implements amqpmiddleware.ProvidesQueueDeclare.
func (middleware *CustomMiddlewareProvider) QueueDeclare(
next amqpmiddleware.HandlerQueueDeclare,
) amqpmiddleware.HandlerQueueDeclare {
return func(args amqpmiddleware.ArgsQueueDeclare) (streadway.Queue, error) {
middleware.InvocationCount++
fmt.Printf(
"DECLARED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
)
return next(args)
}
}
// QueueDelete implements amqpmiddleware.ProvidesQueueDelete.
func (middleware *CustomMiddlewareProvider) QueueDelete(
next amqpmiddleware.HandlerQueueDelete,
) amqpmiddleware.HandlerQueueDelete {
return func(args amqpmiddleware.ArgsQueueDelete) (int, error) {
middleware.InvocationCount++
fmt.Printf(
"DELETED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
)
return next(args)
}
}
// NewCustomMiddlewareProvider creates a new CustomMiddlewareProvider.
func NewCustomMiddlewareProvider() amqpmiddleware.ProvidesMiddleware {
return new(CustomMiddlewareProvider)
}
We can register it on our Config so that every channel created from a connection gets a fresh instance of our provider:
// Create a config and add our middleware provider factory to it.
config := amqp.DefaultConfig()
config.ChannelMiddleware.AddProviderFactory(NewCustomMiddlewareProvider)
// Get a new connection to our test broker.
connection, err := amqp.DialConfigCtx(
context.Background(), amqptest.TestDialAddress, config,
)
if err != nil {
panic(err)
}
defer connection.Close()
// Get a new channel from our robust connection. The channel gets a fresh
// provider value from our factory.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// Declare our queue. Our middleware will be invoked and print a message.
_, err = channel.QueueDeclare(
"example_middleware",
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
panic(err)
}
// Delete our queue. Our middleware will be invoked and print a message.
_, err = channel.QueueDelete(
"example_middleware",
false, // ifUnused
false, // ifEmpty
false, // noWait
)
if err != nil {
panic(err)
}
// DECLARED: example_middleware, TOTAL: 1
// DELETED: example_middleware, TOTAL: 2
If, for some reason, we wanted every Channel to share the same instance of the provider, we could make the following adjustment:
// Create an instance of our provider:
provider := NewCustomMiddlewareProvider()
config := amqp.DefaultConfig()
config.ChannelMiddleware.AddProviderMethods(provider)
This will add the methods once and re-use them for all Channels rather than making a fresh provider every time a channel is generated.
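The practical difference between AddProviderFactory and AddProviderMethods is when the provider value is created. This sketch uses a hypothetical counterProvider type and newChannel helper in place of the real Config and Channel types: a factory gives each channel independent state, while a shared instance accumulates state across every channel.

```go
package main

import "fmt"

// counterProvider is a hypothetical provider with per-instance state,
// similar in spirit to CustomMiddlewareProvider above.
type counterProvider struct{ Invocations int }

// QueueDeclare is a hypothetical middleware method that counts invocations.
func (p *counterProvider) QueueDeclare(next func(name string) error) func(name string) error {
	return func(name string) error {
		p.Invocations++
		return next(name)
	}
}

// channel holds the wrapped handler, the way a robust Channel holds handlers.
type channel struct{ queueDeclare func(name string) error }

// newChannel wraps a no-op base handler with the provider's middleware method.
func newChannel(provider *counterProvider) channel {
	base := func(name string) error { return nil }
	return channel{queueDeclare: provider.QueueDeclare(base)}
}

func main() {
	// Factory style: every channel gets a fresh provider.
	factory := func() *counterProvider { return new(counterProvider) }
	p1, p2 := factory(), factory()
	a, b := newChannel(p1), newChannel(p2)
	_ = a.queueDeclare("q1")
	_ = b.queueDeclare("q2")
	fmt.Println("factory:", p1.Invocations, p2.Invocations) // factory: 1 1

	// Shared style: one provider instance is reused by every channel.
	shared := new(counterProvider)
	c, d := newChannel(shared), newChannel(shared)
	_ = c.queueDeclare("q1")
	_ = d.queueDeclare("q2")
	fmt.Println("shared:", shared.Invocations) // shared: 2
}
```

Because a shared provider is reached from every channel, real code that mutates its state would need synchronization such as a sync.Mutex; the sketch runs on a single goroutine and skips it.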
Note
Many of Roger, Rabbit’s more complex features are implemented through middleware providers. Check the defaultmiddlewares package for practical examples of middleware providers used in this library.
Testing¶
Testing is a first-class citizen of the Roger, Rabbit package. Types expose a rich set of testing methods, and the amqpTest package offers a number of additional testing utilities.
Note
Testing methods and utilities are heavily integrated with testify. Testify is somewhat divisive within the Go community, but because the maintainers of this repository rely on it heavily, so do Roger, Rabbit’s testing utilities.
Test() Methods¶
Both Connection and Channel expose a .Test() method, which returns a testing harness type with additional methods for running tests on its parent value.
Most test methods do not return an error, instead opting to report the error and immediately fail the test.
Example:
// Get a new connection to our test broker.
connection, err := amqp.Dial(amqpTest.TestDialAddress)
if err != nil {
panic(err)
}
defer connection.Close()
// Get a new channel from our robust connection.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// Get a channel testing harness. In a real test function, you would pass the
// test's *testing.T value. Here, we will just pass a dummy one.
testHarness := channel.Test(new(testing.T))
// We can use the test harness to force the channel to reconnect. If a reconnection
// does not occur before the passed context expires, the test will be failed.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
testHarness.ForceReconnect(ctx)
// We can check how many times a reconnection has occurred. The first time we
// connect to the broker is counted, so we should get '2':
fmt.Println("RECONNECTION COUNT:", testHarness.ReconnectionCount())
// exit.
// Output:
// RECONNECTION COUNT: 2
Testify Suite¶
The amqpTesting package provides an AmqpSuite type that extends testify/suite.Suite. AmqpSuite adds a number of quality-of-life methods for quickly setting up and tearing down integration tests with a test broker.
See the godoc documentation for more details.
Architecture¶
Overview¶
Figure: Schematic of the amqp transport (Channel and Connection) architecture.
Let’s dig into some of the specifics.
Note
When we refer to an amqp.[Type], we are referring to the rogerRabbit-go/pkg/amqp implementation of that type. If we need to refer to underlying types provided by the streadway/amqp package, we will refer to them as streadway.[Type].
Warning
The file organization of the amqp package is still in flux, though the API and underlying logical structure are likely to remain stable. This architecture guide will refrain from referencing specific files or paths for the time being, until the types and helpers are done playing musical chairs.
Streadway Transport¶
The Streadway Transport is our underlying streadway.Connection or streadway.Channel, and handles all of our actual communication with the AMQP Broker.
It is managed by a Robust Transport.
Robust Transport¶
A Robust Transport is an amqp.Channel or amqp.Connection. These types manage their streadway.Channel or streadway.Connection counterparts, and expose Exported Methods to the caller that re-implement the methods supplied by the Streadway Transport they manage.
Each Robust Transport implements the reconnects interface, which in turn is referenced by the Transport Manager and used to redial the underlying broker connection.
So why are we talking about the Transport Manager here if it manages the Robust Transport? Because the Transport Manager is then embedded into the Robust Transport it is managing, and provides some of that transport’s methods.
Close(), NotifyDisconnect(), and all other methods that are shared between amqp.Channel and amqp.Connection are actually provided by an embedded transportManager value which, like an ouroboros, also contains a reference to its parent transport. That reference is used to help manage the lifecycle of the transport’s underlying streadway transport through the shared reconnects interface.
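The embedded, self-referencing arrangement can be sketched with Go struct embedding. In this hypothetical model, reconnects is reduced to a single tryReconnect method, and transportManager, robustChannel and redial are illustrative names; the real unexported types carry far more state.

```go
package main

import (
	"errors"
	"fmt"
)

// reconnects is a hypothetical reduction of the interface the transport
// manager uses to redial its parent's underlying transport.
type reconnects interface {
	tryReconnect() error
}

// transportManager provides the methods shared by every robust transport.
type transportManager struct {
	parent  reconnects // back-reference to the transport that embeds us
	closed  bool
	redials int
}

// Close is promoted to every robust transport that embeds the manager.
func (m *transportManager) Close() error {
	if m.closed {
		return errors.New("already closed")
	}
	m.closed = true
	return nil
}

// IsClosed is likewise promoted to the embedding transport.
func (m *transportManager) IsClosed() bool { return m.closed }

// redial asks the parent, through the interface, to reconnect until it succeeds.
func (m *transportManager) redial() {
	for m.parent.tryReconnect() != nil {
		// A real implementation would back off and honour cancellation here.
	}
	m.redials++
}

// robustChannel embeds the manager, so Close and IsClosed become its methods.
type robustChannel struct {
	*transportManager
	failuresLeft int // simulates a broker that refuses the first few redials
}

func (c *robustChannel) tryReconnect() error {
	if c.failuresLeft > 0 {
		c.failuresLeft--
		return errors.New("broker unreachable")
	}
	return nil
}

// newRobustChannel wires up the ouroboros: the channel holds the manager, and
// the manager holds the channel.
func newRobustChannel(failures int) *robustChannel {
	ch := &robustChannel{failuresLeft: failures}
	ch.transportManager = &transportManager{parent: ch}
	return ch
}

func main() {
	ch := newRobustChannel(2)
	ch.redial()
	fmt.Println("redials:", ch.redials, "closed:", ch.IsClosed()) // redials: 1 closed: false
	_ = ch.Close()
	fmt.Println("closed:", ch.IsClosed()) // closed: true
}
```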
Method Handlers¶
Each Robust Transport (and the Transport Manager) contains a handlers field which holds all the method handlers for that transport. These handlers consist of the base streadway/amqp type methods wrapped in user-supplied Middleware.
Middleware¶
Middleware is a core component of the Roger, Rabbit amqp package. Most of Roger, Rabbit’s extended functionality, such as continuous Delivery Tags over disconnects and auto-redeclaration of broker topology on reconnection, is supplied by middleware.
In the early life of this package, robust features were implemented directly on the Channel, resulting in features which were hard to isolate and maintain. Topology re-declaration, for instance, involves 9 methods manipulating 6 data resources over ~700 lines of code. Debugging and tracking errors in these features was incredibly cumbersome when they were spread out over the main Channel method calls.
By moving these features into middleware, all of the logic that supports a given feature can be housed together, greatly reducing complexity and increasing maintainability.
The current default middleware that ships with Roger, Rabbit is:
ConfirmsMiddleware: Tracks whether Channel.Confirm has been called, and sets any freshly reconnected streadway Channels into the correct state.
DeliveryTagsMiddleware: Ensures that Delivery tags are continuous for the caller, even over disconnects.
FlowMiddleware: Tracks the Flow state of the channel and sets the correct state on Channel reconnections.
LoggingMiddlewareConnection: Facilitates logging for all Connection operations.
LoggingMiddlewareChannel: Facilitates logging for all Channel operations.
PublishTagsMiddleware: Ensures that Publish Confirmation tags are continuous for the caller, even over disconnects.
QoSMiddleware: Tracks QoS settings and sets up Channels on reconnect to match.
RouteDeclarationMiddleware: Handles topology recreation on Channel reconnects.
Together these middlewares implement the bulk of our robust feature set.
Transport Lock¶
The TransportLock is a *sync.RWMutex contained on the transportManager, used to ensure there are no race conditions when a disconnection event occurs.
Any process that wishes to make use of the underlying streadway connection must acquire the transport lock for read.
When the transport manager redials the broker, the transport lock is acquired for write, and not released until a successful connection occurs. This blocks all other operations (like channel and connection method calls) from interacting with the underlying streadway transport until we are reconnected.
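The locking rule boils down to a sync.RWMutex used in two modes. In this minimal sketch, lockedTransport, Use and Redial are hypothetical names and a string stands in for the underlying streadway transport: operations hold the lock for read, so they can run concurrently, while Redial holds it for write, so no operation can observe a half-swapped transport.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedTransport is a hypothetical stand-in for the transport manager.
type lockedTransport struct {
	lock sync.RWMutex // the Transport Lock
	conn string       // stands in for the underlying streadway transport
	gen  int          // how many times the transport has been (re)dialed
}

// Use runs op against the current underlying transport while holding the
// transport lock for read; concurrent callers do not block each other.
func (t *lockedTransport) Use(op func(conn string)) {
	t.lock.RLock()
	defer t.lock.RUnlock()
	op(t.conn)
}

// Redial swaps in a new underlying transport while holding the lock for
// write, which blocks every Use call until the swap is complete.
func (t *lockedTransport) Redial() {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.gen++
	t.conn = fmt.Sprintf("connection #%d", t.gen)
}

func main() {
	t := new(lockedTransport)
	t.Redial() // initial dial

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.Use(func(conn string) { _ = conn }) // safe during a concurrent redial
		}()
	}
	t.Redial() // a disconnect happened; redial while the operations run
	wg.Wait()

	t.Use(func(conn string) { fmt.Println("using", conn) }) // using connection #2
}
```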
Transport Manager¶
The Transport Manager implements all common functionality between Channel and Connection. See the above section for more details.
The Transport Manager listens for a disconnection event to occur, then acquires exclusive write access to the Transport Lock. Only then does it continuously attempt to redial the broker using the methods exposed on our Robust Transport through the reconnects interface.
The Transport Manager also exposes a Retry On Disconnected method, which all Channel and Connection Exported Methods invoke to complete requests.
Redial Routine¶
The Redial Routine is launched by the Transport Manager, and is responsible for reconnecting the underlying Streadway Transport on a disconnection event. Its steps are:
1. Registers a listener with the Streadway Transport’s NotifyClose method.
2. Blocks until the listener signals that the Streadway Transport has closed (a disconnection event).
3. Exits if our Robust Transport has been closed by the caller.
4. Acquires the Transport Lock for write, blocking Exported Method calls until a successful reconnection occurs.
5. Calls the tryReconnect method on the Robust Transport until it gets a successful result.
6. Registers a listener with the new Streadway Transport’s NotifyClose method.
7. Releases the Transport Lock.
8. Restarts at step 2.
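The numbered steps translate into a loop along these lines. This is a hypothetical, stdlib-only model: fakeTransport’s notify channel stands in for a NotifyClose listener, and robustTransport, tryReconnect and redialRoutine are illustrative names. A real implementation would also back off between attempts.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeTransport stands in for a streadway transport; closing notify plays the
// role of the event a NotifyClose listener would receive.
type fakeTransport struct {
	id     int
	notify chan struct{}
}

// robustTransport is a hypothetical, heavily simplified robust transport.
type robustTransport struct {
	lock         sync.RWMutex // the Transport Lock
	current      *fakeTransport
	callerClosed bool // set by the caller; guarded by lock
	dials        int
	failures     int // simulated failed redial attempts remaining
}

// tryReconnect simulates redialing the broker, failing r.failures times first.
func (r *robustTransport) tryReconnect() (*fakeTransport, error) {
	if r.failures > 0 {
		r.failures--
		return nil, errors.New("broker unreachable")
	}
	r.dials++
	return &fakeTransport{id: r.dials, notify: make(chan struct{})}, nil
}

// redialRoutine follows the numbered steps above. It reports the id of each
// new transport on reconnected and closes reconnected when it exits.
func (r *robustTransport) redialRoutine(reconnected chan<- int) {
	defer close(reconnected)

	r.lock.RLock()
	notify := r.current.notify // 1. listen for the current transport closing
	r.lock.RUnlock()

	for {
		<-notify // 2. block until a disconnection event

		r.lock.RLock()
		closedByCaller := r.callerClosed
		r.lock.RUnlock()
		if closedByCaller {
			return // 3. the caller closed the robust transport
		}

		r.lock.Lock() // 4. block exported methods until we reconnect
		// 5. retry until a reconnect succeeds
		for {
			next, err := r.tryReconnect()
			if err == nil {
				r.current = next
				break
			}
		}
		notify = r.current.notify // 6. listen on the new transport
		id := r.current.id
		r.lock.Unlock() // 7. release the Transport Lock
		reconnected <- id
	} // 8. back to step 2
}

func main() {
	r := new(robustTransport)
	r.current, _ = r.tryReconnect() // initial dial: transport 1
	r.failures = 2                  // the next two redial attempts will fail

	reconnected := make(chan int)
	go r.redialRoutine(reconnected)

	// Simulate an unexpected disconnect of transport 1.
	r.lock.RLock()
	close(r.current.notify)
	r.lock.RUnlock()
	fmt.Println("reconnected to transport", <-reconnected) // reconnected to transport 2

	// The caller closes the robust transport, then the underlying one closes.
	r.lock.Lock()
	r.callerClosed = true
	close(r.current.notify)
	r.lock.Unlock()
	_, running := <-reconnected
	fmt.Println("redial routine running:", running) // redial routine running: false
}
```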
Retry On Disconnected¶
Retry On Disconnected is a method exposed by the Transport Manager that enables Robust Transports (Channel and Connection) to automatically retry an operation if it failed because we were disconnected from the AMQP broker.
When invoked, Retry On Disconnected does the following:
1. Acquires the Transport Lock for read (so multiple methods can be called simultaneously).
2. Runs the operation handed to it by the Exported Method, usually a Method Handler.
3. Releases the Transport Lock.
4. If an error occurred because the broker was not reachable, goes back to step 1.
5. Passes the result back up to the caller.
Event Relays¶
Many Channel methods, like NotifyPublish or Consume, involve sending events to the caller along a provided or returned Go chan [Event] value.
When a streadway.Channel disconnects, it closes all event chan [Event] values it is feeding. Because we want our event chan [Event] values to survive a disconnect, we cannot pass the caller’s chan [Event] directly to a streadway.Channel. Instead, we need to create our own temporary chan [Event], pass that to the Streadway Transport, then relay the events from our temporary chan [Event] to the caller’s chan [Event].
To do this, we launch an Event Relay. When a reconnection event occurs, the relay creates a new temporary chan [Event], passes that to the new Streadway Transport, and then continues to relay messages to the caller as if nothing has happened.
Since Channel is the only transport that needs event relays, it manages their lifecycle. Event Relays must be kept in careful sync with reconnection events to ensure that data or logic races do not occur. The Event Relay lifecycle is as follows:
1. A relay is started by the Channel.[Method] handler using the transportManager.retryOperationOnClosed function, so it can acquire the Transport Lock for read when starting up (ensuring the streadway.Channel is not swapped out from under it).
2. The relay runs its first leg until the underlying streadway.Channel disconnects.
3. On streadway.Channel disconnect, the temporary chan [Event] feeding the relay dries up. The relay waits for a new *streadway.Channel to be sent by its Channel on a successful reconnection.
4. The transportManager acquires the Transport Lock for write and calls Channel.tryReconnect over and over until a new streadway.Channel is successfully connected.
5. When a new streadway.Channel is successfully established, Channel.tryReconnect does not return until all the remaining steps are completed.
6. Channel.tryReconnect waits for all Event Relays to signal that their last leg has been successfully completed.
7. Channel.tryReconnect sends the new *streadway.Channel value to each relay so it can run any setup and start relaying events again.
8. Channel.tryReconnect waits for all relays to signal that their setup on the *streadway.Channel is complete. If we were to return and release the Transport Lock before this, users might start taking actions that SHOULD generate events before our streadway.Channel had been set up to send them, resulting in dropped events.
9. Channel.tryReconnect returns with the new, connected *streadway.Channel value, releasing the Transport Lock so that callers can begin calling Channel methods again.