Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ source 'https://rubygems.org'

gemspec

gem "pry"
gem "hiredis-client"
# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"
4 changes: 4 additions & 0 deletions lib/beetle/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def initialize(client, options = {}) #:nodoc:

private

def single_broker_mode?
@client.single_broker_mode?
end

def error(text)
logger.error text
raise Error.new(text)
Expand Down
4 changes: 4 additions & 0 deletions lib/beetle/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def initialize(config = Beetle.config)
)
end

def single_broker_mode?
servers.size == 1
end

# register an exchange with the given _name_ and a set of _options_:
# [<tt>:type</tt>]
# the type option will be overwritten and always be <tt>:topic</tt>, beetle does not allow fanout exchanges
Expand Down
9 changes: 9 additions & 0 deletions lib/beetle/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class Message
# value returned by handler execution
attr_reader :handler_result

def self.create(queue, header, body, opts = {})
new(queue, header, body, opts)
end

def self.single_broker(queue, header, body, opts = {})
SingleBrokerMessage.new(queue, header, body, opts)
end

def initialize(queue, header, body, opts = {})
@queue = queue
@header = header
Expand Down Expand Up @@ -101,6 +109,7 @@ def decode #:nodoc:
def self.publishing_options(opts = {}) #:nodoc:
flags = 0
flags |= FLAG_REDUNDANT if opts[:redundant]

expires_at = now + (opts[:ttl] || DEFAULT_TTL).to_i
opts = opts.slice(*PUBLISHING_KEYS)
opts[:message_id] = generate_uuid.to_s
Expand Down
1 change: 1 addition & 0 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def publish(message_name, data, opts={}) #:nodoc:
opts = @client.messages[message_name].merge(opts.symbolize_keys)
exchange_name = opts.delete(:exchange)
opts.delete(:queue)

recycle_dead_servers unless @dead_servers.empty?
throttle!
if opts[:redundant]
Expand Down
101 changes: 101 additions & 0 deletions lib/beetle/single_broker_message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
module Beetle
class SingleBrokerMessage < Message
def initialize(*args, **kwargs)
super(*args, **kwargs)
@store = nil
end

def ack!
logger.debug "Beetle: ack! for message #{msg_id}"
header.ack
end

def set_delay!
log_not_supported("delay between retries")
end

def delayed?
log_not_supported("delay between retries")
false
end

def redundant?
false
end

def increment_execution_attempts!; end

def attempts_limit_reached?(_attempts = nil)
# TODO: implement
false
end

def exceptions_limit_reached?
# TODO: implement
false
end

private

def log_not_supported(what)
logger.warn "Beetle: Feature not supported in single broker mode => #{what}"
end

def run_handler!(handler)
case result = run_handler(handler)
when RC::OK
ack!
result
else
handler_failed!(result)
end
end

def handler_failed!(result)
if attempts_limit_reached?
ack!
logger.debug "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
RC::AttemptsLimitReached
elsif exceptions_limit_reached?
ack!
logger.debug "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
RC::ExceptionsLimitReached
elsif !exception_accepted?
ack!
logger.debug "Beetle: `#{@exception.class.name}` not accepted: `retry_on`=[#{retry_on.join(',')}] on #{msg_id}"
RC::ExceptionNotAccepted
else
result
end
end

# Open questions:
# - do we need to support timeouts that span executions?
def process_internal(handler)
if @exception
ack!
RC::DecodingError
elsif @pre_exception
ack!
RC::PreprocessingError
elsif expired?
logger.warn "Beetle: ignored expired message (#{msg_id})!"
ack!
RC::Ancient
elsif simple?
ack!
run_handler(handler) == RC::HandlerCrash ? RC::AttemptsLimitReached : RC::OK
elsif attempts_limit_reached?
ack!
logger.warn "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
RC::AttemptsLimitReached
elsif exceptions_limit_reached?
ack!
logger.warn "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
RC::ExceptionsLimitReached
else
run_handler!(handler)
end
end
end
end
6 changes: 5 additions & 1 deletion lib/beetle/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ def create_subscription_callback(queue_name, amqp_queue_name, handler, opts)
end
begin
message_options = opts.merge(:server => server, :store => @client.deduplication_store)
m = Message.new(amqp_queue_name, header, data, message_options)
m = if single_broker_mode?
SingleBrokerMessage.create(amqp_queue_name, header, data, message_options)
else
Message.create(amqp_queue_name, header, data, message_options)
end
processor = Handler.create(handler, opts)
result = m.process(processor)
if result.reject?
Expand Down
59 changes: 27 additions & 32 deletions test/beetle/bunny_behavior_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ class BunnyBehaviorTest < Minitest::Test
assert_equal({"bar" => "baz"}, headers["table"])
end



