Roger, Rabbit

_images/logo-tophat-tall.svg

Demo

Examples:

Survive a Disconnection Event Using the amqp Package:

// Get a new connection to our test broker.
//
// DialCtx is a new function that allows the Dial function to keep attempting
// re-dials to the broker until the passed context expires.
connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
if err != nil {
    panic(err)
}

// Get a new channel from our robust connection.
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

// We can use the Test method to return a testing harness with some additional
// methods. ForceReconnect force-closes the underlying streadway Channel, causing
// the robust Channel to reconnect.
//
// We'll use a dummy *testing.T object here. These methods are designed for tests
// only and should not be used in production code.
channel.Test(new(testing.T)).ForceReconnect(context.Background())

// We can see here our channel is still open.
fmt.Println("IS CLOSED:", channel.IsClosed())

// We can even declare a queue on it
queue, err := channel.QueueDeclare(
    "example_channel_reconnect", // name
    false,                       // durable
    true,                        // autoDelete
    false,                       // exclusive
    false,                       // noWait
    nil,                         // args
)
if err != nil {
    panic(err)
}

// Here is the result
fmt.Printf("QUEUE    : %+v\n", queue)

// Explicitly close the connection. This will also close all child channels.
err = connection.Close()
if err != nil {
    panic(err)
}

// Now that we have explicitly closed the connection, the channel will be closed.
fmt.Println("IS CLOSED:", channel.IsClosed())

// IS CLOSED: false
// QUEUE    : {Name:example_channel_reconnect Messages:0 Consumers:0}
// IS CLOSED: true

Effortless Publishing with Confirmations using the roger Package:

// Get a new connection to our test broker.
connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
if err != nil {
    panic(err)
}
defer connection.Close()

// Get a new channel from our robust connection for publishing. This channel will
// be put into confirmation mode by the producer.
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

// Declare a queue to produce to
queue, err := channel.QueueDeclare(
    "example_confirmation_producer", // name
    false,                           // durable
    true,                            // autoDelete
    false,                           // exclusive
    false,                           // noWait
    nil,                             // args
)

// Create a new producer using our channel. Passing nil to opts will result in
// default opts being used. By default, a Producer will put the passed channel in
// confirmation mode, and each time publish is called, will block until a
// confirmation from the server has been received.
producer := rproducer.New(channel, nil)
producerComplete := make(chan struct{})

// Run the producer in it's own goroutine.
go func() {
    // Signal this routine has exited on exit.
    defer close(producerComplete)

    err = producer.Run()
    if err != nil {
        panic(err)
    }
}()

messagesPublished := new(sync.WaitGroup)
for i := 0; i < 10; i++ {

    messagesPublished.Add(1)

    // Publish each message in it's own goroutine The producer handles the
    // boilerplate of tracking publication tags and receiving broker confirmations.
    go func() {
        // Release our WaitGroup on exit.
        defer messagesPublished.Done()

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        // Publish a message, this method will block until we get a publication
        // confirmation from the broker OR ctx expires.
                    err = producer.Publish(
                            ctx,
                            "", // exchange
                            queue.Name, // queue
                            true, // mandatory
                            false, // immediate
                            amqp.Publishing{
                                    Body: []byte("test message"),
                            },
                    )

        fmt.Println("Message Published and Confirmed!")

        if err != nil {
            panic(err)
        }
    }()
}

// Wait for all our messages to be published
messagesPublished.Wait()

// Start the shutdown of the producer
producer.StartShutdown()

// Wait for the producer to exit.
<-producerComplete

// exit.

// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!

API Documentation

API Documentation is hosted on pkg.go.dev and can be found here.