It seems beginnig_offsets() can cause AttributeError in 2.3.0.
I was not able to repeat this robustly, but it came immediately after creating topic with many partitions, and then fetching offsets ie. Kafka was not fully ready with the topic.
I tried to follow the logic of _fetch_offsets_by_times and it seems that future.exception.invalid_metadata is accessed even with successful futures ie. whole exception is None here. Here is a full stack trace from one of my internal test run.
timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
timeout_ms = 305000
def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
if not timestamps:
return {}
timer = Timer(timeout_ms, "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
timestamps = copy.copy(timestamps)
fetched_offsets = dict()
while True:
if not timestamps:
return {}
future = self._send_list_offsets_requests(timestamps)
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
# Timeout w/o future completion
if not future.is_done:
break
if future.succeeded():
fetched_offsets.update(future.value[0])
if not future.value[1]:
return fetched_offsets
timestamps = {tp: timestamps[tp] for tp in future.value[1]}
elif not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
> if future.exception.invalid_metadata or self._client.cluster.need_update:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E AttributeError: 'NoneType' object has no attribute 'invalid_metadata'
fetched_offsets = {}
future = <kafka.future.Future object at 0x7f7264fa72f0>
refresh_future = <kafka.future.Future object at 0x7f72650776b0>
self = <kafka.consumer.fetcher.Fetcher object at 0x7f7264f375f0>
timeout_ms = 305000
timer = <kafka.util.Timer object at 0x7f72654012c0>
timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
It seems
beginnig_offsets()can causeAttributeErrorin 2.3.0.I was not able to repeat this robustly, but it came immediately after creating topic with many partitions, and then fetching offsets ie. Kafka was not fully ready with the topic.
I tried to follow the logic of
_fetch_offsets_by_timesand it seems thatfuture.exception.invalid_metadatais accessed even with successful futures ie. whole exception isNonehere. Here is a full stack trace from one of my internal test run.