Skip to content

Commit ad022a6

Browse files
committed
WIP - Converts messages published into Redis to json format to allow for language agnostic polling
1 parent 4376100 commit ad022a6

7 files changed

Lines changed: 93 additions & 117 deletions

File tree

active_publisher.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ Gem::Specification.new do |spec|
3737
spec.add_development_dependency "pry"
3838
spec.add_development_dependency "rake"
3939
spec.add_development_dependency "rspec", "~> 3.2"
40+
spec.add_development_dependency "rspec-benchmark"
4041
end

lib/active_publisher/async/redis_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def initialize(new_redis_pool)
4242

4343
def publish(route, payload, exchange_name, options = {})
4444
message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
45-
queue << ::Marshal.dump(message)
45+
queue << message.to_json
4646
flush_queue! if queue.size >= flush_min || options[:flush_queue]
4747

4848
nil

lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require "json"
2+
13
module ActivePublisher
24
module Async
35
module RedisAdapter
@@ -10,7 +12,7 @@ def initialize(redis_connection_pool, new_list_key)
1012
end
1113

1214
def <<(message)
13-
encoded_message = ::Marshal.dump(message)
15+
encoded_message = message.to_json
1416

1517
redis_pool.with do |redis|
1618
redis.rpush(list_key, encoded_message)
@@ -24,7 +26,7 @@ def concat(*messages)
2426

2527
encoded_messages = []
2628
messages.each do |message|
27-
encoded_messages << ::Marshal.dump(message)
29+
encoded_messages << message.to_json
2830
end
2931

3032
redis_pool.with do |redis|
@@ -92,10 +94,12 @@ def shift(number)
9294
messages = [messages] unless messages.respond_to?(:each)
9395

9496
shifted_messages = []
97+
9598
messages.each do |message|
9699
next if message.nil?
97-
98-
shifted_messages << ::Marshal.load(message)
100+
# TODO: This should probably attempt to ::Marshal.load and fall back to JSON for
101+
# apps cutting over to new serialization.
102+
shifted_messages << ::ActivePublisher::Message.from_json(message)
99103
end
100104

101105
shifted_messages

lib/active_publisher/message.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
require "json"
12
module ActivePublisher
2-
class Message < Struct.new(:route, :payload, :exchange_name, :options); end
3+
class Message < Struct.new(:route, :payload, :exchange_name, :options)
4+
class << self
5+
def from_json(payload)
6+
parsed = JSON.load(payload)
7+
self.new(
8+
parsed["route"],
9+
parsed["payload"],
10+
parsed["exchange_name"],
11+
parsed["options"],
12+
)
13+
end
14+
end
15+
16+
def to_json
17+
self.to_h.to_json
18+
end
19+
end
320
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
describe ::ActivePublisher::Async::RedisAdapter::Adapter do
2+
subject { described_class.new(redis_pool) }
3+
let(:route) { "test" }
4+
let(:payload) { "payload" }
5+
let(:exchange_name) { "place" }
6+
let(:options) { { :flush_queue => true, :test => :ok } }
7+
let(:message) { ::ActivePublisher::Message.new(route, payload, exchange_name, options) }
8+
let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } }
9+
10+
describe "#publish.benchmark" do
11+
before do
12+
allow(::ActivePublisher::Message).to receive(:new).with(route, payload, exchange_name, options).and_return(message)
13+
end
14+
15+
it "can serialize messages to publish into redis in under 3ms" do
16+
expect {
17+
expect_any_instance_of(::Redis).to receive(:rpush)
18+
subject.publish(route, payload, exchange_name, options)
19+
}.to perform_under(3).ms.sample(1_000).times
20+
end
21+
end
22+
23+
describe "#shutdown!" do
24+
# This is called when the rspec finishes. I'm sure we can make this a better test.
25+
end
26+
end

spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb

Lines changed: 34 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
describe ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue do
22
let(:list_key) { ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY }
33
let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } }
4+
let(:message) { ::ActivePublisher::Message.new('rtg.key', 'payload', 'some.exchange', {})}
5+
let(:ten_messages) { 10.times.map { message } }
46
subject { described_class.new(redis_pool, list_key) }
57

68
describe "initialize with a redis_pool and list_key" do
@@ -13,29 +15,18 @@
1315

1416
describe "#<<" do
1517
it "pushes 1 item on the list" do
16-
subject << "derp"
18+
subject << message
1719
expect(subject.size).to be 1
18-
expect(subject.pop_up_to(100)).to eq(["derp"])
20+
expect(subject.pop_up_to(100)).to eq([message])
1921
end
2022

2123
it "pushes 10 items on the list" do
2224
10.times do
23-
subject << "derp"
25+
subject << message
2426
end
2527

2628
expect(subject.size).to be 10
27-
expect(subject.pop_up_to(100)).to eq([
28-
"derp",
29-
"derp",
30-
"derp",
31-
"derp",
32-
"derp",
33-
"derp",
34-
"derp",
35-
"derp",
36-
"derp",
37-
"derp",
38-
])
29+
expect(subject.pop_up_to(100)).to eq(ten_messages)
3930
end
4031
end
4132

@@ -45,86 +36,40 @@
4536
end
4637

4738
it "pushes 1 item on the list" do
48-
subject.concat("derp")
39+
subject.concat(message)
4940
expect(subject.size).to be 1
50-
expect(subject.pop_up_to(100)).to eq(["derp"])
41+
expect(subject.pop_up_to(100)).to eq([message])
5142
end
5243

5344
it "pushes 10 items on the list" do
5445
10.times do
55-
subject.concat("derp")
46+
subject.concat(message)
5647
end
5748

