11"""Tornado-compatible AMQP consumer
22
33code originally taken from:
4- https://pika.readthedocs.org /en/0.9.14 /examples/tornado_consumer.html
4+ https://pika.readthedocs.io /en/1.3.2 /examples/tornado_consumer.html
55
66"""
77
88
99import logging
10-
1110import pika
1211from pika import adapters
12+ from pika .adapters .tornado_connection import TornadoConnection
13+ from pika .exchange_type import ExchangeType
1314
14-
15- log = logging .getLogger (__name__ )
15+ LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
16+ '-35s %(lineno) -5d: %(message)s' )
17+ LOGGER = logging .getLogger (__name__ )
1618
1719
1820class Consumer (object ):
@@ -28,29 +30,23 @@ class Consumer(object):
2830 commands that were issued and that should surface in the output as well.
2931
3032 """
33+ EXCHANGE = 'message'
34+ EXCHANGE_TYPE = ExchangeType .topic
35+ QUEUE = 'text'
36+ ROUTING_KEY = 'example.text'
3137
32- def __init__ (self , amqp_url , exchange , queue ,
33- exchange_type = 'topic' , routing_key = '' ,
34- exchange_kwargs = None , queue_kwargs = None , ack = True ):
38+ def __init__ (self , amqp_url ):
3539 """Create a new instance of the consumer class, passing in the AMQP
3640 URL used to connect to RabbitMQ.
3741
3842 :param str amqp_url: The AMQP url to connect with
3943
4044 """
41- self .amqp_url = amqp_url
42- self .exchange = exchange
43- self .exchange_type = exchange_type
44- self .queue = queue
45- self .routing_key = routing_key
46- self .exchange_kwargs = exchange_kwargs or {}
47- self .queue_kwargs = queue_kwargs or {}
48- self .ack = ack
49-
5045 self ._connection = None
5146 self ._channel = None
5247 self ._closing = False
5348 self ._consumer_tag = None
49+ self ._url = amqp_url
5450
5551 def connect (self ):
5652 """This method connects to RabbitMQ, returning the connection handle.
@@ -60,51 +56,52 @@ def connect(self):
6056 :rtype: pika.SelectConnection
6157
6258 """
63- log .info ('Connecting to %s' , self .amqp_url )
64- return adapters .TornadoConnection (pika .URLParameters (self .amqp_url ),
65- self .on_connection_open )
59+ LOGGER .info ('Connecting to %s' , self ._url )
60+ return TornadoConnection (
61+ pika .URLParameters (self ._url ),
62+ self .on_connection_open ,
63+ )
6664
6765 def close_connection (self ):
6866 """This method closes the connection to RabbitMQ."""
69- log .info ('Closing connection' )
67+ LOGGER .info ('Closing connection' )
7068 self ._connection .close ()
7169
7270 def add_on_connection_close_callback (self ):
7371 """This method adds an on close callback that will be invoked by pika
7472 when RabbitMQ closes the connection to the publisher unexpectedly.
7573
7674 """
77- log . debug ('Adding connection close callback' )
75+ LOGGER . info ('Adding connection close callback' )
7876 self ._connection .add_on_close_callback (self .on_connection_closed )
7977
80- def on_connection_closed (self , connection , reply_code , reply_text ):
78+ def on_connection_closed (self , connection , reason ):
8179 """This method is invoked by pika when the connection to RabbitMQ is
8280 closed unexpectedly. Since it is unexpected, we will reconnect to
8381 RabbitMQ if it disconnects.
8482
8583 :param pika.connection.Connection connection: The closed connection obj
86- :param int reply_code: The server provided reply_code if given
87- :param str reply_text: The server provided reply_text if given
84+ :param Exception reason: exception representing reason for loss of
85+ connection.
8886
8987 """
9088 self ._channel = None
9189 if self ._closing :
92- # self._connection.ioloop.stop()
93- pass
90+ self ._connection .ioloop .stop ()
9491 else :
95- log .warning ('Connection closed, reopening in 5 seconds: (%s) %s' ,
96- reply_code , reply_text )
97- self ._connection .add_timeout (5 , self .reconnect )
92+ LOGGER .warning ('Connection closed, reopening in 5 seconds: %s' ,
93+ reason )
94+ self ._connection .ioloop . call_later (5 , self .reconnect )
9895
9996 def on_connection_open (self , unused_connection ):
10097 """This method is called by pika once the connection to RabbitMQ has
10198 been established. It passes the handle to the connection object in
10299 case we need it, but in this case, we'll just mark it unused.
103100
104- :type unused_connection: pika.SelectConnection
101+ :param pika.SelectConnection _unused_connection: The connection
105102
106103 """
107- log .info ('Connection opened' )
104+ LOGGER .info ('Connection opened' )
108105 self .add_on_connection_close_callback ()
109106 self .open_channel ()
110107
@@ -114,6 +111,7 @@ def reconnect(self):
114111
115112 """
116113 if not self ._closing :
114+
117115 # Create a new connection
118116 self ._connection = self .connect ()
119117
@@ -122,23 +120,21 @@ def add_on_channel_close_callback(self):
122120 RabbitMQ unexpectedly closes the channel.
123121
124122 """
125- log . debug ('Adding channel close callback' )
123+ LOGGER . info ('Adding channel close callback' )
126124 self ._channel .add_on_close_callback (self .on_channel_closed )
127125
128- def on_channel_closed (self , channel , reply_code , reply_text ):
126+ def on_channel_closed (self , channel , reason ):
129127 """Invoked by pika when RabbitMQ unexpectedly closes the channel.
130128 Channels are usually closed if you attempt to do something that
131129 violates the protocol, such as re-declare an exchange or queue with
132130 different parameters. In this case, we'll close the connection
133131 to shutdown the object.
134132
135133 :param pika.channel.Channel: The closed channel
136- :param int reply_code: The numeric reason the channel was closed
137- :param str reply_text: The text reason the channel was closed
134+ :param Exception reason: why the channel was closed
138135
139136 """
140- log .warning ('Channel %i was closed: (%s) %s' ,
141- channel , reply_code , reply_text )
137+ LOGGER .warning ('Channel %i was closed: %s' , channel , reason )
142138 self ._connection .close ()
143139
144140 def on_channel_open (self , channel ):
@@ -150,10 +146,10 @@ def on_channel_open(self, channel):
150146 :param pika.channel.Channel channel: The channel object
151147
152148 """
153- log . debug ('Channel opened' )
149+ LOGGER . info ('Channel opened' )
154150 self ._channel = channel
155151 self .add_on_channel_close_callback ()
156- self .setup_exchange (self .exchange )
152+ self .setup_exchange (self .EXCHANGE )
157153
158154 def setup_exchange (self , exchange_name ):
159155 """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
@@ -163,22 +159,22 @@ def setup_exchange(self, exchange_name):
163159 :param str|unicode exchange_name: The name of the exchange to declare
164160
165161 """
166- log .debug ('Declaring exchange %s' , exchange_name )
167- self ._channel .exchange_declare (self .on_exchange_declareok ,
168- exchange_name ,
169- self .exchange_type ,
170- ** self .exchange_kwargs )
162+ LOGGER .info ('Declaring exchange %s' , exchange_name )
163+ self ._channel .exchange_declare (
164+ callback = self .on_exchange_declareok ,
165+ exchange = exchange_name ,
166+ exchange_type = self .EXCHANGE_TYPE ,
167+ )
171168
172169 def on_exchange_declareok (self , unused_frame ):
173170 """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
174171 command.
175172
176- :param pika.Frame.Method unused_frame: Exchange.DeclareOk response
177- frame
173+ :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
178174
179175 """
180- log . debug ('Exchange declared' )
181- self .setup_queue (self .queue )
176+ LOGGER . info ('Exchange declared' )
177+ self .setup_queue (self .QUEUE )
182178
183179 def setup_queue (self , queue_name ):
184180 """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
@@ -188,9 +184,11 @@ def setup_queue(self, queue_name):
188184 :param str|unicode queue_name: The name of the queue to declare.
189185
190186 """
191- log .debug ('Declaring queue %s' , queue_name )
192- self ._channel .queue_declare (self .on_queue_declareok , queue_name ,
193- ** self .queue_kwargs )
187+ LOGGER .info ('Declaring queue %s' , queue_name )
188+ self ._channel .queue_declare (
189+ queue = queue_name ,
190+ callback = self .on_queue_declareok ,
191+ )
194192
195193 def on_queue_declareok (self , method_frame ):
196194 """Method invoked by pika when the Queue.Declare RPC call made in
@@ -202,18 +200,22 @@ def on_queue_declareok(self, method_frame):
202200 :param pika.frame.Method method_frame: The Queue.DeclareOk frame
203201
204202 """
205- log .debug ('Binding %s to %s with %s' ,
206- self .exchange , self .queue , self .routing_key )
207- self ._channel .queue_bind (self .on_bindok , self .queue ,
208- self .exchange , self .routing_key )
203+ LOGGER .info ('Binding %s to %s with %s' ,
204+ self .EXCHANGE , self .QUEUE , self .ROUTING_KEY )
205+ self ._channel .queue_bind (
206+ queue = self .QUEUE ,
207+ exchange = self .EXCHANGE ,
208+ routing_key = self .ROUTING_KEY ,
209+ callback = self .on_bindok ,
210+ )
209211
210212 def add_on_cancel_callback (self ):
211213 """Add a callback that will be invoked if RabbitMQ cancels the consumer
212214 for some reason. If RabbitMQ does cancel the consumer,
213215 on_consumer_cancelled will be invoked by pika.
214216
215217 """
216- log . debug ('Adding consumer cancellation callback' )
218+ LOGGER . info ('Adding consumer cancellation callback' )
217219 self ._channel .add_on_cancel_callback (self .on_consumer_cancelled )
218220
219221 def on_consumer_cancelled (self , method_frame ):
@@ -223,8 +225,8 @@ def on_consumer_cancelled(self, method_frame):
223225 :param pika.frame.Method method_frame: The Basic.Cancel frame
224226
225227 """
226- log . debug ('Consumer was cancelled remotely, shutting down: %r' ,
227- method_frame )
228+ LOGGER . info ('Consumer was cancelled remotely, shutting down: %r' ,
229+ method_frame )
228230 if self ._channel :
229231 self ._channel .close ()
230232
@@ -235,7 +237,7 @@ def acknowledge_message(self, delivery_tag):
235237 :param int delivery_tag: The delivery tag from the Basic.Deliver frame
236238
237239 """
238- log . debug ('Acknowledging message %s' , delivery_tag )
240+ LOGGER . info ('Acknowledging message %s' , delivery_tag )
239241 self ._channel .basic_ack (delivery_tag )
240242
241243 def on_message (self , unused_channel , basic_deliver , properties , body ):
@@ -249,13 +251,12 @@ def on_message(self, unused_channel, basic_deliver, properties, body):
249251 :param pika.channel.Channel unused_channel: The channel object
250252 :param pika.Spec.Basic.Deliver: basic_deliver method
251253 :param pika.Spec.BasicProperties: properties
252- :param str|unicode body: The message body
254+ :param bytes body: The message body
253255
254256 """
255- log .debug ('Received message # %s from %s: %s' ,
256- basic_deliver .delivery_tag , properties .app_id , body )
257- if self .ack :
258- self .acknowledge_message (basic_deliver .delivery_tag )
257+ LOGGER .info ('Received message # %s from %s: %s' ,
258+ basic_deliver .delivery_tag , properties .app_id , body )
259+ self .acknowledge_message (basic_deliver .delivery_tag )
259260
260261 def on_cancelok (self , unused_frame ):
261262 """This method is invoked by pika when RabbitMQ acknowledges the
@@ -266,7 +267,7 @@ def on_cancelok(self, unused_frame):
266267 :param pika.frame.Method unused_frame: The Basic.CancelOk frame
267268
268269 """
269- log . debug ('RabbitMQ acknowledged the cancellation of the consumer' )
270+ LOGGER . info ('RabbitMQ acknowledged the cancellation of the consumer' )
270271 self .close_channel ()
271272
272273 def stop_consuming (self ):
@@ -275,7 +276,7 @@ def stop_consuming(self):
275276
276277 """
277278 if self ._channel :
278- log .info ('Sending a Basic.Cancel RPC command to RabbitMQ' )
279+ LOGGER .info ('Sending a Basic.Cancel RPC command to RabbitMQ' )
279280 self ._channel .basic_cancel (self .on_cancelok , self ._consumer_tag )
280281
281282 def start_consuming (self ):
@@ -288,11 +289,12 @@ def start_consuming(self):
288289 will invoke when a message is fully received.
289290
290291 """
291- log . debug ('Issuing consumer related RPC commands' )
292+ LOGGER . info ('Issuing consumer related RPC commands' )
292293 self .add_on_cancel_callback ()
293- self ._consumer_tag = self ._channel .basic_consume (self .on_message ,
294- self .queue ,
295- no_ack = not self .ack )
294+ self ._consumer_tag = self ._channel .basic_consume (
295+ on_message_callback = self .on_message ,
296+ queue = self .QUEUE ,
297+ )
296298
297299 def on_bindok (self , unused_frame ):
298300 """Invoked by pika when the Queue.Bind method has completed. At this
@@ -302,15 +304,15 @@ def on_bindok(self, unused_frame):
302304 :param pika.frame.Method unused_frame: The Queue.BindOk response frame
303305
304306 """
305- log . debug ('Queue bound' )
307+ LOGGER . info ('Queue bound' )
306308 self .start_consuming ()
307309
308310 def close_channel (self ):
309311 """Call to close the channel with RabbitMQ cleanly by issuing the
310312 Channel.Close RPC command.
311313
312314 """
313- log . debug ('Closing the channel' )
315+ LOGGER . info ('Closing the channel' )
314316 self ._channel .close ()
315317
316318 def open_channel (self ):
@@ -319,16 +321,16 @@ def open_channel(self):
319321 on_channel_open callback will be invoked by pika.
320322
321323 """
322- log . debug ('Creating a new channel' )
324+ LOGGER . info ('Creating a new channel' )
323325 self ._connection .channel (on_open_callback = self .on_channel_open )
324326
325327 def run (self ):
326- """Run the consumer by connecting to RabbitMQ and then
328+ """Run the example consumer by connecting to RabbitMQ and then
327329 starting the IOLoop to block and allow the SelectConnection to operate.
328330
329331 """
330332 self ._connection = self .connect ()
331- # self._connection.ioloop.start()
333+ self ._connection .ioloop .start ()
332334
333335 def stop (self ):
334336 """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
@@ -341,8 +343,8 @@ def stop(self):
341343 the IOLoop will be buffered but not processed.
342344
343345 """
344- log .info ('Stopping' )
346+ LOGGER .info ('Stopping' )
345347 self ._closing = True
346348 self .stop_consuming ()
347- # self._connection.ioloop.start()
348- log .info ('Stopped' )
349+ self ._connection .ioloop .start ()
350+ LOGGER .info ('Stopped' )
0 commit comments