1919import java .util .Queue ;
2020import java .util .Random ;
2121import java .util .concurrent .atomic .AtomicInteger ;
22+ import java .util .concurrent .ArrayBlockingQueue ;
2223import java .util .concurrent .Callable ;
2324import java .util .concurrent .CountDownLatch ;
2425import java .util .concurrent .ConcurrentLinkedQueue ;
@@ -81,22 +82,42 @@ protected void writeTo(StringBuilder builder){}
8182 // fakeProcessor store messages from the telemetry only
8283 public static class FakeProcessor extends StatsDProcessor {
8384
84- private final Queue <Message > messages ;
85+ private final Queue <Message >[] messages ;
86+ private final Queue <Integer >[] processorWorkQueue ;
87+ private final AtomicInteger [] qsize ; // qSize will not reflect actual size, but a close estimate.
8588 private final AtomicInteger messageSent = new AtomicInteger (0 );
8689 private final AtomicInteger messageAggregated = new AtomicInteger (0 );
8790
8891 FakeProcessor (final StatsDClientErrorHandler handler ) throws Exception {
89- super (0 , handler , 0 , 1 , 1 , 0 , 0 );
90- this .messages = new ConcurrentLinkedQueue <>();
91- }
92+ super (50 , handler , 0 , 1 , 1 , 1 , 0 , StatsDAggregator .DEFAULT_SHARDS );
93+
94+ // 1 queue (lockShardGrain = 1)
95+ this .qsize = new AtomicInteger [lockShardGrain ];
96+ this .messages = new ArrayBlockingQueue [lockShardGrain ];
97+ for (int i = 0 ; i < lockShardGrain ; i ++) {
98+ this .qsize [i ] = new AtomicInteger ();
99+ this .messages [i ] = new ArrayBlockingQueue <Message >(getQcapacity ());
100+ this .qsize [i ].set (0 );
101+ }
92102
103+ // 1 worker
104+ this .processorWorkQueue = new ArrayBlockingQueue [workers ];
105+ for (int i = 0 ; i < workers ; i ++) {
106+ this .processorWorkQueue [i ] = new ArrayBlockingQueue <Integer >(getQcapacity ());
107+ }
108+ }
93109
94110 private class FakeProcessingTask extends StatsDProcessor .ProcessingTask {
111+
112+ public FakeProcessingTask (int id ) {
113+ super (id );
114+ }
115+
95116 @ Override
96117 public void run () {
97118
98119 while (!shutdown ) {
99- final Message message = messages .poll ();
120+ final Message message = messages [ 0 ] .poll ();
100121 if (message == null ) {
101122
102123 try {
@@ -106,6 +127,7 @@ public void run() {
106127 continue ;
107128 }
108129
130+ qsize [0 ].decrementAndGet ();
109131 if (aggregator .aggregateMessage (message )) {
110132 messageAggregated .incrementAndGet ();
111133 continue ;
@@ -118,23 +140,38 @@ public void run() {
118140 }
119141
120142 @ Override
121- protected StatsDProcessor .ProcessingTask createProcessingTask () {
122- return new FakeProcessingTask ();
143+ protected StatsDProcessor .ProcessingTask createProcessingTask (int id ) {
144+ return new FakeProcessingTask (id );
123145 }
124146
125147 @ Override
126148 public boolean send (final Message msg ) {
127- messages .offer (msg );
128- return true ;
149+ if (!shutdown ) {
150+ int threadId = getThreadId ();
151+ int shard = threadId % lockShardGrain ;
152+ int processQueue = threadId % workers ;
153+
154+ if (qsize [shard ].get () < qcapacity ) {
155+ messages [shard ].offer (msg );
156+ qsize [shard ].incrementAndGet ();
157+ processorWorkQueue [processQueue ].offer (shard );
158+ return true ;
159+ }
160+ }
161+
162+ return false ;
129163 }
130164
131- public Queue <Message > getMessages () {
132- return messages ;
165+ public Queue <Message > getMessages (int id ) {
166+ return messages [ id ] ;
133167 }
134168
135169 public void clear () {
136170 try {
137- messages .clear ();
171+
172+ for (int i = 0 ; i < lockShardGrain ; i ++) {
173+ messages [i ].clear ();
174+ }
138175 highPrioMessages .clear ();
139176 } catch (Exception e ) {}
140177 }
@@ -181,7 +218,7 @@ public void aggregate_messages() throws Exception {
181218 fakeProcessor .send (new FakeAlphaMessage ("some.set" , Message .Type .SET , "value" ));
182219 }
183220
184- waitForQueueSize (fakeProcessor .messages , 0 );
221+ waitForQueueSize (fakeProcessor .messages [ 0 ] , 0 );
185222
186223 // 10 gauges, 10 counts, 10 sets
187224 assertEquals (30 , fakeProcessor .messageAggregated .get ());
@@ -206,7 +243,7 @@ public void aggregation_sharding() throws Exception {
206243 fakeProcessor .send (gauge );
207244 }
208245
209- waitForQueueSize (fakeProcessor .messages , 0 );
246+ waitForQueueSize (fakeProcessor .messages [ 0 ] , 0 );
210247
211248 for (int i =0 ; i <StatsDAggregator .DEFAULT_SHARDS ; i ++) {
212249 Map <Message , Message > map = fakeProcessor .aggregator .aggregateMetrics .get (i );
0 commit comments