5849
expect(subject.size).to be 10
59-
expect(subject.pop_up_to(100)).to eq([
60-
"derp",
61-
"derp",
62-
"derp",
63-
"derp",
64-
"derp",
65-
"derp",
66-
"derp",
67-
"derp",
68-
"derp",
69-
"derp",
70-
])
50+
expect(subject.pop_up_to(100)).to eq(ten_messages)
7151
end
7252

7353
it "pushes 10 items on the list in single concat" do
74-
subject.concat("derp",
75-
"derp",
76-
"derp",
77-
"derp",
78-
"derp",
79-
"derp",
80-
"derp",
81-
"derp",
82-
"derp",
83-
"derp")
54+
subject.concat(message,
55+
message,
56+
message,
57+
message,
58+
message,
59+
message,
60+
message,
61+
message,
62+
message,
63+
message)
8464

8565
expect(subject.size).to be 10
86-
expect(subject.pop_up_to(100)).to eq([
87-
"derp",
88-
"derp",
89-
"derp",
90-
"derp",
91-
"derp",
92-
"derp",
93-
"derp",
94-
"derp",
95-
"derp",
96-
"derp",
97-
])
66+
expect(subject.pop_up_to(100)).to eq(ten_messages)
9867
end
9968

10069
it "pushes 10 items on the list in single concat (with array)" do
101-
array = [
102-
"derp",
103-
"derp",
104-
"derp",
105-
"derp",
106-
"derp",
107-
"derp",
108-
"derp",
109-
"derp",
110-
"derp",
111-
"derp"
112-
]
113-
114-
subject.concat(array)
70+
subject.concat(ten_messages)
11571
expect(subject.size).to be 10
116-
expect(subject.pop_up_to(100)).to eq([
117-
"derp",
118-
"derp",
119-
"derp",
120-
"derp",
121-
"derp",
122-
"derp",
123-
"derp",
124-
"derp",
125-
"derp",
126-
"derp",
127-
])
72+
expect(subject.pop_up_to(100)).to eq(ten_messages)
12873
end
12974
end
13075

@@ -135,7 +80,7 @@
13580

13681
it "is false when a single item is inserted to the list_key List" do
13782
redis_pool.with do |redis|
138-
redis.rpush(list_key, "derp")
83+
redis.rpush(list_key, message.to_json)
13984
end
14085

14186
expect(subject.empty?).to be false
@@ -144,7 +89,7 @@
14489
it "is false when ten items are inserted to the list_key List" do
14590
redis_pool.with do |redis|
14691
10.times do
147-
redis.rpush(list_key, "derp")
92+
redis.rpush(list_key, message.to_json)
14893
end
14994
end
15095

@@ -159,31 +104,20 @@
159104

160105
it "returns 1 item when a single item is inserted to the list_key List" do
161106
redis_pool.with do |redis|
162-
redis.rpush(list_key, ::Marshal.dump("derp"))
107+
redis.rpush(list_key, message.to_json)
163108
end
164109

165-
expect(subject.pop_up_to(100)).to eq(["derp"])
110+
expect(subject.pop_up_to(100)).to eq([message])
166111
end
167112

168113
it "is 10 when ten items are inserted to the list_key List" do
169114
redis_pool.with do |redis|
170115
10.times do
171-
redis.rpush(list_key, ::Marshal.dump("derp"))
116+
redis.rpush(list_key, message.to_json)
172117
end
173118
end
174119

175-
expect(subject.pop_up_to(100)).to eq([
176-
"derp",
177-
"derp",
178-
"derp",
179-
"derp",
180-
"derp",
181-
"derp",
182-
"derp",
183-
"derp",
184-
"derp",
185-
"derp",
186-
])
120+
expect(subject.pop_up_to(100)).to eq(ten_messages)
187121
end
188122
end
189123

@@ -194,31 +128,20 @@
194128

195129
it "returns 1 item when a single item is inserted to the list_key List" do
196130
redis_pool.with do |redis|
197-
redis.rpush(list_key, ::Marshal.dump("derp"))
131+
redis.rpush(list_key, message.to_json)
198132
end
199133

200-
expect(subject.shift(100)).to eq(["derp"])
134+
expect(subject.shift(100)).to eq([message])
201135
end
202136

203137
it "is 10 when ten items are inserted to the list_key List" do
204138
redis_pool.with do |redis|
205139
10.times do
206-
redis.rpush(list_key, ::Marshal.dump("derp"))
140+
redis.rpush(list_key, message.to_json)
207141
end
208142
end
209143

210-
expect(subject.shift(100)).to eq([
211-
"derp",
212-
"derp",
213-
"derp",
214-
"derp",
215-
"derp",
216-
"derp",
217-
"derp",
218-
"derp",
219-
"derp",
220-
"derp",
221-
])
144+
expect(subject.shift(100)).to eq(ten_messages)
222145
end
223146
end
224147

@@ -229,7 +152,7 @@
229152

230153
it "is 1 when a single item is inserted to the list_key List" do
231154
redis_pool.with do |redis|
232-
redis.rpush(list_key, "derp")
155+
redis.rpush(list_key, message.to_json)
233156
end
234157

235158
expect(subject.size).to be 1

spec/spec_helper.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
require "fakeredis/rspec"
55
require "active_publisher/async/redis_adapter"
66
require "connection_pool"
7+
require "rspec-benchmark"
78

89
::ActivePublisher::Async.publisher_adapter = ::ActivePublisher::Async::InMemoryAdapter::Adapter.new
910
# Silence the logger
1011
$TESTING = true
1112
::ActivePublisher::Logging.initialize_logger(nil)
1213

14+
RSpec.configure do |config|
15+
config.include RSpec::Benchmark::Matchers
16+
end
17+
1318
def verify_expectation_within(number_of_seconds, check_every = 0.02)
1419
waiting_since = ::Time.now
1520
begin

0 commit comments

Comments
 (0)