test "publishing redundantly does not leave the garbage in dedup store" do
Beetle.config.servers = "localhost:5672,localhost:5673"
client = Beetle::Client.new
Expand All @@ -38,7 +36,7 @@ class BunnyBehaviorTest < Minitest::Test
# empty the dedup store
client.deduplication_store.flushdb

handler = TestHandler.new(stop_listening_after_n_post_processes = 2, client = client)
handler = TestHandler.new(2, client = client)
client.register_handler(:test_garbage, handler)
published = client.publish(:test_garbage, 'bam', :redundant =>true)
listen(client)
Expand All @@ -49,9 +47,9 @@ class BunnyBehaviorTest < Minitest::Test
message = messages_processed.first
assert_equal 2, published
assert_equal "bam", message.data
Beetle::DeduplicationStore::KEY_SUFFIXES.each{|suffix|
assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
}
Beetle::DeduplicationStore::KEY_SUFFIXES.each do |suffix|
assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
end
end

test "process redundant message once" do
Expand All @@ -64,7 +62,7 @@ class BunnyBehaviorTest < Minitest::Test
# empty the dedup store
client.deduplication_store.flushdb

handler = TestHandler.new(stop_listening_after_n_post_processes = 2, client = client)
handler = TestHandler.new(2, client = client)
client.register_handler(:test_processing, handler)
published = client.publish(:test_processing, 'bam', :redundant =>true)
listen(client)
Expand Down Expand Up @@ -98,16 +96,20 @@ class BunnyBehaviorTest < Minitest::Test

assert_equal 1, published
assert_equal "bam", message.data
Beetle::DeduplicationStore::KEY_SUFFIXES.map{|suffix|
assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
}
Beetle::DeduplicationStore::KEY_SUFFIXES.map do |suffix|
assert_equal false, client.deduplication_store.exists(message.msg_id, suffix)
end
end

# FIXME: that's not really testing that publisher confirms work
test "publishing with confirms works as expected" do
Beetle.config.servers = "localhost:5672"
client = Beetle::Client.new
config = Beetle::Configuration.new
config.servers = "localhost:5672"
config.publisher_confirms = true
client = Beetle::Client.new(config)

client.register_queue(:test_publisher_confirms)
client.register_message(:test_publisher_confirms, :publisher_confirms => true)
client.register_message(:test_publisher_confirms)
# purge the test queue
client.purge(:test_publisher_confirms)

Expand All @@ -119,26 +121,21 @@ class BunnyBehaviorTest < Minitest::Test
client.stop_publishing

assert_equal 1, published
assert_equal "bam", message.data

assert_equal "bam", message.data
end


def listen(client , timeout = 1)
Timeout.timeout(timeout) do
client.listen
def listen(client, timeout = 1)
Timeout.timeout(timeout) do
client.listen
end
rescue Timeout::Error
puts "Client listen timed out after #{timeout} seconds"
nil
rescue Timeout::Error
puts "Client listen timed out after #{timeout} seconds"
nil
end


class TestHandler < Beetle::Handler

attr_reader :messages_processed
attr_reader :pre_process_invocations
attr_reader :post_process_invocations
attr_reader :messages_processed, :pre_process_invocations, :post_process_invocations

def initialize(stop_listening_after_n_post_processes, client)
super()
Expand All @@ -149,22 +146,20 @@ def initialize(stop_listening_after_n_post_processes, client)
@messages_processed = []
end

def pre_process(message)
def pre_process(_message)
@pre_process_invocations += 1
end

def process
@messages_processed << message
end

def post_process
@post_process_invocations += 1
if @post_process_invocations >= @stop_listening_after_n_post_processes
@client.stop_listening
end
return unless @post_process_invocations >= @stop_listening_after_n_post_processes
@client.stop_listening
end

end


end
18 changes: 18 additions & 0 deletions test/beetle/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,22 @@ def setup
end
end

class SingleBrokerModeTest < Minitest::Test
test "when only one server is configured we set single broker mode" do
config = Configuration.new
config.servers = "localhost:5672"
client = Client.new(config)

assert client.single_broker_mode?
end

test "when more than one server is configured we set single broker mode to false" do
config = Configuration.new
config.servers = "localhost:5672,localhost:5673"
client = Client.new(config)

refute client.single_broker_mode?
end
end

end
Loading
Loading