1414import com .launchdarkly .sdk .android .subsystems .TransactionalDataStore ;
1515import com .launchdarkly .sdk .internal .http .HttpProperties ;
1616
17+ import java .io .Closeable ;
1718import java .net .URI ;
1819import java .util .ArrayList ;
1920import java .util .Arrays ;
3233 * <p>
3334 * Package-private — not part of the public SDK API.
3435 */
35- class FDv2DataSourceBuilder implements ComponentConfigurer <DataSource > {
36+ class FDv2DataSourceBuilder implements ComponentConfigurer <DataSource >, Closeable {
3637
37- private final Map <ConnectionMode , ModeDefinition > modeTable ;
38+ private Map <ConnectionMode , ModeDefinition > modeTable ;
3839 private final ConnectionMode startingMode ;
3940
4041 private ConnectionMode activeMode ;
4142 private boolean includeInitializers = true ; // false during mode switches to skip initializers (CONNMODE 2.0.1)
4243 private ScheduledExecutorService sharedExecutor ;
4344
4445 FDv2DataSourceBuilder () {
45- this (makeDefaultModeTable (), ConnectionMode .STREAMING );
46+ this .modeTable = null ; // built lazily in build() so lambdas can capture sharedExecutor
47+ this .startingMode = ConnectionMode .STREAMING ;
4648 }
4749
48- private static Map <ConnectionMode , ModeDefinition > makeDefaultModeTable () {
50+ private Map <ConnectionMode , ModeDefinition > makeDefaultModeTable () {
4951 ComponentConfigurer <Initializer > pollingInitializer = ctx -> {
50- ClientContextImpl impl = ClientContextImpl .get (ctx );
51- TransactionalDataStore store = impl .getTransactionalDataStore ();
52- SelectorSource selectorSource = store != null
53- ? new SelectorSourceFacade (store )
54- : () -> com .launchdarkly .sdk .fdv2 .Selector .EMPTY ;
55- URI pollingBase = StandardEndpoints .selectBaseUri (
56- ctx .getServiceEndpoints ().getPollingBaseUri (),
57- StandardEndpoints .DEFAULT_POLLING_BASE_URI ,
58- "polling" , ctx .getBaseLogger ());
59- HttpProperties httpProps = LDUtil .makeHttpProperties (ctx );
60- FDv2Requestor requestor = new DefaultFDv2Requestor (
61- ctx .getEvaluationContext (), pollingBase ,
62- StandardEndpoints .FDV2_POLLING_REQUEST_GET_BASE_PATH ,
63- StandardEndpoints .FDV2_POLLING_REQUEST_REPORT_BASE_PATH ,
64- httpProps , ctx .getHttp ().isUseReport (),
65- ctx .isEvaluationReasons (), null , ctx .getBaseLogger ());
66- return new FDv2PollingInitializer (requestor , selectorSource ,
67- Executors .newSingleThreadExecutor (), ctx .getBaseLogger ());
52+ DataSourceSetup s = new DataSourceSetup (ctx );
53+ FDv2Requestor requestor = makePollingRequestor (ctx , s .httpProps );
54+ return new FDv2PollingInitializer (requestor , s .selectorSource ,
55+ sharedExecutor , ctx .getBaseLogger ());
6856 };
6957
7058 ComponentConfigurer <Synchronizer > pollingSynchronizer = ctx -> {
71- ClientContextImpl impl = ClientContextImpl .get (ctx );
72- TransactionalDataStore store = impl .getTransactionalDataStore ();
73- SelectorSource selectorSource = store != null
74- ? new SelectorSourceFacade (store )
75- : () -> com .launchdarkly .sdk .fdv2 .Selector .EMPTY ;
76- URI pollingBase = StandardEndpoints .selectBaseUri (
77- ctx .getServiceEndpoints ().getPollingBaseUri (),
78- StandardEndpoints .DEFAULT_POLLING_BASE_URI ,
79- "polling" , ctx .getBaseLogger ());
80- HttpProperties httpProps = LDUtil .makeHttpProperties (ctx );
81- FDv2Requestor requestor = new DefaultFDv2Requestor (
82- ctx .getEvaluationContext (), pollingBase ,
83- StandardEndpoints .FDV2_POLLING_REQUEST_GET_BASE_PATH ,
84- StandardEndpoints .FDV2_POLLING_REQUEST_REPORT_BASE_PATH ,
85- httpProps , ctx .getHttp ().isUseReport (),
86- ctx .isEvaluationReasons (), null , ctx .getBaseLogger ());
87- ScheduledExecutorService exec = Executors .newScheduledThreadPool (1 );
88- return new FDv2PollingSynchronizer (requestor , selectorSource , exec ,
59+ DataSourceSetup s = new DataSourceSetup (ctx );
60+ FDv2Requestor requestor = makePollingRequestor (ctx , s .httpProps );
61+ return new FDv2PollingSynchronizer (requestor , s .selectorSource ,
62+ sharedExecutor ,
8963 0 , PollingDataSourceBuilder .DEFAULT_POLL_INTERVAL_MILLIS , ctx .getBaseLogger ());
9064 };
9165
9266 ComponentConfigurer <Synchronizer > streamingSynchronizer = ctx -> {
93- ClientContextImpl impl = ClientContextImpl .get (ctx );
94- TransactionalDataStore store = impl .getTransactionalDataStore ();
95- SelectorSource selectorSource = store != null
96- ? new SelectorSourceFacade (store )
97- : () -> com .launchdarkly .sdk .fdv2 .Selector .EMPTY ;
67+ DataSourceSetup s = new DataSourceSetup (ctx );
68+ // The streaming synchronizer uses a polling requestor for its internal
69+ // polling fallback (e.g. when the stream cannot be established).
70+ FDv2Requestor pollingRequestor = makePollingRequestor (ctx , s .httpProps );
9871 URI streamBase = StandardEndpoints .selectBaseUri (
9972 ctx .getServiceEndpoints ().getStreamingBaseUri (),
10073 StandardEndpoints .DEFAULT_STREAMING_BASE_URI ,
10174 "streaming" , ctx .getBaseLogger ());
102- URI pollingBase = StandardEndpoints .selectBaseUri (
103- ctx .getServiceEndpoints ().getPollingBaseUri (),
104- StandardEndpoints .DEFAULT_POLLING_BASE_URI ,
105- "polling" , ctx .getBaseLogger ());
106- HttpProperties httpProps = LDUtil .makeHttpProperties (ctx );
107- FDv2Requestor requestor = new DefaultFDv2Requestor (
108- ctx .getEvaluationContext (), pollingBase ,
109- StandardEndpoints .FDV2_POLLING_REQUEST_GET_BASE_PATH ,
110- StandardEndpoints .FDV2_POLLING_REQUEST_REPORT_BASE_PATH ,
111- httpProps , ctx .getHttp ().isUseReport (),
112- ctx .isEvaluationReasons (), null , ctx .getBaseLogger ());
11375 return new FDv2StreamingSynchronizer (
114- ctx .getEvaluationContext (), selectorSource , streamBase ,
76+ ctx .getEvaluationContext (), s . selectorSource , streamBase ,
11577 StandardEndpoints .FDV2_STREAMING_REQUEST_BASE_PATH ,
116- requestor ,
78+ pollingRequestor ,
11779 StreamingDataSourceBuilder .DEFAULT_INITIAL_RECONNECT_DELAY_MILLIS ,
11880 ctx .isEvaluationReasons (), ctx .getHttp ().isUseReport (),
119- httpProps , Executors . newSingleThreadExecutor () ,
81+ s . httpProps , sharedExecutor ,
12082 ctx .getBaseLogger (), null );
12183 };
12284
12385 ComponentConfigurer <Synchronizer > backgroundPollingSynchronizer = ctx -> {
124- ClientContextImpl impl = ClientContextImpl .get (ctx );
125- TransactionalDataStore store = impl .getTransactionalDataStore ();
126- SelectorSource selectorSource = store != null
127- ? new SelectorSourceFacade (store )
128- : () -> com .launchdarkly .sdk .fdv2 .Selector .EMPTY ;
129- URI pollingBase = StandardEndpoints .selectBaseUri (
130- ctx .getServiceEndpoints ().getPollingBaseUri (),
131- StandardEndpoints .DEFAULT_POLLING_BASE_URI ,
132- "polling" , ctx .getBaseLogger ());
133- HttpProperties httpProps = LDUtil .makeHttpProperties (ctx );
134- FDv2Requestor requestor = new DefaultFDv2Requestor (
135- ctx .getEvaluationContext (), pollingBase ,
136- StandardEndpoints .FDV2_POLLING_REQUEST_GET_BASE_PATH ,
137- StandardEndpoints .FDV2_POLLING_REQUEST_REPORT_BASE_PATH ,
138- httpProps , ctx .getHttp ().isUseReport (),
139- ctx .isEvaluationReasons (), null , ctx .getBaseLogger ());
140- ScheduledExecutorService exec = Executors .newScheduledThreadPool (1 );
141- return new FDv2PollingSynchronizer (requestor , selectorSource , exec ,
86+ DataSourceSetup s = new DataSourceSetup (ctx );
87+ FDv2Requestor requestor = makePollingRequestor (ctx , s .httpProps );
88+ return new FDv2PollingSynchronizer (requestor , s .selectorSource ,
89+ sharedExecutor ,
14290 0 , LDConfig .DEFAULT_BACKGROUND_POLL_INTERVAL_MILLIS , ctx .getBaseLogger ());
14391 };
14492
@@ -207,6 +155,18 @@ ModeDefinition getModeDefinition(@NonNull ConnectionMode mode) {
207155
208156 @ Override
209157 public DataSource build (ClientContext clientContext ) {
158+ if (sharedExecutor == null ) {
159+ // Pool size 4: supports the FDv2DataSource main loop (1), an active synchronizer
160+ // such as the streaming event loop (1), FDv2DataSource condition timers for
161+ // fallback/recovery (up to 2 short-lived scheduled tasks). Only one mode is active
162+ // at a time (teardown/rebuild), so this pool serves all components.
163+ sharedExecutor = Executors .newScheduledThreadPool (4 );
164+ }
165+
166+ if (modeTable == null ) {
167+ modeTable = Collections .unmodifiableMap (makeDefaultModeTable ());
168+ }
169+
210170 ConnectionMode mode = activeMode != null ? activeMode : startingMode ;
211171
212172 ModeDefinition modeDef = modeTable .get (mode );
@@ -223,10 +183,6 @@ public DataSource build(ClientContext clientContext) {
223183 "FDv2DataSource requires a DataSourceUpdateSinkV2 implementation" );
224184 }
225185
226- if (sharedExecutor == null ) {
227- sharedExecutor = Executors .newScheduledThreadPool (2 );
228- }
229-
230186 List <FDv2DataSource .DataSourceFactory <Initializer >> initFactories =
231187 includeInitializers ? resolved .getInitializerFactories () : Collections .<FDv2DataSource .DataSourceFactory <Initializer >>emptyList ();
232188
@@ -243,6 +199,14 @@ public DataSource build(ClientContext clientContext) {
243199 );
244200 }
245201
202+ @ Override
203+ public void close () {
204+ if (sharedExecutor != null ) {
205+ sharedExecutor .shutdownNow ();
206+ sharedExecutor = null ;
207+ }
208+ }
209+
246210 private static ResolvedModeDefinition resolve (
247211 ModeDefinition def , ClientContext clientContext
248212 ) {
@@ -256,4 +220,43 @@ private static ResolvedModeDefinition resolve(
256220 }
257221 return new ResolvedModeDefinition (initFactories , syncFactories );
258222 }
223+
224+ /**
225+ * Holds the shared infrastructure needed by all FDv2 data source components:
226+ * a {@link SelectorSource} backed by the {@link TransactionalDataStore} (or an empty
227+ * fallback if none is configured), and the {@link HttpProperties} for the current
228+ * client configuration. Polling-specific setup (the {@link FDv2Requestor}) is built
229+ * separately via {@link #makePollingRequestor}.
230+ */
231+ private static final class DataSourceSetup {
232+ final SelectorSource selectorSource ;
233+ final HttpProperties httpProps ;
234+
235+ DataSourceSetup (ClientContext ctx ) {
236+ TransactionalDataStore store = ClientContextImpl .get (ctx ).getTransactionalDataStore ();
237+ this .selectorSource = store != null
238+ ? new SelectorSourceFacade (store )
239+ : () -> com .launchdarkly .sdk .fdv2 .Selector .EMPTY ;
240+ this .httpProps = LDUtil .makeHttpProperties (ctx );
241+ }
242+ }
243+
244+ /**
245+ * Builds a {@link DefaultFDv2Requestor} configured for polling endpoints. Used
246+ * directly by polling components and as the fallback requestor for the streaming
247+ * synchronizer (which needs it for internal polling fallback when the stream cannot
248+ * be established).
249+ */
250+ private static FDv2Requestor makePollingRequestor (ClientContext ctx , HttpProperties httpProps ) {
251+ URI pollingBase = StandardEndpoints .selectBaseUri (
252+ ctx .getServiceEndpoints ().getPollingBaseUri (),
253+ StandardEndpoints .DEFAULT_POLLING_BASE_URI ,
254+ "polling" , ctx .getBaseLogger ());
255+ return new DefaultFDv2Requestor (
256+ ctx .getEvaluationContext (), pollingBase ,
257+ StandardEndpoints .FDV2_POLLING_REQUEST_GET_BASE_PATH ,
258+ StandardEndpoints .FDV2_POLLING_REQUEST_REPORT_BASE_PATH ,
259+ httpProps , ctx .getHttp ().isUseReport (),
260+ ctx .isEvaluationReasons (), null , ctx .getBaseLogger ());
261+ }
259262}
0 commit comments