Author: kaxap

Description:
A wrapper with auto-reconnect for the Go RabbitMQ client http://github.com/streadway/amqp
Language: Go
Repository: git://github.com/kaxap/rmq.git
Created: 2017-10-02T11:04:15Z
Project page: https://github.com/kaxap/rmq

License: MIT License

rmq

A simple wrapper for the Go RabbitMQ client http://github.com/streadway/amqp.
It minimizes the boilerplate code needed to work with a dockerized RabbitMQ server.

Example

    package main

    import (
        "log"

        "github.com/kaxap/rmq"
        "github.com/streadway/amqp"
    )

    type something struct {
        ID   int    `json:"id"`
        Data string `json:"data"`
    }

    func main() {
        // Connect to the RabbitMQ server.
        // Connection parameters must be present as environment variables,
        // i.e. RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS, RABBITMQ_HOST, RABBITMQ_PORT.
        // For more information see https://hub.docker.com/_/rabbitmq/

        // This is the short syntax for a durable consumer queue.
        // To create a non-durable queue, use the NewQueue constructor
        // (see the "Constructors" section below).
        inputQueue, err := rmq.NewConsumerQueue("input_queue", 1)
        if err != nil {
            // could not connect or create channel
            log.Fatal(err)
        }
        defer inputQueue.Close()

        // This is the short syntax for a durable producer queue.
        outputQueue, err := rmq.NewProducerQueue("output_queue")
        if err != nil {
            // could not connect or create channel
            log.Fatal(err)
        }
        defer outputQueue.Close()

        // change the delivery mode for the Send and PublishJSON methods
        outputQueue.DeliveryMode = amqp.Persistent

        var a something

        // consume a JSON-encoded message
        msg, err := inputQueue.Get(&a)
        if err != nil {
            // Get failed, so a and msg must not be used
            log.Printf("get failed: %v", err)
            return
        }
        log.Printf("message = %s\n", a.Data)

        // acknowledge the message
        msg.Ack(false)

        // modify the data
        a.ID++
        a.Data += " to you too"

        // send it to the output queue
        outputQueue.Send(&a)
    }

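Now publish {"id": 1, "data": "hello"} to "input_queue". The program logs "message = hello", acknowledges the message, and sends {"id": 2, "data": "hello to you too"} to "output_queue".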
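
The environment variables named in the example can be combined into a single AMQP connection URL. The following is a minimal sketch of that step using only the standard library; it is not taken from the rmq source, and the function name amqpURL and its getenv parameter are hypothetical, introduced here for illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// amqpURL assembles an AMQP URL from the four environment variables
// named in the example. The getenv parameter is a hypothetical seam that
// makes the function easy to test; pass os.Getenv in real use.
// Assumption: this mirrors what a wrapper might do, not rmq's actual code.
func amqpURL(getenv func(string) string) string {
	u := url.URL{
		Scheme: "amqp",
		// UserPassword escapes special characters in the credentials.
		User: url.UserPassword(
			getenv("RABBITMQ_DEFAULT_USER"),
			getenv("RABBITMQ_DEFAULT_PASS"),
		),
		Host: net.JoinHostPort(getenv("RABBITMQ_HOST"), getenv("RABBITMQ_PORT")),
		Path: "/",
	}
	return u.String()
}

func main() {
	// Prints the URL built from the current process environment.
	fmt.Println(amqpURL(os.Getenv))
}
```

Building the URL with net/url rather than string concatenation keeps passwords containing characters such as "@" or "/" from corrupting the result.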

There is an even shorter syntax if you don't need to send messages very often:

    package main

    import "github.com/kaxap/rmq"

    type something struct {
        ID   int    `json:"id"`
        Data string `json:"data"`
    }

    func main() {
        const maxRetries = 10
        data := &something{ID: 1, Data: "hello!"}

        // This tries to connect to a durable queue with the given name and send the data.
        // If connecting or sending fails, it retries up to maxRetries (10) times.
        // Then it closes the connection.
        rmq.SendAndForget("some_queue", data, maxRetries)
    }

Note: to send messages to non-durable queues, use rmq.SendAndForgetNonDurable.
rmq.SendAndForgetLazy is also available for lazy queues.
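
The "retry up to maxRetries times" behaviour described for SendAndForget can be illustrated with a small bounded-retry loop. This is only a sketch of the general pattern, not the library's implementation; sendWithRetry, errUnavailable, and the simulated send function are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a hypothetical error standing in for a failed
// connection or publish.
var errUnavailable = errors.New("broker unavailable")

// sendWithRetry calls send until it succeeds or maxRetries attempts have
// been made. It returns the number of attempts and the last error (nil on
// success).
func sendWithRetry(maxRetries int, send func() error) (int, error) {
	if maxRetries < 1 {
		return 0, errors.New("maxRetries must be at least 1")
	}
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err = send(); err == nil {
			return attempt, nil
		}
	}
	return maxRetries, err
}

func main() {
	// Simulated sender: fails on the first two calls, then succeeds.
	calls := 0
	send := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}

	attempts, err := sendWithRetry(10, send)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

A production version would normally wait between attempts, for example with time.Sleep and an increasing delay. Whether rmq does so is not stated here.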

Constructors

There are six queue constructors:

    func NewQueue(
        name string,        // queue name
        durable bool,       // durable flag
        prefetchCount int,  // how many messages to prefetch for this client
        autoAck bool,       // auto_ack flag
        consume bool,       // true for consumer/producer worker, false for producer-only worker
        autoReconnect bool, // true to reconnect automatically to the RabbitMQ server
    ) (*Queue, error)

    func NewQueueWithArgs(
        name string,        // queue name
        durable bool,       // durable flag
        prefetchCount int,  // how many messages to prefetch for this client
        autoAck bool,       // auto_ack flag
        consume bool,       // true for consumer/producer worker, false for producer-only worker
        autoReconnect bool, // true to reconnect automatically to the RabbitMQ server
        args amqp.Table,    // map[string]interface{} of queue arguments
    ) (*Queue, error)

    // short syntax for NewQueue(name, durable=true, prefetchCount=0, autoAck=false, consume=false, autoReconnect=true)
    func NewProducerQueue(name string) (*Queue, error)

    // short syntax for NewQueue(name, durable=true, prefetchCount=prefetchCount, autoAck=false, consume=true, autoReconnect=true)
    func NewConsumerQueue(name string, prefetchCount int) (*Queue, error)

    // short syntax for a producer queue with the x-queue-mode: lazy argument
    func NewLazyProducerQueue(name string) (*Queue, error)

    // short syntax for a consumer queue with the x-queue-mode: lazy argument
    func NewLazyConsumerQueue(name string, prefetchCount int) (*Queue, error)

Note that lazy queues are often used when a queue is expected to be flooded frequently. In lazy mode, RabbitMQ pages messages out to disk whenever possible.
For more information see https://www.rabbitmq.com/lazy-queues.html
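
The queue arguments mentioned above are a plain map[string]interface{}, so the lazy-mode argument can be added to any existing set of arguments. The sketch below builds such a map with the standard library only. The helper name lazyArgs, the queue name "events", and the extra x-max-length argument are assumptions chosen for illustration.

```go
package main

import "fmt"

// lazyArgs returns a copy of args with "x-queue-mode" set to "lazy".
// The input map is left unchanged. lazyArgs is a hypothetical helper,
// not part of rmq.
func lazyArgs(args map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(args)+1)
	for k, v := range args {
		out[k] = v
	}
	out["x-queue-mode"] = "lazy"
	return out
}

func main() {
	// x-max-length is a standard RabbitMQ queue argument, used here only
	// as an example of an additional setting.
	args := lazyArgs(map[string]interface{}{"x-max-length": int32(100000)})
	fmt.Println(args)

	// With the library, the map would be passed as the last parameter of
	// NewQueueWithArgs, following the signature listed under Constructors:
	//   rmq.NewQueueWithArgs("events", true, 10, false, true, true, amqp.Table(args))
}
```

When no extra arguments are needed, NewLazyProducerQueue and NewLazyConsumerQueue cover the same case with less code.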