Skip to content

Commit a8191ed

Browse files
committed
chore: delete git module since it's not used in ably-java
1 parent 41e3213 commit a8191ed

17 files changed

Lines changed: 1355 additions & 35 deletions

.gitmodules

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +0,0 @@
1-
[submodule "lib/src/test/resources/ably-common"]
2-
path = lib/src/test/resources/ably-common
3-
url = https://github.com/ably/ably-common.git

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.ably.lib.http.HttpUtils;
1616
import io.ably.lib.objects.LiveObjects;
1717
import io.ably.lib.objects.LiveObjectsPlugin;
18+
import io.ably.lib.rest.RestAnnotations;
1819
import io.ably.lib.transport.ConnectionManager;
1920
import io.ably.lib.transport.ConnectionManager.QueuedMessage;
2021
import io.ably.lib.transport.Defaults;
@@ -105,6 +106,8 @@ public LiveObjects getObjects() throws AblyException {
105106
return liveObjectsPlugin.getInstance(name);
106107
}
107108

109+
public final RealtimeAnnotations annotations;
110+
108111
/***
109112
* internal
110113
*
@@ -887,7 +890,7 @@ private void onMessage(final ProtocolMessage protocolMessage) {
887890
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
888891

889892
try {
890-
msg.decode(options, decodingContext);
893+
if (msg.data != null) msg.decode(options, decodingContext);
891894
} catch (MessageDecodeException e) {
892895
if (e.errorInfo.code == 40018) {
893896
Log.e(TAG, String.format(Locale.ROOT, "Delta message decode failure - %s. Message id = %s, channel = %s", e.errorInfo.message, msg.id, name));
@@ -1310,6 +1313,10 @@ else if(stateChange.current.equals(failureState)) {
13101313
state = ChannelState.initialized;
13111314
this.decodingContext = new DecodingContext();
13121315
this.liveObjectsPlugin = liveObjectsPlugin;
1316+
this.annotations = new RealtimeAnnotations(
1317+
this,
1318+
new RestAnnotations(name, ably.http, ably.options, options)
1319+
);
13131320
}
13141321

13151322
void onChannelMessage(ProtocolMessage msg) {
@@ -1376,6 +1383,9 @@ void onChannelMessage(ProtocolMessage msg) {
13761383
case error:
13771384
setFailed(msg.error);
13781385
break;
1386+
case annotation:
1387+
annotations.onAnnotation(msg);
1388+
break;
13791389
default:
13801390
Log.e(TAG, "onChannelMessage(): Unexpected message action (" + msg.action + ")");
13811391
}
@@ -1402,6 +1412,17 @@ public void once(ChannelState state, ChannelStateListener listener) {
14021412
super.once(state.getChannelEvent(), listener);
14031413
}
14041414

1415+
/**
1416+
* (Internal) Sends a protocol message and provides a callback for completion.
1417+
*
1418+
* @param protocolMessage the protocol message to be sent
1419+
* @param listener the listener to be notified upon completion of the message delivery
1420+
*/
1421+
public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
1422+
ConnectionManager connectionManager = ably.connection.connectionManager;
1423+
connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
1424+
}
1425+
14051426
private static final String TAG = Channel.class.getName();
14061427
final AblyRealtime ably;
14071428
final String basePath;
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
package io.ably.lib.realtime;
2+
3+
import io.ably.lib.rest.RestAnnotations;
4+
import io.ably.lib.types.AblyException;
5+
import io.ably.lib.types.Annotation;
6+
import io.ably.lib.types.AnnotationAction;
7+
import io.ably.lib.types.AsyncPaginatedResult;
8+
import io.ably.lib.types.Callback;
9+
import io.ably.lib.types.ErrorInfo;
10+
import io.ably.lib.types.MessageDecodeException;
11+
import io.ably.lib.types.PaginatedResult;
12+
import io.ably.lib.types.Param;
13+
import io.ably.lib.types.ProtocolMessage;
14+
import io.ably.lib.util.Log;
15+
import io.ably.lib.util.Multicaster;
16+
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
23+
/**
24+
* RealtimeAnnotation provides subscription capabilities for annotations received on a channel.
25+
* It allows adding or removing listeners to handle annotation events and facilitates broadcasting
26+
* those events to the appropriate listeners.
27+
* <p>
28+
* Note: This is an experimental API. While the underlying functionality is stable,
29+
* the public API may change in future releases.
30+
*/
31+
public class RealtimeAnnotations {
32+
33+
private static final String TAG = RealtimeAnnotations.class.getName();
34+
35+
private final ChannelBase channel;
36+
private final RestAnnotations restAnnotations;
37+
private final AnnotationMulticaster listeners = new AnnotationMulticaster();
38+
private final Map<String, AnnotationMulticaster> typeListeners = new HashMap<>();
39+
40+
public RealtimeAnnotations(ChannelBase channel, RestAnnotations restAnnotations) {
41+
this.channel = channel;
42+
this.restAnnotations = restAnnotations;
43+
}
44+
45+
/**
46+
* Publishes an annotation to the specified channel with the given message serial.
47+
* Validates and encodes the annotation before sending it as a protocol message.
48+
* <p>
49+
* Note: This is an experimental API. While the underlying functionality is stable,
50+
* the public API may change in future releases.
51+
*
52+
* @param messageSerial the unique serial identifier for the message to be annotated
53+
* @param annotation the annotation object associated with the message
54+
* @param listener the completion listener to handle success or failure during the publish process
55+
* @throws AblyException if an error occurs during validation, encoding, or sending the annotation
56+
*/
57+
public void publish(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
58+
Log.v(TAG, String.format("publish(MsgSerial, Annotation); channel = %s", channel.name));
59+
60+
// (RSAN1, RSAN1a3)
61+
if (annotation.type == null) {
62+
throw AblyException.fromErrorInfo(new ErrorInfo("Annotation type must be specified", 400, 40000));
63+
}
64+
65+
// (RSAN1, RSAN1c1)
66+
annotation.messageSerial = messageSerial;
67+
// (RSAN1, RSAN1c2)
68+
if (annotation.action == null) {
69+
annotation.action = AnnotationAction.ANNOTATION_CREATE;
70+
}
71+
72+
try {
73+
// (RSAN1, RSAN1c3)
74+
annotation.encode(channel.options);
75+
} catch (MessageDecodeException e) {
76+
throw AblyException.fromThrowable(e);
77+
}
78+
79+
Log.v(TAG, String.format("RealtimeAnnotations.publish(): channelName = %s, sending annotation with messageSerial = %s, type = %s",
80+
channel.name, messageSerial, annotation.type));
81+
82+
ProtocolMessage protocolMessage = new ProtocolMessage();
83+
protocolMessage.action = ProtocolMessage.Action.annotation;
84+
protocolMessage.channel = channel.name;
85+
protocolMessage.annotations = new Annotation[]{annotation};
86+
87+
channel.sendProtocolMessage(protocolMessage, listener);
88+
}
89+
90+
/**
91+
* Publishes an annotation to the specified channel with the given message serial.
92+
* Validates and encodes the annotation before sending it as a protocol message.
93+
* <p>
94+
* Note: This is an experimental API. While the underlying functionality is stable,
95+
* the public API may change in future releases.
96+
*
97+
* @param messageSerial the unique serial identifier for the message to be annotated
98+
* @param annotation the annotation object associated with the message
99+
* @throws AblyException if an error occurs during validation, encoding, or sending the annotation
100+
*/
101+
public void publish(String messageSerial, Annotation annotation) throws AblyException {
102+
publish(messageSerial, annotation, null);
103+
}
104+
105+
/**
106+
* Deletes an annotation associated with the specified message serial.
107+
* Sets the annotation action to `ANNOTATION_DELETE` and publishes the
108+
* update to the channel with the given completion listener.
109+
* <p>
110+
* Note: This is an experimental API. While the underlying functionality is stable,
111+
* the public API may change in future releases.
112+
*
113+
* @param messageSerial the unique serial identifier for the message being annotated
114+
* @param annotation the annotation object to be deleted
115+
* @param listener the completion listener to handle success or failure during the deletion process
116+
* @throws AblyException if an error occurs during the deletion or publishing process
117+
*/
118+
public void delete(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
119+
Log.v(TAG, String.format("delete(MsgSerial, Annotation); channel = %s", channel.name));
120+
annotation.action = AnnotationAction.ANNOTATION_DELETE;
121+
publish(messageSerial, annotation, listener);
122+
}
123+
124+
public void delete(String messageSerial, Annotation annotation) throws AblyException {
125+
delete(messageSerial, annotation, null);
126+
}
127+
128+
/**
129+
* Retrieves a paginated list of annotations associated with the specified message serial.
130+
* <p>
131+
* Note: This is an experimental API. While the underlying functionality is stable,
132+
* the public API may change in future releases.
133+
*
134+
* @param messageSerial the unique serial identifier for the message being annotated.
135+
* @param params an array of query parameters for filtering or modifying the request.
136+
* @return a {@link PaginatedResult} containing the matching annotations.
137+
* @throws AblyException if an error occurs during the retrieval process.
138+
*/
139+
public PaginatedResult<Annotation> get(String messageSerial, Param[] params) throws AblyException {
140+
return restAnnotations.get(messageSerial, params);
141+
}
142+
143+
/**
144+
* Retrieves a paginated list of annotations associated with the specified message serial.
145+
* <p>
146+
* Note: This is an experimental API. While the underlying functionality is stable,
147+
* the public API may change in future releases.
148+
*
149+
* @param messageSerial the unique serial identifier for the message being annotated
150+
* @return a PaginatedResult containing the matching annotations
151+
* @throws AblyException if an error occurs during the retrieval process
152+
*/
153+
public PaginatedResult<Annotation> get(String messageSerial) throws AblyException {
154+
return restAnnotations.get(messageSerial, null);
155+
}
156+
157+
/**
158+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
159+
* <p>
160+
* Note: This is an experimental API. While the underlying functionality is stable,
161+
* the public API may change in future releases.
162+
*
163+
* @param messageSerial the unique serial identifier for the message being annotated.
164+
* @param params an array of query parameters for filtering or modifying the request.
165+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
166+
*/
167+
public void getAsync(String messageSerial, Param[] params, Callback<AsyncPaginatedResult<Annotation>> callback) {
168+
restAnnotations.getAsync(messageSerial, params, callback);
169+
}
170+
171+
/**
172+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
173+
* <p>
174+
* Note: This is an experimental API. While the underlying functionality is stable,
175+
* the public API may change in future releases.
176+
*
177+
* @param messageSerial the unique serial identifier for the message being annotated.
178+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
179+
*/
180+
public void getAsync(String messageSerial, Callback<AsyncPaginatedResult<Annotation>> callback) {
181+
restAnnotations.getAsync(messageSerial, null, callback);
182+
}
183+
184+
/**
185+
* Subscribes the given {@link AnnotationListener} to the channel, allowing it to receive annotations.
186+
* If the channel's attach on subscribe option is enabled, the channel is attached automatically.
187+
* <p>
188+
* Note: This is an experimental API. While the underlying functionality is stable,
189+
* the public API may change in future releases.
190+
*
191+
* @param listener the listener to be subscribed to the channel
192+
* @throws AblyException if an error occurs during channel attachment
193+
*/
194+
public synchronized void subscribe(AnnotationListener listener) throws AblyException {
195+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s", channel.name));
196+
listeners.add(listener);
197+
if (channel.attachOnSubscribeEnabled()) {
198+
channel.attach();
199+
}
200+
}
201+
202+
/**
203+
* Unsubscribes the specified {@link AnnotationListener} from the channel, stopping it
204+
* from receiving further annotations. Any corresponding type-specific listeners
205+
* associated with the listener are also removed.
206+
* <p>
207+
* Note: This is an experimental API. While the underlying functionality is stable,
208+
* the public API may change in future releases.
209+
*
210+
* @param listener the {@link AnnotationListener} to be unsubscribed
211+
*/
212+
public synchronized void unsubscribe(AnnotationListener listener) {
213+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s", channel.name));
214+
listeners.remove(listener);
215+
for (AnnotationMulticaster multicaster : typeListeners.values()) {
216+
multicaster.remove(listener);
217+
}
218+
}
219+
220+
/**
221+
* Subscribes the given {@link AnnotationListener} to the channel for a specific annotation type,
222+
* allowing it to receive annotations of the specified type. If the channel's attach on subscribe
223+
* option is enabled, the channel is attached automatically.
224+
* <p>
225+
* Note: This is an experimental API. While the underlying functionality is stable,
226+
* the public API may change in future releases.
227+
*
228+
* @param type the specific annotation type to subscribe to; if null, subscribes to all types
229+
* @param listener the {@link AnnotationListener} to be subscribed
230+
*/
231+
public synchronized void subscribe(String type, AnnotationListener listener) throws AblyException {
232+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s; single type = %s", channel.name, type));
233+
subscribeImpl(type, listener);
234+
if (channel.attachOnSubscribeEnabled()) {
235+
channel.attach();
236+
}
237+
}
238+
239+
/**
240+
* Unsubscribes the specified {@link AnnotationListener} from receiving annotations
241+
* of a particular type within the channel. If there are no remaining listeners
242+
* for the specified type, the type-specific listener collection is also removed.
243+
* <p>
244+
* Note: This is an experimental API. While the underlying functionality is stable,
245+
* the public API may change in future releases.
246+
*
247+
* @param type the specific annotation type to unsubscribe from; if null, unsubscribes
248+
* from all annotations associated with the listener
249+
* @param listener the {@link AnnotationListener} to be unsubscribed
250+
*/
251+
public synchronized void unsubscribe(String type, AnnotationListener listener) {
252+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s; single type = %s", channel.name, type));
253+
unsubscribeImpl(type, listener);
254+
}
255+
256+
/**
257+
* Internal method. Handles incoming annotation messages from the protocol layer.
258+
*
259+
* @param protocolMessage the protocol message containing annotation data
260+
*/
261+
public void onAnnotation(ProtocolMessage protocolMessage) {
262+
List<Annotation> annotations = new ArrayList<>();
263+
for (int i = 0; i < protocolMessage.annotations.length; i++) {
264+
Annotation annotation = protocolMessage.annotations[i];
265+
try {
266+
if (annotation.data != null) annotation.decode(channel.options);
267+
} catch (MessageDecodeException e) {
268+
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, channel.name));
269+
}
270+
/* populate fields derived from protocol message */
271+
if (annotation.connectionId == null) annotation.connectionId = protocolMessage.connectionId;
272+
if (annotation.timestamp == 0) annotation.timestamp = protocolMessage.timestamp;
273+
if (annotation.id == null) annotation.id = protocolMessage.id + ':' + i;
274+
annotations.add(annotation);
275+
}
276+
broadcastAnnotation(annotations);
277+
}
278+
279+
private void broadcastAnnotation(List<Annotation> annotations) {
280+
for (Annotation annotation : annotations) {
281+
listeners.onAnnotation(annotation);
282+
283+
String type = annotation.type != null ? annotation.type : "";
284+
AnnotationMulticaster eventListener = typeListeners.get(type);
285+
if (eventListener != null) eventListener.onAnnotation(annotation);
286+
}
287+
}
288+
289+
private void subscribeImpl(String type, AnnotationListener listener) {
290+
String annotationType = type != null ? type : "";
291+
AnnotationMulticaster typeSpecificListeners = typeListeners.get(annotationType);
292+
if (typeSpecificListeners == null) {
293+
typeSpecificListeners = new AnnotationMulticaster();
294+
typeListeners.put(annotationType, typeSpecificListeners);
295+
}
296+
typeSpecificListeners.add(listener);
297+
}
298+
299+
private void unsubscribeImpl(String type, AnnotationListener listener) {
300+
AnnotationMulticaster listeners = typeListeners.get(type);
301+
if (listeners != null) {
302+
listeners.remove(listener);
303+
if (listeners.isEmpty()) {
304+
typeListeners.remove(type);
305+
}
306+
}
307+
}
308+
309+
public interface AnnotationListener {
310+
void onAnnotation(Annotation annotation);
311+
}
312+
313+
private static class AnnotationMulticaster extends Multicaster<AnnotationListener> implements AnnotationListener {
314+
@Override
315+
public void onAnnotation(Annotation annotation) {
316+
for (final AnnotationListener member : getMembers()) {
317+
try {
318+
member.onAnnotation(annotation);
319+
} catch (Exception e) {
320+
Log.e(TAG, e.getMessage(), e);
321+
}
322+
}
323+
}
324+
}
325+
}

0 commit comments

Comments
 (0)