-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.go
More file actions
41 lines (37 loc) · 834 Bytes
/
consumer.go
File metadata and controls
41 lines (37 loc) · 834 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package msgr
import (
"log"
"github.com/streadway/amqp"
)
// Accept starts consuming from the queue named by c.conf.Channel and returns
// the resulting stream of messages.
//
// The boolean result reports whether consumption was started. On success it
// is true and the returned channel carries the deliveries. On failure it is
// false and the returned channel is nil; this happens when no AMQP channel
// could be obtained from the channel helper, or when Consume returns an
// error (which is logged).
//
// After a Consume error the cached AMQP channel is dropped so that the next
// call to Accept attempts to open a new one.
func (c *QueueConsumer) Accept() (bool, <-chan amqp.Delivery) {
	// Make sure an AMQP channel exists, and take a local snapshot of it while
	// the lock is still held. Using the snapshot below (rather than re-reading
	// c.channel after Unlock) avoids racing with a concurrent caller that
	// resets c.channel to nil on its own error path.
	c.mu.Lock()
	if c.channel == nil {
		newchan := channel(c.conn, c.conf.Channel)
		if newchan == nil {
			c.mu.Unlock()
			return false, nil
		}
		c.channel = newchan
	}
	ch := c.channel
	c.mu.Unlock()

	// The lock is intentionally not held across Consume, matching the
	// original behavior of not blocking other callers during the broker call.
	messages, err := ch.Consume(
		c.conf.Channel,   // queue
		"",               // messageConsumer
		consumeAutoAck,   // auto-ack
		consumeExclusive, // exclusive
		false,            // no-local
		consumeNoWait,    // no-wait
		nil,              // args
	)
	if err != nil {
		log.Println("could not consume messages")
		log.Println("error was: ", err)

		// Forget the channel that just failed so the next Accept reopens one.
		// Only clear the field if it still refers to the channel we used:
		// another goroutine may already have installed a replacement, and
		// that one must not be discarded.
		// NOTE(review): the failed channel is not explicitly closed here;
		// confirm whether the owner of c.conn is expected to clean it up.
		c.mu.Lock()
		if c.channel == ch {
			c.channel = nil
		}
		c.mu.Unlock()
		return false, nil
	}
	return true, messages
}